You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/11/28 14:01:54 UTC

[dolphinscheduler] branch 3.1.2-prepare updated (a87206228a -> 1bfd8f5327)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a change to branch 3.1.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


    from a87206228a [Bug-13010] [Task] The Flink SQL task page selects the pre-job deployment mode, but the task executed by the worker is the Flink local mode
     new 682829e4ea [improve][python] Validate version of Python API at launch (#11626)
     new 696d8ae7f1 [feat] Support set execute type to pydolphinscheduler (#12871)
     new 416c41465d [fix] Add token as authentication for python gateway (#12893)
     new 6ed1605680 [chore][python] Change name from process definition to workflow (#12918)
     new 1bfd8f5327 [Feature] Add CURD to the project/tenant/user section of the python-DS (#11162)

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../configuration/PythonGatewayConfiguration.java  |  68 +-----
 .../dolphinscheduler/api/python/PythonGateway.java | 270 ++++++++++++---------
 .../api/service/impl/UsersServiceImpl.java         |   1 +
 .../src/main/resources/application.yaml            |   3 +
 .../api/service/UsersServiceTest.java              |   4 +-
 .../src/pydolphinscheduler/java_gateway.py         |  87 ++++++-
 .../src/pydolphinscheduler/models/base_side.py     |   8 +
 .../src/pydolphinscheduler/models/project.py       |  31 +++
 .../src/pydolphinscheduler/models/tenant.py        |  38 ++-
 .../src/pydolphinscheduler/models/user.py          |  55 ++++-
 .../tests/integration/conftest.py                  |   2 +-
 .../tests/integration/test_project.py              |  78 ++++++
 .../tests/integration/test_tenant.py               |  86 +++++++
 .../tests/integration/test_user.py                 | 107 ++++++++
 .../src/main/resources/application.yaml            |   3 +
 15 files changed, 654 insertions(+), 187 deletions(-)
 create mode 100644 dolphinscheduler-python/pydolphinscheduler/tests/integration/test_project.py
 create mode 100644 dolphinscheduler-python/pydolphinscheduler/tests/integration/test_tenant.py
 create mode 100644 dolphinscheduler-python/pydolphinscheduler/tests/integration/test_user.py


[dolphinscheduler] 02/05: [feat] Support set execute type to pydolphinscheduler (#12871)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch 3.1.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 696d8ae7f1357d3c10d781ade4c67d5a53cb2e27
Author: Jay Chung <zh...@gmail.com>
AuthorDate: Sat Nov 12 10:31:08 2022 +0800

    [feat] Support set execute type to pydolphinscheduler (#12871)
    
    Up to now, we can only submit workflow with parallel mode. this patch give users ability specific execute type to workflow
    
    (cherry picked from commit 87a88e36627017607c73cfc66a92be08d1da22ee)
---
 .../dolphinscheduler/api/python/PythonGateway.java   | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index 00a817a569..d9f0c78674 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -234,28 +234,34 @@ public class PythonGateway {
                                                 String taskRelationJson,
                                                 String taskDefinitionJson,
                                                 String otherParamsJson,
-                                                ProcessExecutionTypeEnum executionType) {
+                                                String executionType) {
         User user = usersService.queryUser(userName);
         Project project = projectMapper.queryByName(projectName);
         long projectCode = project.getCode();
 
         ProcessDefinition processDefinition = getProcessDefinition(user, projectCode, name);
+        ProcessExecutionTypeEnum executionTypeEnum = ProcessExecutionTypeEnum.valueOf(executionType);
         long processDefinitionCode;
         // create or update process definition
         if (processDefinition != null) {
             processDefinitionCode = processDefinition.getCode();
             // make sure process definition offline which could edit
-            processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE);
-            Map<String, Object> result = processDefinitionService.updateProcessDefinition(user, projectCode, name, processDefinitionCode, description, globalParams,
-                    null, timeout, tenantCode, taskRelationJson, taskDefinitionJson, otherParamsJson, executionType);
+            processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode,
+                    ReleaseState.OFFLINE);
+            processDefinitionService.updateProcessDefinition(user, projectCode, name,
+                    processDefinitionCode, description, globalParams,
+                    null, timeout, tenantCode, taskRelationJson, taskDefinitionJson, otherParamsJson,
+                    executionTypeEnum);
         } else {
-            Map<String, Object> result = processDefinitionService.createProcessDefinition(user, projectCode, name, description, globalParams,
-                null, timeout, tenantCode, taskRelationJson, taskDefinitionJson, otherParamsJson, executionType);
+            Map<String, Object> result = processDefinitionService.createProcessDefinition(user, projectCode, name,
+                    description, globalParams,
+                    null, timeout, tenantCode, taskRelationJson, taskDefinitionJson, otherParamsJson,
+                    executionTypeEnum);
             processDefinition = (ProcessDefinition) result.get(Constants.DATA_LIST);
             processDefinitionCode = processDefinition.getCode();
         }
 
-        // Fresh process definition schedule 
+        // Fresh process definition schedule
         if (schedule != null) {
             createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup, warningType, warningGroupId);
         }


[dolphinscheduler] 01/05: [improve][python] Validate version of Python API at launch (#11626)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch 3.1.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 682829e4ea799d3965778392149206ed51d037cb
Author: Chris <re...@gmail.com>
AuthorDate: Tue Sep 27 21:28:20 2022 +0800

    [improve][python] Validate version of Python API at launch (#11626)
    
    (cherry picked from commit 7ed52c3ecb6b4165256b244cd595e502007b2923)
---
 .../dolphinscheduler/api/python/PythonGateway.java |  4 ++++
 .../src/pydolphinscheduler/java_gateway.py         | 23 +++++++++++++++++++++-
 2 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index 2732bd629e..00a817a569 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -599,6 +599,10 @@ public class PythonGateway {
         return resourceService.queryResourcesFileInfo(userName, fullName);
     }
 
+    public String getGatewayVersion() {
+        return PythonGateway.class.getPackage().getImplementationVersion();
+    }
+
     /**
      * create or update resource.
      * If the folder is not already created, it will be
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
index 0ff74ba655..4be63070a9 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
@@ -17,15 +17,20 @@
 
 """Module java gateway, contain gateway behavior."""
 
+import contextlib
+from logging import getLogger
 from typing import Any, Optional
 
 from py4j.java_collections import JavaMap
 from py4j.java_gateway import GatewayParameters, JavaGateway
+from py4j.protocol import Py4JError
 
-from pydolphinscheduler import configuration
+from pydolphinscheduler import __version__, configuration
 from pydolphinscheduler.constants import JavaGatewayDefault
 from pydolphinscheduler.exceptions import PyDSJavaGatewayException
 
+logger = getLogger(__name__)
+
 
 def launch_gateway(
     address: Optional[str] = None,
@@ -75,6 +80,22 @@ class JavaGate:
         auto_convert: Optional[bool] = True,
     ):
         self.java_gateway = launch_gateway(address, port, auto_convert)
+        gateway_version = "unknown"
+        with contextlib.suppress(Py4JError):
+            # 1. Java gateway version is too old: doesn't have method 'getGatewayVersion()'
+            # 2. Error connecting to Java gateway
+            gateway_version = self.get_gateway_version()
+        if gateway_version != __version__:
+            logger.warning(
+                f"Using unmatched version of pydolphinscheduler (version {__version__}) "
+                f"and Java gateway (version {gateway_version}) may cause errors. "
+                "We strongly recommend you to find the matched version "
+                "(check: https://pypi.org/project/apache-dolphinscheduler)"
+            )
+
+    def get_gateway_version(self):
+        """Get the java gateway version, expected to be equal with pydolphinscheduler."""
+        return self.java_gateway.entry_point.getGatewayVersion()
 
     def get_datasource_info(self, name: str):
         """Get datasource info through java gateway."""


[dolphinscheduler] 05/05: [Feature] Add CURD to the project/tenant/user section of the python-DS (#11162)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch 3.1.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 1bfd8f53274cf4563d526e6d078c8957cab1c1a9
Author: Lyle Shaw <ly...@hdu.edu.cn>
AuthorDate: Thu Sep 22 11:03:27 2022 +0800

    [Feature] Add CURD to the project/tenant/user section of the python-DS (#11162)
    
    - Add CURD in project
    - Add CURD in tenant
    - Add CURD in user
    - Add test in user
    
    Co-authored-by: Jiajie Zhong <zh...@gmail.com>
    
    (cherry picked from commit cc492c3e13fa8de96706d4360ee6f8d628639200)
---
 .../dolphinscheduler/api/python/PythonGateway.java |  24 +++--
 .../api/service/impl/UsersServiceImpl.java         |   1 +
 .../api/service/UsersServiceTest.java              |   4 +-
 .../src/pydolphinscheduler/java_gateway.py         |  64 ++++++++++++
 .../src/pydolphinscheduler/models/base_side.py     |   8 ++
 .../src/pydolphinscheduler/models/project.py       |  31 ++++++
 .../src/pydolphinscheduler/models/tenant.py        |  38 +++++++-
 .../src/pydolphinscheduler/models/user.py          |  55 ++++++++++-
 .../tests/integration/conftest.py                  |   2 +-
 .../tests/integration/test_project.py              |  78 +++++++++++++++
 .../tests/integration/test_tenant.py               |  86 +++++++++++++++++
 .../tests/integration/test_user.py                 | 107 +++++++++++++++++++++
 12 files changed, 484 insertions(+), 14 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index 7fb225268c..a7ffbbaec8 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -417,7 +417,7 @@ public class PythonGateway {
 
     public Project queryProjectByName(String userName, String projectName) {
         User user = usersService.queryUser(userName);
-        return (Project) projectService.queryByName(user, projectName);
+        return (Project) projectService.queryByName(user, projectName).get(Constants.DATA_LIST);
     }
 
     public void updateProject(String userName, Long projectCode, String projectName, String desc) {
@@ -434,9 +434,8 @@ public class PythonGateway {
         return tenantService.createTenantIfNotExists(tenantCode, desc, queueName, queueName);
     }
 
-    public Result queryTenantList(String userName, String searchVal, Integer pageNo, Integer pageSize) {
-        User user = usersService.queryUser(userName);
-        return tenantService.queryTenantList(user, searchVal, pageNo, pageSize);
+    public Tenant queryTenantByCode(String tenantCode) {
+        return (Tenant) tenantService.queryByTenantCode(tenantCode).get(Constants.DATA_LIST);
     }
 
     public void updateTenant(String userName, int id, String tenantCode, int queueId, String desc) throws Exception {
@@ -449,27 +448,32 @@ public class PythonGateway {
         tenantService.deleteTenantById(user, tenantId);
     }
 
-    public void createUser(String userName,
+    public User createUser(String userName,
                            String userPassword,
                            String email,
                            String phone,
                            String tenantCode,
                            String queue,
                            int state) throws IOException {
-        usersService.createUserIfNotExists(userName, userPassword, email, phone, tenantCode, queue, state);
+        return usersService.createUserIfNotExists(userName, userPassword, email, phone, tenantCode, queue, state);
     }
 
     public User queryUser(int id) {
-        return usersService.queryUser(id);
+        User user = usersService.queryUser(id);
+        if (user == null) {
+            throw new RuntimeException("User not found");
+        }
+        return user;
     }
 
-    public void updateUser(String userName, String userPassword, String email, String phone, String tenantCode, String queue, int state) throws Exception {
-        usersService.createUserIfNotExists(userName, userPassword, email, phone, tenantCode, queue, state);
+    public User updateUser(String userName, String userPassword, String email, String phone, String tenantCode, String queue, int state) throws Exception {
+        return usersService.createUserIfNotExists(userName, userPassword, email, phone, tenantCode, queue, state);
     }
 
-    public void deleteUser(String userName, int id) throws Exception {
+    public User deleteUser(String userName, int id) throws Exception {
         User user = usersService.queryUser(userName);
         usersService.deleteUserById(user, id);
+        return usersService.queryUser(userName);
     }
 
     /**
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
index 54acc97691..51d7196901 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
@@ -1312,6 +1312,7 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
         }
 
         updateUser(user, user.getId(), userName, userPassword, email, user.getTenantId(), phone, queue, state, null);
+        user = userMapper.queryDetailsById(user.getId());
         return user;
     }
 }
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java
index b63c021b7d..9ad170d7eb 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java
@@ -735,12 +735,14 @@ public class UsersServiceTest {
         String userName = "userTest0001";
         String userPassword = "userTest";
         String email = "abc@x.com";
-        String phone = "123456789";
+        String phone = "17366666666";
         String tenantCode = "tenantCode";
         int stat = 1;
 
         // User exists
         Mockito.when(userMapper.existUser(userName)).thenReturn(true);
+        Mockito.when(userMapper.selectById(getUser().getId())).thenReturn(getUser());
+        Mockito.when(userMapper.queryDetailsById(getUser().getId())).thenReturn(getUser());
         Mockito.when(userMapper.queryByUserNameAccurately(userName)).thenReturn(getUser());
         Mockito.when(tenantMapper.queryByTenantCode(tenantCode)).thenReturn(getTenant());
         user = usersService.createUserIfNotExists(userName, userPassword, email, phone, tenantCode, queueName, stat);
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
index 4be63070a9..54bb0a38b2 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
@@ -139,6 +139,22 @@ class JavaGate:
             user, name, description
         )
 
+    def query_project_by_name(self, user: str, name: str):
+        """Query project through java gateway."""
+        return self.java_gateway.entry_point.queryProjectByName(user, name)
+
+    def update_project(
+        self, user: str, project_code: int, project_name: str, description: str
+    ):
+        """Update project through java gateway."""
+        return self.java_gateway.entry_point.updateProject(
+            user, project_code, project_name, description
+        )
+
+    def delete_project(self, user: str, code: int):
+        """Delete project through java gateway."""
+        return self.java_gateway.entry_point.deleteProject(user, code)
+
     def create_tenant(
         self, tenant_name: str, queue_name: str, description: Optional[str] = None
     ):
@@ -147,6 +163,31 @@ class JavaGate:
             tenant_name, description, queue_name
         )
 
+    def query_tenant(self, tenant_code: str):
+        """Query tenant through java gateway."""
+        return self.java_gateway.entry_point.queryTenantByCode(tenant_code)
+
+    def grant_tenant_to_user(self, user_name: str, tenant_code: str):
+        """Grant tenant to user through java gateway."""
+        return self.java_gateway.entry_point.grantTenantToUser(user_name, tenant_code)
+
+    def update_tenant(
+        self,
+        user: str,
+        tenant_id: int,
+        code: str,
+        queue_id: int,
+        description: Optional[str] = None,
+    ):
+        """Update tenant through java gateway."""
+        return self.java_gateway.entry_point.updateTenant(
+            user, tenant_id, code, queue_id, description
+        )
+
+    def delete_tenant(self, user: str, tenant_id: int):
+        """Delete tenant through java gateway."""
+        return self.java_gateway.entry_point.deleteTenantById(user, tenant_id)
+
     def create_user(
         self,
         name: str,
@@ -162,6 +203,29 @@ class JavaGate:
             name, password, email, phone, tenant, queue, status
         )
 
+    def query_user(self, user_id: int):
+        """Query user through java gateway."""
+        return self.java_gateway.queryUser(user_id)
+
+    def update_user(
+        self,
+        name: str,
+        password: str,
+        email: str,
+        phone: str,
+        tenant: str,
+        queue: str,
+        status: int,
+    ):
+        """Update user through java gateway."""
+        return self.java_gateway.entry_point.updateUser(
+            name, password, email, phone, tenant, queue, status
+        )
+
+    def delete_user(self, name: str, user_id: int):
+        """Delete user through java gateway."""
+        return self.java_gateway.entry_point.deleteUser(name, user_id)
+
     def get_dependent_info(
         self,
         project_name: str,
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/base_side.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/base_side.py
index 67ac88d96b..99b4007a85 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/base_side.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/base_side.py
@@ -38,3 +38,11 @@ class BaseSide(Base):
     ):
         """Create Base if not exists."""
         raise NotImplementedError
+
+    def delete_all(self):
+        """Delete all method."""
+        if not self:
+            return
+        list_pro = [key for key in self.__dict__.keys()]
+        for key in list_pro:
+            self.__delattr__(key)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/project.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/project.py
index bebdafd67e..678332ba3b 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/project.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/project.py
@@ -31,11 +31,42 @@ class Project(BaseSide):
         self,
         name: str = configuration.WORKFLOW_PROJECT,
         description: Optional[str] = None,
+        code: Optional[int] = None,
     ):
         super().__init__(name, description)
+        self.code = code
 
     def create_if_not_exists(self, user=configuration.USER_NAME) -> None:
         """Create Project if not exists."""
         JavaGate().create_or_grant_project(user, self.name, self.description)
         # TODO recover result checker
         # gateway_result_checker(result, None)
+
+    @classmethod
+    def get_project_by_name(cls, user=configuration.USER_NAME, name=None) -> "Project":
+        """Get Project by name."""
+        project = JavaGate().query_project_by_name(user, name)
+        if project is None:
+            return cls()
+        return cls(
+            name=project.getName(),
+            description=project.getDescription(),
+            code=project.getCode(),
+        )
+
+    def update(
+        self,
+        user=configuration.USER_NAME,
+        project_code=None,
+        project_name=None,
+        description=None,
+    ) -> None:
+        """Update Project."""
+        JavaGate().update_project(user, project_code, project_name, description)
+        self.name = project_name
+        self.description = description
+
+    def delete(self, user=configuration.USER_NAME) -> None:
+        """Delete Project."""
+        JavaGate().delete_project(user, self.code)
+        self.delete_all()
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/tenant.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/tenant.py
index 6641d9aef7..09b00ccf3a 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/tenant.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/tenant.py
@@ -32,13 +32,49 @@ class Tenant(BaseSide):
         name: str = configuration.WORKFLOW_TENANT,
         queue: str = configuration.WORKFLOW_QUEUE,
         description: Optional[str] = None,
+        tenant_id: Optional[int] = None,
+        code: Optional[str] = None,
+        user_name: Optional[str] = None,
     ):
         super().__init__(name, description)
+        self.tenant_id = tenant_id
         self.queue = queue
+        self.code = code
+        self.user_name = user_name
 
     def create_if_not_exists(
         self, queue_name: str, user=configuration.USER_NAME
     ) -> None:
         """Create Tenant if not exists."""
-        JavaGate().create_tenant(self.name, queue_name, self.description)
+        tenant = JavaGate().create_tenant(self.name, self.description, queue_name)
+        self.tenant_id = tenant.getId()
+        self.code = tenant.getTenantCode()
         # gateway_result_checker(result, None)
+
+    @classmethod
+    def get_tenant(cls, code: str) -> "Tenant":
+        """Get Tenant list."""
+        tenant = JavaGate().query_tenant(code)
+        if tenant is None:
+            return cls()
+        return cls(
+            description=tenant.getDescription(),
+            tenant_id=tenant.getId(),
+            code=tenant.getTenantCode(),
+            queue=tenant.getQueueId(),
+        )
+
+    def update(
+        self, user=configuration.USER_NAME, code=None, queue_id=None, description=None
+    ) -> None:
+        """Update Tenant."""
+        JavaGate().update_tenant(user, self.tenant_id, code, queue_id, description)
+        # TODO: check queue_id and queue_name
+        self.queue = str(queue_id)
+        self.code = code
+        self.description = description
+
+    def delete(self) -> None:
+        """Delete Tenant."""
+        JavaGate().delete_tenant(self.user_name, self.tenant_id)
+        self.delete_all()
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/user.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/user.py
index e11bb9ca0d..57c6af647f 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/user.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/user.py
@@ -48,6 +48,7 @@ class User(BaseSide):
         status: Optional[int] = configuration.USER_STATE,
     ):
         super().__init__(name)
+        self.user_id: Optional[int] = None
         self.password = password
         self.email = email
         self.phone = phone
@@ -64,7 +65,7 @@ class User(BaseSide):
         """Create User if not exists."""
         # Should make sure queue already exists.
         self.create_tenant_if_not_exists()
-        JavaGate().create_user(
+        user = JavaGate().create_user(
             self.name,
             self.password,
             self.email,
@@ -73,5 +74,57 @@ class User(BaseSide):
             self.queue,
             self.status,
         )
+        self.user_id = user.getId()
         # TODO recover result checker
         # gateway_result_checker(result, None)
+
+    @classmethod
+    def get_user(cls, user_id) -> "User":
+        """Get User."""
+        user = JavaGate().query_user(user_id)
+        if user is None:
+            return cls("")
+        user_id = user.getId()
+        user = cls(
+            name=user.getUserName(),
+            password=user.getUserPassword(),
+            email=user.getEmail(),
+            phone=user.getPhone(),
+            tenant=user.getTenantCode(),
+            queue=user.getQueueName(),
+            status=user.getState(),
+        )
+        user.user_id = user_id
+        return user
+
+    def update(
+        self,
+        password=None,
+        email=None,
+        phone=None,
+        tenant=None,
+        queue=None,
+        status=None,
+    ) -> None:
+        """Update User."""
+        user = JavaGate().update_user(
+            self.name,
+            password,
+            email,
+            phone,
+            tenant,
+            queue,
+            status,
+        )
+        self.user_id = user.getId()
+        self.name = user.getUserName()
+        self.password = user.getUserPassword()
+        self.email = user.getEmail()
+        self.phone = user.getPhone()
+        self.queue = user.getQueueName()
+        self.status = user.getState()
+
+    def delete(self) -> None:
+        """Delete User."""
+        JavaGate().delete_user(self.name, self.user_id)
+        self.delete_all()
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/integration/conftest.py b/dolphinscheduler-python/pydolphinscheduler/tests/integration/conftest.py
index 236956b6e3..c15b89768d 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/integration/conftest.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/integration/conftest.py
@@ -42,7 +42,7 @@ def docker_setup_teardown():
             image="apache/dolphinscheduler-standalone-server:ci",
             container_name="ci-dolphinscheduler-standalone-server",
         )
-        ports = {"25333/tcp": 25333}
+        ports = {"25333/tcp": 25333, "12345/tcp": 12345}
         container = docker_wrapper.run_until_log(
             log="Started StandaloneServer in", tty=True, ports=ports
         )
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_project.py b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_project.py
new file mode 100644
index 0000000000..167ce2d8c9
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_project.py
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test pydolphinscheduler project."""
+import pytest
+
+from pydolphinscheduler.models import Project, User
+
+
+def get_user(
+    name="test-name",
+    password="test-password",
+    email="test-email@abc.com",
+    phone="17366637777",
+    tenant="test-tenant",
+    queue="test-queue",
+    status=1,
+):
+    """Get a test user."""
+    user = User(name, password, email, phone, tenant, queue, status)
+    user.create_if_not_exists()
+    return user
+
+
+def get_project(name="test-name-1", description="test-description", code=1):
+    """Get a test project."""
+    project = Project(name, description, code=code)
+    user = get_user()
+    project.create_if_not_exists(user=user.name)
+    return project
+
+
+def test_create_and_get_project():
+    """Test create and get project from java gateway."""
+    project = get_project()
+    project_ = Project.get_project_by_name(user="test-name", name=project.name)
+    assert project_.name == project.name
+    assert project_.description == project.description
+
+
+def test_update_project():
+    """Test update project from java gateway."""
+    project = get_project()
+    project = project.get_project_by_name(user="test-name", name=project.name)
+    project.update(
+        user="test-name",
+        project_code=project.code,
+        project_name="test-name-updated",
+        description="test-description-updated",
+    )
+    project_ = Project.get_project_by_name(user="test-name", name="test-name-updated")
+    assert project_.description == "test-description-updated"
+
+
+def test_delete_project():
+    """Test delete project from java gateway."""
+    project = get_project()
+    project.get_project_by_name(user="test-name", name=project.name)
+    project.delete(user="test-name")
+
+    with pytest.raises(AttributeError) as excinfo:
+        _ = project.name
+
+    assert excinfo.type == AttributeError
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_tenant.py b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_tenant.py
new file mode 100644
index 0000000000..c1ec33c335
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_tenant.py
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test pydolphinscheduler tenant."""
+import pytest
+
+from pydolphinscheduler.models import Tenant, User
+
+
+def get_user(
+    name="test-name",
+    password="test-password",
+    email="test-email@abc.com",
+    phone="17366637777",
+    tenant="test-tenant",
+    queue="test-queue",
+    status=1,
+):
+    """Get a test user."""
+    user = User(name, password, email, phone, tenant, queue, status)
+    user.create_if_not_exists()
+    return user
+
+
+def get_tenant(
+    name="test-name-1",
+    queue="test-queue-1",
+    description="test-description",
+    tenant_code="test-tenant-code",
+    user_name=None,
+):
+    """Get a test tenant."""
+    tenant = Tenant(name, queue, description, code=tenant_code, user_name=user_name)
+    tenant.create_if_not_exists(name)
+    return tenant
+
+
+def test_create_tenant():
+    """Test create tenant from java gateway."""
+    tenant = get_tenant()
+    assert tenant.tenant_id is not None
+
+
+def test_get_tenant():
+    """Test get tenant from java gateway."""
+    tenant = get_tenant()
+    tenant_ = Tenant.get_tenant(tenant.code)
+    assert tenant_.tenant_id == tenant.tenant_id
+
+
+def test_update_tenant():
+    """Test update tenant from java gateway."""
+    tenant = get_tenant(user_name="admin")
+    tenant.update(
+        user="admin",
+        code="test-code-updated",
+        queue_id=1,
+        description="test-description-updated",
+    )
+    tenant_ = Tenant.get_tenant(code=tenant.code)
+    assert tenant_.code == "test-code-updated"
+    assert tenant_.queue == 1
+
+
+def test_delete_tenant():
+    """Test delete tenant from java gateway."""
+    tenant = get_tenant(user_name="admin")
+    tenant.delete()
+    with pytest.raises(AttributeError) as excinfo:
+        _ = tenant.tenant_id
+
+    assert excinfo.type == AttributeError
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_user.py b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_user.py
new file mode 100644
index 0000000000..74248fa8c3
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_user.py
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test pydolphinscheduler user."""
+
+import hashlib
+
+import pytest
+
+from pydolphinscheduler.models import User
+
+
+def md5(str):
+    """MD5 a string."""
+    hl = hashlib.md5()
+    hl.update(str.encode(encoding="utf-8"))
+    return hl.hexdigest()
+
+
+def get_user(
+    name="test-name",
+    password="test-password",
+    email="test-email@abc.com",
+    phone="17366637777",
+    tenant="test-tenant",
+    queue="test-queue",
+    status=1,
+):
+    """Get a test user."""
+    user = User(
+        name=name,
+        password=password,
+        email=email,
+        phone=phone,
+        tenant=tenant,
+        queue=queue,
+        status=status,
+    )
+    user.create_if_not_exists()
+    return user
+
+
+def test_create_user():
+    """Test weather client could connect java gate way or not."""
+    user = User(
+        name="test-name",
+        password="test-password",
+        email="test-email@abc.com",
+        phone="17366637777",
+        tenant="test-tenant",
+        queue="test-queue",
+        status=1,
+    )
+    user.create_if_not_exists()
+    assert user.user_id is not None
+
+
+def test_get_user():
+    """Test get user from java gateway."""
+    user = get_user()
+    user_ = User.get_user(user.user_id)
+    assert user_.password == md5(user.password)
+    assert user_.email == user.email
+    assert user_.phone == user.phone
+    assert user_.status == user.status
+
+
+def test_update_user():
+    """Test update user from java gateway."""
+    user = get_user()
+    user.update(
+        password="test-password-",
+        email="test-email-updated@abc.com",
+        phone="17366637766",
+        tenant="test-tenant-updated",
+        queue="test-queue-updated",
+        status=2,
+    )
+    user_ = User.get_user(user.user_id)
+    assert user_.password == md5("test-password-")
+    assert user_.email == "test-email-updated@abc.com"
+    assert user_.phone == "17366637766"
+    assert user_.status == 2
+
+
+def test_delete_user():
+    """Test delete user from java gateway."""
+    user = get_user()
+    user.delete()
+    with pytest.raises(AttributeError) as excinfo:
+        _ = user.user_id
+
+    assert excinfo.type == AttributeError


[dolphinscheduler] 04/05: [chore][python] Change name from process definition to workflow (#12918)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch 3.1.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 6ed16056808bca20ef562363de93193d4885df88
Author: Jay Chung <zh...@gmail.com>
AuthorDate: Wed Nov 16 21:23:39 2022 +0800

    [chore][python] Change name from process definition to workflow (#12918)
    
    only change its name in python gateway server code, incluing
    
    * Function name: all related to process definition
    * Parameter name and comment related
    
    ref: apache/dolphinscheduler-sdk-python#22
    
    (cherry picked from commit f20c9b3102503a1306d5fa3504ddce56a76d58ab)
---
 .../dolphinscheduler/api/python/PythonGateway.java | 170 +++++++++++----------
 1 file changed, 90 insertions(+), 80 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index b79eaf307e..7fb225268c 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -82,7 +82,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
-import py4j.GatewayServer;
 
 @Component
 public class PythonGateway {
@@ -183,8 +182,10 @@ public class PythonGateway {
             return result;
         }
 
-        ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName);
-        // In the case project exists, but current process definition still not created, we should also return the init version of it
+        ProcessDefinition processDefinition =
+                processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName);
+        // In the case project exists, but current workflow still not created, we should also return the init
+        // version of it
         if (processDefinition == null) {
             result.put("code", CodeGenerateUtils.getInstance().genCode());
             result.put("version", 0L);
@@ -203,20 +204,20 @@ public class PythonGateway {
     }
 
     /**
-     * create or update process definition.
-     * If process definition do not exists in Project=`projectCode` would create a new one
-     * If process definition already exists in Project=`projectCode` would update it
+     * create or update workflow.
+     * If workflow do not exists in Project=`projectCode` would create a new one
+     * If workflow already exists in Project=`projectCode` would update it
      *
-     * @param userName user name who create or update process definition
-     * @param projectName project name which process definition belongs to
-     * @param name process definition name
+     * @param userName user name who create or update workflow
+     * @param projectName project name which workflow belongs to
+     * @param name workflow name
      * @param description description
      * @param globalParams global params
-     * @param schedule schedule for process definition, will not set schedule if null,
+     * @param schedule schedule for workflow, will not set schedule if null,
      * and if would always fresh exists schedule if not null
      * @param warningType warning type
      * @param warningGroupId warning group id
-     * @param timeout timeout for process definition working, if running time longer than timeout,
+     * @param timeout timeout for workflow working, if running time longer than timeout,
      * task will mark as fail
      * @param workerGroup run task in which worker group
      * @param tenantCode tenantCode
@@ -225,33 +226,33 @@ public class PythonGateway {
      * @param otherParamsJson otherParamsJson handle other params
      * @return create result code
      */
-    public Long createOrUpdateProcessDefinition(String userName,
-                                                String projectName,
-                                                String name,
-                                                String description,
-                                                String globalParams,
-                                                String schedule,
-                                                String warningType,
-                                                int warningGroupId,
-                                                int timeout,
-                                                String workerGroup,
-                                                String tenantCode,
-                                                int releaseState,
-                                                String taskRelationJson,
-                                                String taskDefinitionJson,
-                                                String otherParamsJson,
-                                                String executionType) {
+    public Long createOrUpdateWorkflow(String userName,
+                                       String projectName,
+                                       String name,
+                                       String description,
+                                       String globalParams,
+                                       String schedule,
+                                       String warningType,
+                                       int warningGroupId,
+                                       int timeout,
+                                       String workerGroup,
+                                       String tenantCode,
+                                       int releaseState,
+                                       String taskRelationJson,
+                                       String taskDefinitionJson,
+                                       String otherParamsJson,
+                                       String executionType) {
         User user = usersService.queryUser(userName);
         Project project = projectMapper.queryByName(projectName);
         long projectCode = project.getCode();
 
-        ProcessDefinition processDefinition = getProcessDefinition(user, projectCode, name);
+        ProcessDefinition processDefinition = getWorkflow(user, projectCode, name);
         ProcessExecutionTypeEnum executionTypeEnum = ProcessExecutionTypeEnum.valueOf(executionType);
         long processDefinitionCode;
-        // create or update process definition
+        // create or update workflow
         if (processDefinition != null) {
             processDefinitionCode = processDefinition.getCode();
-            // make sure process definition offline which could edit
+            // make sure workflow offline which could edit
             processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode,
                     ReleaseState.OFFLINE);
             processDefinitionService.updateProcessDefinition(user, projectCode, name,
@@ -267,7 +268,7 @@ public class PythonGateway {
             processDefinitionCode = processDefinition.getCode();
         }
 
-        // Fresh process definition schedule
+        // Fresh workflow schedule
         if (schedule != null) {
             createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup, warningType, warningGroupId);
         }
@@ -276,21 +277,23 @@ public class PythonGateway {
     }
 
     /**
-     * get process definition
+     * get workflow
      *
      * @param user user who create or update schedule
-     * @param projectCode project which process definition belongs to
-     * @param processDefinitionName process definition name
+     * @param projectCode project which workflow belongs to
+     * @param workflowName workflow name
      */
-    private ProcessDefinition getProcessDefinition(User user, long projectCode, String processDefinitionName) {
-        Map<String, Object> verifyProcessDefinitionExists = processDefinitionService.verifyProcessDefinitionName(user, projectCode, processDefinitionName, 0);
+    private ProcessDefinition getWorkflow(User user, long projectCode, String workflowName) {
+        Map<String, Object> verifyProcessDefinitionExists =
+                processDefinitionService.verifyProcessDefinitionName(user, projectCode, workflowName, 0);
         Status verifyStatus = (Status) verifyProcessDefinitionExists.get(Constants.STATUS);
 
         ProcessDefinition processDefinition = null;
         if (verifyStatus == Status.PROCESS_DEFINITION_NAME_EXIST) {
-            processDefinition = processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName);
+            processDefinition = processDefinitionMapper.queryByDefineName(projectCode, workflowName);
         } else if (verifyStatus != Status.SUCCESS) {
-            String msg = "Verify process definition exists status is invalid, neither SUCCESS or PROCESS_DEFINITION_NAME_EXIST.";
+            String msg =
+                    "Verify workflow exists status is invalid, neither SUCCESS or WORKFLOW_NAME_EXIST.";
             logger.error(msg);
             throw new RuntimeException(msg);
         }
@@ -299,13 +302,13 @@ public class PythonGateway {
     }
 
     /**
-     * create or update process definition schedule.
+     * create or update workflow schedule.
      * It would always use latest schedule define in workflow-as-code, and set schedule online when
      * it's not null
      *
      * @param user user who create or update schedule
-     * @param projectCode project which process definition belongs to
-     * @param processDefinitionCode process definition code
+     * @param projectCode project which workflow belongs to
+     * @param workflowCode workflow code
      * @param schedule schedule expression
      * @param workerGroup work group
      * @param warningType warning type
@@ -313,43 +316,47 @@ public class PythonGateway {
      */
     private void createOrUpdateSchedule(User user,
                                         long projectCode,
-                                        long processDefinitionCode,
+                                        long workflowCode,
                                         String schedule,
                                         String workerGroup,
                                         String warningType,
                                         int warningGroupId) {
-        Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode);
+        Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(workflowCode);
         // create or update schedule
         int scheduleId;
         if (scheduleObj == null) {
-            processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE);
-            Map<String, Object> result = schedulerService.insertSchedule(user, projectCode, processDefinitionCode, schedule, WarningType.valueOf(warningType),
+            processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode,
+                    ReleaseState.ONLINE);
+            Map<String, Object> result = schedulerService.insertSchedule(user, projectCode, workflowCode,
+                    schedule, WarningType.valueOf(warningType),
                     warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
             scheduleId = (int) result.get("scheduleId");
         } else {
             scheduleId = scheduleObj.getId();
-            processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE);
+            processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode,
+                    ReleaseState.OFFLINE);
             schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, WarningType.valueOf(warningType),
                     warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
         }
         schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE);
     }
 
-    public void execProcessInstance(String userName,
-                                    String projectName,
-                                    String processDefinitionName,
-                                    String cronTime,
-                                    String workerGroup,
-                                    String warningType,
-                                    int warningGroupId,
-                                    Integer timeout
-    ) {
+    public void execWorkflowInstance(String userName,
+                                     String projectName,
+                                     String workflowName,
+                                     String cronTime,
+                                     String workerGroup,
+                                     String warningType,
+                                     Integer warningGroupId,
+                                     Integer timeout) {
         User user = usersService.queryUser(userName);
         Project project = projectMapper.queryByName(projectName);
-        ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName);
+        ProcessDefinition processDefinition =
+                processDefinitionMapper.queryByDefineName(project.getCode(), workflowName);
 
-        // make sure process definition online
-        processDefinitionService.releaseProcessDefinition(user, project.getCode(), processDefinition.getCode(), ReleaseState.ONLINE);
+        // make sure workflow online
+        processDefinitionService.releaseProcessDefinition(user, project.getCode(), processDefinition.getCode(),
+                ReleaseState.ONLINE);
 
         executorService.execProcessInstance(user,
                 project.getCode(),
@@ -375,8 +382,8 @@ public class PythonGateway {
 
     // side object
     /*
-      Grant project's permission to user. Use when project's created user not current but
-      Python API use it to change process definition.
+     * Grant project's permission to user. Use when project's created user not current but Python API use it to change
+     * workflow.
      */
     private Integer grantProjectToUser(Project project, User user) {
         Date now = new Date();
@@ -492,29 +499,31 @@ public class PythonGateway {
     }
 
     /**
-     * Get processDefinition by given processDefinitionName name. It return map contain processDefinition id, name, code.
-     * Useful in Python API create subProcess task which need processDefinition information.
+     * Get workflow object by given workflow name. It returns map contain workflow id, name, code.
+     * Useful in Python API create subProcess task which need workflow information.
      *
      * @param userName user who create or update schedule
-     * @param projectName project name which process definition belongs to
-     * @param processDefinitionName process definition name
+     * @param projectName project name which workflow belongs to
+     * @param workflowName workflow name
      */
-    public Map<String, Object> getProcessDefinitionInfo(String userName, String projectName, String processDefinitionName) {
+    public Map<String, Object> getWorkflowInfo(String userName, String projectName,
+                                               String workflowName) {
         Map<String, Object> result = new HashMap<>();
 
         User user = usersService.queryUser(userName);
         Project project = (Project) projectService.queryByName(user, projectName).get(Constants.DATA_LIST);
         long projectCode = project.getCode();
-        ProcessDefinition processDefinition = getProcessDefinition(user, projectCode, processDefinitionName);
-        // get process definition info
+        ProcessDefinition processDefinition = getWorkflow(user, projectCode, workflowName);
+        // get workflow info
         if (processDefinition != null) {
-            // make sure process definition online
-            processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinition.getCode(), ReleaseState.ONLINE);
+            // make sure workflow online
+            processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinition.getCode(),
+                    ReleaseState.ONLINE);
             result.put("id", processDefinition.getId());
             result.put("name", processDefinition.getName());
             result.put("code", processDefinition.getCode());
         } else {
-            String msg = String.format("Can not find valid process definition by name %s", processDefinitionName);
+            String msg = String.format("Can not find valid workflow by name %s", workflowName);
             logger.error(msg);
             throw new IllegalArgumentException(msg);
         }
@@ -523,14 +532,14 @@ public class PythonGateway {
     }
 
     /**
-     * Get project, process definition, task code.
-     * Useful in Python API create dependent task which need processDefinition information.
+     * Get project, workflow, task code.
+     * Useful in Python API create dependent task which need workflow information.
      *
-     * @param projectName project name which process definition belongs to
-     * @param processDefinitionName process definition name
+     * @param projectName project name which workflow belongs to
+     * @param workflowName workflow name
      * @param taskName task name
      */
-    public Map<String, Object> getDependentInfo(String projectName, String processDefinitionName, String taskName) {
+    public Map<String, Object> getDependentInfo(String projectName, String workflowName, String taskName) {
         Map<String, Object> result = new HashMap<>();
 
         Project project = projectMapper.queryByName(projectName);
@@ -542,9 +551,10 @@ public class PythonGateway {
         long projectCode = project.getCode();
         result.put("projectCode", projectCode);
 
-        ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName);
+        ProcessDefinition processDefinition =
+                processDefinitionMapper.queryByDefineName(projectCode, workflowName);
         if (processDefinition == null) {
-            String msg = String.format("Can not find valid process definition by name %s", processDefinitionName);
+            String msg = String.format("Can not find valid workflow by name %s", workflowName);
             logger.error(msg);
             throw new IllegalArgumentException(msg);
         }
@@ -558,8 +568,8 @@ public class PythonGateway {
     }
 
     /**
-     * Get resource by given program type and full name. It return map contain resource id, name.
-     * Useful in Python API create flink or spark task which need processDefinition information.
+     * Get resource by given program type and full name. It returns map contain resource id, name.
+     * Useful in Python API create flink or spark task which need workflow information.
      *
      * @param programType program type one of SCALA, JAVA and PYTHON
      * @param fullName full name of the resource
@@ -602,7 +612,7 @@ public class PythonGateway {
 
     /**
      * Get resource by given resource type and full name. It return map contain resource id, name.
-     * Useful in Python API create task which need processDefinition information.
+     * Useful in Python API create task which need workflow information.
      *
      * @param userName user who query resource
      * @param fullName full name of the resource


[dolphinscheduler] 03/05: [fix] Add token as authentication for python gateway (#12893)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch 3.1.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 416c41465dd490bd0546ac79f2c993004482ffe0
Author: Jay Chung <zh...@gmail.com>
AuthorDate: Mon Nov 14 18:43:08 2022 +0800

    [fix] Add token as authentication for python gateway (#12893)
    
    separate from #6407. Authentication, add secret to ensure only trusted people could connect to gateway.
    
    fix: #8255
    
    (cherry picked from commit 6d8befa0752c1e8005651c7b57b2301c7b9606fc)
---
 .../configuration/PythonGatewayConfiguration.java  | 68 +++-------------------
 .../dolphinscheduler/api/python/PythonGateway.java | 56 +++++++++---------
 .../src/main/resources/application.yaml            |  3 +
 .../src/main/resources/application.yaml            |  3 +
 4 files changed, 43 insertions(+), 87 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/PythonGatewayConfiguration.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/PythonGatewayConfiguration.java
index 5735e27fd2..8a3a2e521c 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/PythonGatewayConfiguration.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/PythonGatewayConfiguration.java
@@ -17,13 +17,14 @@
 
 package org.apache.dolphinscheduler.api.configuration;
 
+import lombok.Data;
+
 import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.stereotype.Component;
+import org.springframework.context.annotation.Configuration;
 
-@Component
-@EnableConfigurationProperties
-@ConfigurationProperties(value = "python-gateway", ignoreUnknownFields = false)
+@Data
+@Configuration
+@ConfigurationProperties(value = "python-gateway")
 public class PythonGatewayConfiguration {
     private boolean enabled;
     private String gatewayServerAddress;
@@ -32,60 +33,5 @@ public class PythonGatewayConfiguration {
     private int pythonPort;
     private int connectTimeout;
     private int readTimeout;
-
-    public boolean getEnabled() {
-        return enabled;
-    }
-
-    public void setEnabled(boolean enabled) {
-        this.enabled = enabled;
-    }
-
-    public String getGatewayServerAddress() {
-        return gatewayServerAddress;
-    }
-
-    public void setGatewayServerAddress(String gatewayServerAddress) {
-        this.gatewayServerAddress = gatewayServerAddress;
-    }
-
-    public int getGatewayServerPort() {
-        return gatewayServerPort;
-    }
-
-    public void setGatewayServerPort(int gatewayServerPort) {
-        this.gatewayServerPort = gatewayServerPort;
-    }
-
-    public String getPythonAddress() {
-        return pythonAddress;
-    }
-
-    public void setPythonAddress(String pythonAddress) {
-        this.pythonAddress = pythonAddress;
-    }
-
-    public int getPythonPort() {
-        return pythonPort;
-    }
-
-    public void setPythonPort(int pythonPort) {
-        this.pythonPort = pythonPort;
-    }
-
-    public int getConnectTimeout() {
-        return connectTimeout;
-    }
-
-    public void setConnectTimeout(int connectTimeout) {
-        this.connectTimeout = connectTimeout;
-    }
-
-    public int getReadTimeout() {
-        return readTimeout;
-    }
-
-    public void setReadTimeout(int readTimeout) {
-        this.readTimeout = readTimeout;
-    }
+    private String authToken;
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index d9f0c78674..b79eaf307e 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -17,18 +17,6 @@
 
 package org.apache.dolphinscheduler.api.python;
 
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import javax.annotation.PostConstruct;
-
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.dolphinscheduler.api.configuration.PythonGatewayConfiguration;
 import org.apache.dolphinscheduler.api.dto.EnvironmentDto;
 import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
@@ -72,6 +60,24 @@ import org.apache.dolphinscheduler.dao.mapper.ProjectUserMapper;
 import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
+
+import py4j.GatewayServer;
+import py4j.GatewayServer.GatewayServerBuilder;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import javax.annotation.PostConstruct;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -626,29 +632,27 @@ public class PythonGateway {
 
     @PostConstruct
     public void init() {
-        if (pythonGatewayConfiguration.getEnabled()) {
+        if (pythonGatewayConfiguration.isEnabled()) {
             this.start();
         }
     }
 
     private void start() {
-        GatewayServer server;
         try {
             InetAddress gatewayHost = InetAddress.getByName(pythonGatewayConfiguration.getGatewayServerAddress());
-            InetAddress pythonHost = InetAddress.getByName(pythonGatewayConfiguration.getPythonAddress());
-            server = new GatewayServer(
-                this,
-                pythonGatewayConfiguration.getGatewayServerPort(),
-                pythonGatewayConfiguration.getPythonPort(),
-                gatewayHost,
-                pythonHost,
-                pythonGatewayConfiguration.getConnectTimeout(),
-                pythonGatewayConfiguration.getReadTimeout(),
-                null
-            );
+            GatewayServerBuilder serverBuilder = new GatewayServer.GatewayServerBuilder()
+                    .entryPoint(this)
+                    .javaAddress(gatewayHost)
+                    .javaPort(pythonGatewayConfiguration.getGatewayServerPort())
+                    .connectTimeout(pythonGatewayConfiguration.getConnectTimeout())
+                    .readTimeout(pythonGatewayConfiguration.getReadTimeout());
+            if (!StringUtils.isEmpty(pythonGatewayConfiguration.getAuthToken())) {
+                serverBuilder.authToken(pythonGatewayConfiguration.getAuthToken());
+            }
+
             GatewayServer.turnLoggingOn();
             logger.info("PythonGatewayService started on: " + gatewayHost.toString());
-            server.start();
+            serverBuilder.build().start();
         } catch (UnknownHostException e) {
             logger.error("exception occurred while constructing PythonGatewayService().", e);
         }
diff --git a/dolphinscheduler-api/src/main/resources/application.yaml b/dolphinscheduler-api/src/main/resources/application.yaml
index 9a8381454b..2b51881e07 100644
--- a/dolphinscheduler-api/src/main/resources/application.yaml
+++ b/dolphinscheduler-api/src/main/resources/application.yaml
@@ -121,6 +121,9 @@ metrics:
 python-gateway:
   # Weather enable python gateway server or not. The default value is true.
   enabled: true
+  # Authentication token for connection from python api to python gateway server. Should be changed the default value
+  # when you deploy in public network.
+  auth-token: jwUDzpLsNKEFER4*a8gruBH_GsAurNxU7A@Xc
   # The address of Python gateway server start. Set its value to `0.0.0.0` if your Python API run in different
   # between Python gateway server. It could be be specific to other address like `127.0.0.1` or `localhost`
   gateway-server-address: 0.0.0.0
diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
index a97d03e34c..66c8abd70d 100644
--- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml
+++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
@@ -187,6 +187,9 @@ alert:
 python-gateway:
   # Weather enable python gateway server or not. The default value is true.
   enabled: true
+  # Authentication token for connection from python api to python gateway server. Should be changed the default value
+  # when you deploy in public network.
+  auth-token: jwUDzpLsNKEFER4*a8gruBH_GsAurNxU7A@Xc
   # The address of Python gateway server start. Set its value to `0.0.0.0` if your Python API run in different
   # between Python gateway server. It could be be specific to other address like `127.0.0.1` or `localhost`
   gateway-server-address: 0.0.0.0