You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/08/08 08:25:33 UTC

[dolphinscheduler] branch dev updated: [Python] Combine gateway.entry_point call in python api side (#11330)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3061bbc5c4 [Python] Combine gateway.entry_point call in python api side (#11330)
3061bbc5c4 is described below

commit 3061bbc5c419ccecf9b41a7ac7ee66bbcfa7cc46
Author: 陈家名 <13...@163.com>
AuthorDate: Mon Aug 8 16:25:27 2022 +0800

    [Python] Combine gateway.entry_point call in python api side (#11330)
---
 .../src/pydolphinscheduler/core/database.py        |   5 +-
 .../src/pydolphinscheduler/core/engine.py          |   5 +-
 .../pydolphinscheduler/core/process_definition.py  |  10 +-
 .../src/pydolphinscheduler/core/resource.py        |  10 +-
 .../src/pydolphinscheduler/core/task.py            |   5 +-
 .../src/pydolphinscheduler/java_gateway.py         | 156 +++++++++++++++++++++
 .../src/pydolphinscheduler/models/project.py       |   5 +-
 .../src/pydolphinscheduler/models/queue.py         |   8 --
 .../src/pydolphinscheduler/models/tenant.py        |   5 +-
 .../src/pydolphinscheduler/models/user.py          |   5 +-
 .../src/pydolphinscheduler/tasks/dependent.py      |   5 +-
 .../src/pydolphinscheduler/tasks/sub_process.py    |   5 +-
 12 files changed, 180 insertions(+), 44 deletions(-)

diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/database.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/database.py
index b6602a6483..4a93f22f3f 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/database.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/database.py
@@ -22,7 +22,7 @@ from typing import Dict
 from py4j.protocol import Py4JJavaError
 
 from pydolphinscheduler.exceptions import PyDSParamException
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 
 
 class Database(dict):
@@ -54,9 +54,8 @@ class Database(dict):
         if self._database:
             return self._database
         else:
-            gateway = launch_gateway()
             try:
-                self._database = gateway.entry_point.getDatasourceInfo(name)
+                self._database = JavaGate().get_datasource_info(name)
             # Handler database source do not exists error, for now we just terminate the process.
             except Py4JJavaError as ex:
                 raise PyDSParamException(str(ex.java_exception))
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/engine.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/engine.py
index df84b5ba90..41021ed474 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/engine.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/engine.py
@@ -23,7 +23,7 @@ from py4j.protocol import Py4JJavaError
 
 from pydolphinscheduler.core.task import Task
 from pydolphinscheduler.exceptions import PyDSParamException
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 
 
 class ProgramType(str):
@@ -62,9 +62,8 @@ class Engine(Task):
         if self._resource:
             return self._resource
         else:
-            gateway = launch_gateway()
             try:
-                self._resource = gateway.entry_point.getResourcesFileInfo(
+                self._resource = JavaGate().get_resources_file_info(
                     program_type, main_package
                 )
             # Handler source do not exists error, for now we just terminate the process.
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
index 1740beafdf..980b18e96d 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
@@ -25,7 +25,7 @@ from pydolphinscheduler import configuration
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.resource import Resource
 from pydolphinscheduler.exceptions import PyDSParamException, PyDSTaskNoFoundException
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 from pydolphinscheduler.models import Base, Project, Tenant, User
 from pydolphinscheduler.utils.date import MAX_DATETIME, conv_from_str, conv_to_schedule
 
@@ -392,14 +392,12 @@ class ProcessDefinition(Base):
         self._ensure_side_model_exists()
         self._pre_submit_check()
 
-        gateway = launch_gateway()
-        self._process_definition_code = gateway.entry_point.createOrUpdateProcessDefinition(
+        self._process_definition_code = JavaGate().create_or_update_process_definition(
             self._user,
             self._project,
             self.name,
             str(self.description) if self.description else "",
             json.dumps(self.param_json),
-            json.dumps(self.schedule_json) if self.schedule_json else None,
             self.warning_type,
             self.warning_group_id,
             json.dumps(self.task_location),
@@ -410,6 +408,7 @@ class ProcessDefinition(Base):
             # TODO add serialization function
             json.dumps(self.task_relation_json),
             json.dumps(self.task_definition_json),
+            json.dumps(self.schedule_json) if self.schedule_json else None,
             None,
             None,
         )
@@ -424,8 +423,7 @@ class ProcessDefinition(Base):
 
         which post to `start-process-instance` to java gateway
         """
-        gateway = launch_gateway()
-        gateway.entry_point.execProcessInstance(
+        JavaGate().exec_process_instance(
             self._user,
             self._project,
             self.name,
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource.py
index 8015a72e39..ea811915e2 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource.py
@@ -20,7 +20,7 @@
 from typing import Optional
 
 from pydolphinscheduler.exceptions import PyDSParamException
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 from pydolphinscheduler.models import Base
 
 
@@ -53,8 +53,7 @@ class Resource(Base):
             raise PyDSParamException(
                 "`user_name` is required when querying resources from python gate."
             )
-        gateway = launch_gateway()
-        return gateway.entry_point.queryResourcesFileInfo(self.user_name, self.name)
+        return JavaGate().query_resources_file_info(self.user_name, self.name)
 
     def get_id_from_database(self):
         """Get resource id from java gateway."""
@@ -66,10 +65,9 @@ class Resource(Base):
             raise PyDSParamException(
                 "`user_name` and `content` are required when create or update resource from python gate."
             )
-        gateway = launch_gateway()
-        gateway.entry_point.createOrUpdateResource(
+        JavaGate().create_or_update_resource(
             self.user_name,
             self.name,
-            self.description,
             self.content,
+            self.description,
         )
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
index b866ea221f..93c5f28342 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
@@ -34,7 +34,7 @@ from pydolphinscheduler.core.process_definition import (
 )
 from pydolphinscheduler.core.resource import Resource
 from pydolphinscheduler.exceptions import PyDSParamException
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 from pydolphinscheduler.models import Base
 
 logger = getLogger(__name__)
@@ -300,8 +300,7 @@ class Task(Base):
         equal to 0 by java gateway, otherwise if will return the exists code and version.
         """
         # TODO get code from specific project process definition and task name
-        gateway = launch_gateway()
-        result = gateway.entry_point.getCodeAndVersion(
+        result = JavaGate().get_code_and_version(
             self.process_definition._project, self.process_definition.name, self.name
         )
         # result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
index 7b85902f9a..a460a466bb 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
@@ -63,3 +63,159 @@ def gateway_result_checker(
     ):
         raise PyDSJavaGatewayException("Get result state not success.")
     return result
+
+
+class JavaGate:
+    """Launch a java gateway and forward calls to its ``entry_point``."""
+
+    def __init__(
+        self,
+        address: Optional[str] = None,
+        port: Optional[int] = None,
+        auto_convert: Optional[bool] = True,
+    ):
+        self.java_gateway = launch_gateway(address, port, auto_convert)
+
+    def get_datasource_info(self, name: str):
+        """Get datasource info through java gateway."""
+        return self.java_gateway.entry_point.getDatasourceInfo(name)
+
+    def get_resources_file_info(self, program_type: str, main_package: str):
+        """Get resources file info through java gateway."""
+        return self.java_gateway.entry_point.getResourcesFileInfo(
+            program_type, main_package
+        )
+
+    def create_or_update_resource(
+        self, user_name: str, name: str, content: str, description: Optional[str] = None
+    ):
+        """Create or update resource; java receives description before content."""
+        return self.java_gateway.entry_point.createOrUpdateResource(
+            user_name, name, description, content
+        )
+
+    def query_resources_file_info(self, user_name: str, name: str):
+        """Query resource file info by user name and resource name via java gateway."""
+        return self.java_gateway.entry_point.queryResourcesFileInfo(user_name, name)
+
+    def get_code_and_version(
+        self, project_name: str, process_definition_name: str, task_name: str
+    ):
+        """Get code and version through java gateway."""
+        return self.java_gateway.entry_point.getCodeAndVersion(
+            project_name, process_definition_name, task_name
+        )
+
+    def create_or_grant_project(
+        self, user: str, name: str, description: Optional[str] = None
+    ):
+        """Create or grant project through java gateway."""
+        return self.java_gateway.entry_point.createOrGrantProject(
+            user, name, description
+        )
+
+    def create_tenant(
+        self, tenant_name: str, queue_name: str, description: Optional[str] = None
+    ):
+        """Create tenant; java receives description before queue_name."""
+        return self.java_gateway.entry_point.createTenant(
+            tenant_name, description, queue_name
+        )
+
+    def create_user(
+        self,
+        name: str,
+        password: str,
+        email: str,
+        phone: str,
+        tenant: str,
+        queue: str,
+        status: int,
+    ):
+        """Create user through java gateway."""
+        return self.java_gateway.entry_point.createUser(
+            name, password, email, phone, tenant, queue, status
+        )
+
+    def get_dependent_info(
+        self,
+        project_name: str,
+        process_definition_name: str,
+        task_name: Optional[str] = None,
+    ):
+        """Get dependent info through java gateway."""
+        return self.java_gateway.entry_point.getDependentInfo(
+            project_name, process_definition_name, task_name
+        )
+
+    def get_process_definition_info(
+        self, user_name: str, project_name: str, process_definition_name: str
+    ):
+        """Get process definition info through java gateway."""
+        return self.java_gateway.entry_point.getProcessDefinitionInfo(
+            user_name, project_name, process_definition_name
+        )
+
+    def create_or_update_process_definition(
+        self,
+        user_name: str,
+        project_name: str,
+        name: str,
+        description: str,
+        global_params: str,
+        warning_type: str,
+        warning_group_id: int,
+        locations: str,
+        timeout: int,
+        worker_group: str,
+        tenant_code: str,
+        release_state: int,
+        task_relation_json: str,
+        task_definition_json: str,
+        schedule: Optional[str] = None,
+        other_params_json: Optional[str] = None,
+        execution_type: Optional[str] = None,
+    ):
+        """Create or update process definition; schedule is java's 6th argument."""
+        return self.java_gateway.entry_point.createOrUpdateProcessDefinition(
+            user_name,
+            project_name,
+            name,
+            description,
+            global_params,
+            schedule,
+            warning_type,
+            warning_group_id,
+            locations,
+            timeout,
+            worker_group,
+            tenant_code,
+            release_state,
+            task_relation_json,
+            task_definition_json,
+            other_params_json,
+            execution_type,
+        )
+
+    def exec_process_instance(
+        self,
+        user_name: str,
+        project_name: str,
+        process_definition_name: str,
+        cron_time: str,
+        worker_group: str,
+        warning_type: str,
+        warning_group_id: int,
+        timeout: int,
+    ):
+        """Execute process instance through java gateway."""
+        return self.java_gateway.entry_point.execProcessInstance(
+            user_name,
+            project_name,
+            process_definition_name,
+            cron_time,
+            worker_group,
+            warning_type,
+            warning_group_id,
+            timeout,
+        )
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/project.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/project.py
index ad7221108a..bebdafd67e 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/project.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/project.py
@@ -20,7 +20,7 @@
 from typing import Optional
 
 from pydolphinscheduler import configuration
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 from pydolphinscheduler.models import BaseSide
 
 
@@ -36,7 +36,6 @@ class Project(BaseSide):
 
     def create_if_not_exists(self, user=configuration.USER_NAME) -> None:
         """Create Project if not exists."""
-        gateway = launch_gateway()
-        gateway.entry_point.createOrGrantProject(user, self.name, self.description)
+        JavaGate().create_or_grant_project(user, self.name, self.description)
         # TODO recover result checker
         # gateway_result_checker(result, None)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/queue.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/queue.py
index 3f8f81d126..e6da2594c8 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/queue.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/queue.py
@@ -20,7 +20,6 @@
 from typing import Optional
 
 from pydolphinscheduler import configuration
-from pydolphinscheduler.java_gateway import gateway_result_checker, launch_gateway
 from pydolphinscheduler.models import BaseSide
 
 
@@ -33,10 +32,3 @@ class Queue(BaseSide):
         description: Optional[str] = "",
     ):
         super().__init__(name, description)
-
-    def create_if_not_exists(self, user=configuration.USER_NAME) -> None:
-        """Create Queue if not exists."""
-        gateway = launch_gateway()
-        # Here we set Queue.name and Queue.queueName same as self.name
-        result = gateway.entry_point.createProject(user, self.name, self.name)
-        gateway_result_checker(result, None)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/tenant.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/tenant.py
index 148a8f6521..6641d9aef7 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/tenant.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/tenant.py
@@ -20,7 +20,7 @@
 from typing import Optional
 
 from pydolphinscheduler import configuration
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 from pydolphinscheduler.models import BaseSide
 
 
@@ -40,6 +40,5 @@ class Tenant(BaseSide):
         self, queue_name: str, user=configuration.USER_NAME
     ) -> None:
         """Create Tenant if not exists."""
-        gateway = launch_gateway()
-        gateway.entry_point.createTenant(self.name, self.description, queue_name)
+        JavaGate().create_tenant(self.name, queue_name, self.description)
         # gateway_result_checker(result, None)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/user.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/user.py
index de2d8b159a..e11bb9ca0d 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/user.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/user.py
@@ -20,7 +20,7 @@
 from typing import Optional
 
 from pydolphinscheduler import configuration
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 from pydolphinscheduler.models import BaseSide, Tenant
 
 
@@ -64,8 +64,7 @@ class User(BaseSide):
         """Create User if not exists."""
         # Should make sure queue already exists.
         self.create_tenant_if_not_exists()
-        gateway = launch_gateway()
-        gateway.entry_point.createUser(
+        JavaGate().create_user(
             self.name,
             self.password,
             self.email,
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/dependent.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/dependent.py
index cc6d25b0a2..12ec6ba91d 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/dependent.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/dependent.py
@@ -22,7 +22,7 @@ from typing import Dict, Optional, Tuple
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.task import Task
 from pydolphinscheduler.exceptions import PyDSJavaGatewayException, PyDSParamException
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 from pydolphinscheduler.models.base import Base
 
 DEPENDENT_ALL_TASK_IN_WORKFLOW = "0"
@@ -165,9 +165,8 @@ class DependentItem(Base):
         if self._code:
             return self._code
         else:
-            gateway = launch_gateway()
             try:
-                self._code = gateway.entry_point.getDependentInfo(*self.code_parameter)
+                self._code = JavaGate().get_dependent_info(*self.code_parameter)
                 return self._code
             except Exception:
                 raise PyDSJavaGatewayException("Function get_code_from_gateway error.")
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py
index 8ba6b4c64d..c7a9f8bd11 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py
@@ -22,7 +22,7 @@ from typing import Dict
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.task import Task
 from pydolphinscheduler.exceptions import PyDSProcessDefinitionNotAssignException
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 
 
 class SubProcess(Task):
@@ -47,8 +47,7 @@ class SubProcess(Task):
             raise PyDSProcessDefinitionNotAssignException(
                 "ProcessDefinition must be provider for task SubProcess."
             )
-        gateway = launch_gateway()
-        return gateway.entry_point.getProcessDefinitionInfo(
+        return JavaGate().get_process_definition_info(
             self.process_definition.user.name,
             self.process_definition.project.name,
             process_definition_name,