You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/08/08 08:25:33 UTC
[dolphinscheduler] branch dev updated: [Python] Combine gateway.entry_point call in python api side (#11330)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 3061bbc5c4 [Python] Combine gateway.entry_point call in python api side (#11330)
3061bbc5c4 is described below
commit 3061bbc5c419ccecf9b41a7ac7ee66bbcfa7cc46
Author: 陈家名 <13...@163.com>
AuthorDate: Mon Aug 8 16:25:27 2022 +0800
[Python] Combine gateway.entry_point call in python api side (#11330)
---
.../src/pydolphinscheduler/core/database.py | 5 +-
.../src/pydolphinscheduler/core/engine.py | 5 +-
.../pydolphinscheduler/core/process_definition.py | 10 +-
.../src/pydolphinscheduler/core/resource.py | 10 +-
.../src/pydolphinscheduler/core/task.py | 5 +-
.../src/pydolphinscheduler/java_gateway.py | 156 +++++++++++++++++++++
.../src/pydolphinscheduler/models/project.py | 5 +-
.../src/pydolphinscheduler/models/queue.py | 8 --
.../src/pydolphinscheduler/models/tenant.py | 5 +-
.../src/pydolphinscheduler/models/user.py | 5 +-
.../src/pydolphinscheduler/tasks/dependent.py | 5 +-
.../src/pydolphinscheduler/tasks/sub_process.py | 5 +-
12 files changed, 180 insertions(+), 44 deletions(-)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/database.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/database.py
index b6602a6483..4a93f22f3f 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/database.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/database.py
@@ -22,7 +22,7 @@ from typing import Dict
from py4j.protocol import Py4JJavaError
from pydolphinscheduler.exceptions import PyDSParamException
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
class Database(dict):
@@ -54,9 +54,8 @@ class Database(dict):
if self._database:
return self._database
else:
- gateway = launch_gateway()
try:
- self._database = gateway.entry_point.getDatasourceInfo(name)
+ self._database = JavaGate().get_datasource_info(name)
# Handler database source do not exists error, for now we just terminate the process.
except Py4JJavaError as ex:
raise PyDSParamException(str(ex.java_exception))
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/engine.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/engine.py
index df84b5ba90..41021ed474 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/engine.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/engine.py
@@ -23,7 +23,7 @@ from py4j.protocol import Py4JJavaError
from pydolphinscheduler.core.task import Task
from pydolphinscheduler.exceptions import PyDSParamException
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
class ProgramType(str):
@@ -62,9 +62,8 @@ class Engine(Task):
if self._resource:
return self._resource
else:
- gateway = launch_gateway()
try:
- self._resource = gateway.entry_point.getResourcesFileInfo(
+ self._resource = JavaGate().get_resources_file_info(
program_type, main_package
)
# Handler source do not exists error, for now we just terminate the process.
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
index 1740beafdf..980b18e96d 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
@@ -25,7 +25,7 @@ from pydolphinscheduler import configuration
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.resource import Resource
from pydolphinscheduler.exceptions import PyDSParamException, PyDSTaskNoFoundException
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
from pydolphinscheduler.models import Base, Project, Tenant, User
from pydolphinscheduler.utils.date import MAX_DATETIME, conv_from_str, conv_to_schedule
@@ -392,14 +392,12 @@ class ProcessDefinition(Base):
self._ensure_side_model_exists()
self._pre_submit_check()
- gateway = launch_gateway()
- self._process_definition_code = gateway.entry_point.createOrUpdateProcessDefinition(
+ self._process_definition_code = JavaGate().create_or_update_process_definition(
self._user,
self._project,
self.name,
str(self.description) if self.description else "",
json.dumps(self.param_json),
- json.dumps(self.schedule_json) if self.schedule_json else None,
self.warning_type,
self.warning_group_id,
json.dumps(self.task_location),
@@ -410,6 +408,7 @@ class ProcessDefinition(Base):
# TODO add serialization function
json.dumps(self.task_relation_json),
json.dumps(self.task_definition_json),
+ json.dumps(self.schedule_json) if self.schedule_json else None,
None,
None,
)
@@ -424,8 +423,7 @@ class ProcessDefinition(Base):
which post to `start-process-instance` to java gateway
"""
- gateway = launch_gateway()
- gateway.entry_point.execProcessInstance(
+ JavaGate().exec_process_instance(
self._user,
self._project,
self.name,
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource.py
index 8015a72e39..ea811915e2 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource.py
@@ -20,7 +20,7 @@
from typing import Optional
from pydolphinscheduler.exceptions import PyDSParamException
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
from pydolphinscheduler.models import Base
@@ -53,8 +53,7 @@ class Resource(Base):
raise PyDSParamException(
"`user_name` is required when querying resources from python gate."
)
- gateway = launch_gateway()
- return gateway.entry_point.queryResourcesFileInfo(self.user_name, self.name)
+ return JavaGate().query_resources_file_info(self.user_name, self.name)
def get_id_from_database(self):
"""Get resource id from java gateway."""
@@ -66,10 +65,9 @@ class Resource(Base):
raise PyDSParamException(
"`user_name` and `content` are required when create or update resource from python gate."
)
- gateway = launch_gateway()
- gateway.entry_point.createOrUpdateResource(
+ JavaGate().create_or_update_resource(
self.user_name,
self.name,
- self.description,
self.content,
+ self.description,
)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
index b866ea221f..93c5f28342 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
@@ -34,7 +34,7 @@ from pydolphinscheduler.core.process_definition import (
)
from pydolphinscheduler.core.resource import Resource
from pydolphinscheduler.exceptions import PyDSParamException
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
from pydolphinscheduler.models import Base
logger = getLogger(__name__)
@@ -300,8 +300,7 @@ class Task(Base):
equal to 0 by java gateway, otherwise if will return the exists code and version.
"""
# TODO get code from specific project process definition and task name
- gateway = launch_gateway()
- result = gateway.entry_point.getCodeAndVersion(
+ result = JavaGate().get_code_and_version(
self.process_definition._project, self.process_definition.name, self.name
)
# result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
index 7b85902f9a..a460a466bb 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
@@ -63,3 +63,159 @@ def gateway_result_checker(
):
raise PyDSJavaGatewayException("Get result state not success.")
return result
+
+
+class JavaGate:
+ """Launch java gateway to pydolphin scheduler."""
+
+ def __init__(
+ self,
+ address: Optional[str] = None,
+ port: Optional[int] = None,
+ auto_convert: Optional[bool] = True,
+ ):
+ self.java_gateway = launch_gateway(address, port, auto_convert)
+
+ def get_datasource_info(self, name: str):
+ """Get datasource info through java gateway."""
+ return self.java_gateway.entry_point.getDatasourceInfo(name)
+
+ def get_resources_file_info(self, program_type: str, main_package: str):
+ """Get resources file info through java gateway."""
+ return self.java_gateway.entry_point.getResourcesFileInfo(
+ program_type, main_package
+ )
+
+ def create_or_update_resource(
+ self, user_name: str, name: str, content: str, description: Optional[str] = None
+ ):
+ """Create or update resource through java gateway."""
+ return self.java_gateway.entry_point.createOrUpdateResource(
+ user_name, name, description, content
+ )
+
+ def query_resources_file_info(self, user_name: str, name: str):
+ """Get resources file info through java gateway."""
+ return self.java_gateway.entry_point.queryResourcesFileInfo(user_name, name)
+
+ def get_code_and_version(
+ self, project_name: str, process_definition_name: str, task_name: str
+ ):
+ """Get code and version through java gateway."""
+ return self.java_gateway.entry_point.getCodeAndVersion(
+ project_name, process_definition_name, task_name
+ )
+
+ def create_or_grant_project(
+ self, user: str, name: str, description: Optional[str] = None
+ ):
+ """Create or grant project through java gateway."""
+ return self.java_gateway.entry_point.createOrGrantProject(
+ user, name, description
+ )
+
+ def create_tenant(
+ self, tenant_name: str, queue_name: str, description: Optional[str] = None
+ ):
+ """Create tenant through java gateway."""
+ return self.java_gateway.entry_point.createTenant(
+ tenant_name, description, queue_name
+ )
+
+ def create_user(
+ self,
+ name: str,
+ password: str,
+ email: str,
+ phone: str,
+ tenant: str,
+ queue: str,
+ status: int,
+ ):
+ """Create user through java gateway."""
+ return self.java_gateway.entry_point.createUser(
+ name, password, email, phone, tenant, queue, status
+ )
+
+ def get_dependent_info(
+ self,
+ project_name: str,
+ process_definition_name: str,
+ task_name: Optional[str] = None,
+ ):
+ """Get dependent info through java gateway."""
+ return self.java_gateway.entry_point.getDependentInfo(
+ project_name, process_definition_name, task_name
+ )
+
+ def get_process_definition_info(
+ self, user_name: str, project_name: str, process_definition_name: str
+ ):
+ """Get process definition info through java gateway."""
+ return self.java_gateway.entry_point.getProcessDefinitionInfo(
+ user_name, project_name, process_definition_name
+ )
+
+ def create_or_update_process_definition(
+ self,
+ user_name: str,
+ project_name: str,
+ name: str,
+ description: str,
+ global_params: str,
+ warning_type: str,
+ warning_group_id: int,
+ locations: str,
+ timeout: int,
+ worker_group: str,
+ tenant_code: str,
+ release_state: int,
+ task_relation_json: str,
+ task_definition_json: str,
+ schedule: Optional[str] = None,
+ other_params_json: Optional[str] = None,
+ execution_type: Optional[str] = None,
+ ):
+ """Create or update process definition through java gateway."""
+ return self.java_gateway.entry_point.createOrUpdateProcessDefinition(
+ user_name,
+ project_name,
+ name,
+ description,
+ global_params,
+ schedule,
+ warning_type,
+ warning_group_id,
+ locations,
+ timeout,
+ worker_group,
+ tenant_code,
+ release_state,
+ task_relation_json,
+ task_definition_json,
+ other_params_json,
+ execution_type,
+ )
+
+ def exec_process_instance(
+ self,
+ user_name: str,
+ project_name: str,
+ process_definition_name: str,
+ cron_time: str,
+ worker_group: str,
+ warning_type: str,
+ warning_group_id: int,
+ timeout: int,
+ ):
+ """Exec process instance through java gateway."""
+ return self.java_gateway.entry_point.execProcessInstance(
+ user_name,
+ project_name,
+ process_definition_name,
+ cron_time,
+ worker_group,
+ warning_type,
+ warning_group_id,
+ timeout,
+ )
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/project.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/project.py
index ad7221108a..bebdafd67e 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/project.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/project.py
@@ -20,7 +20,7 @@
from typing import Optional
from pydolphinscheduler import configuration
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
from pydolphinscheduler.models import BaseSide
@@ -36,7 +36,6 @@ class Project(BaseSide):
def create_if_not_exists(self, user=configuration.USER_NAME) -> None:
"""Create Project if not exists."""
- gateway = launch_gateway()
- gateway.entry_point.createOrGrantProject(user, self.name, self.description)
+ JavaGate().create_or_grant_project(user, self.name, self.description)
# TODO recover result checker
# gateway_result_checker(result, None)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/queue.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/queue.py
index 3f8f81d126..e6da2594c8 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/queue.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/queue.py
@@ -20,7 +20,6 @@
from typing import Optional
from pydolphinscheduler import configuration
-from pydolphinscheduler.java_gateway import gateway_result_checker, launch_gateway
from pydolphinscheduler.models import BaseSide
@@ -33,10 +32,3 @@ class Queue(BaseSide):
description: Optional[str] = "",
):
super().__init__(name, description)
-
- def create_if_not_exists(self, user=configuration.USER_NAME) -> None:
- """Create Queue if not exists."""
- gateway = launch_gateway()
- # Here we set Queue.name and Queue.queueName same as self.name
- result = gateway.entry_point.createProject(user, self.name, self.name)
- gateway_result_checker(result, None)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/tenant.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/tenant.py
index 148a8f6521..6641d9aef7 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/tenant.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/tenant.py
@@ -20,7 +20,7 @@
from typing import Optional
from pydolphinscheduler import configuration
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
from pydolphinscheduler.models import BaseSide
@@ -40,6 +40,5 @@ class Tenant(BaseSide):
self, queue_name: str, user=configuration.USER_NAME
) -> None:
"""Create Tenant if not exists."""
- gateway = launch_gateway()
- gateway.entry_point.createTenant(self.name, self.description, queue_name)
+ JavaGate().create_tenant(self.name, queue_name, self.description)
# gateway_result_checker(result, None)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/user.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/user.py
index de2d8b159a..e11bb9ca0d 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/user.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/models/user.py
@@ -20,7 +20,7 @@
from typing import Optional
from pydolphinscheduler import configuration
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
from pydolphinscheduler.models import BaseSide, Tenant
@@ -64,8 +64,7 @@ class User(BaseSide):
"""Create User if not exists."""
# Should make sure queue already exists.
self.create_tenant_if_not_exists()
- gateway = launch_gateway()
- gateway.entry_point.createUser(
+ JavaGate().create_user(
self.name,
self.password,
self.email,
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/dependent.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/dependent.py
index cc6d25b0a2..12ec6ba91d 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/dependent.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/dependent.py
@@ -22,7 +22,7 @@ from typing import Dict, Optional, Tuple
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task
from pydolphinscheduler.exceptions import PyDSJavaGatewayException, PyDSParamException
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
from pydolphinscheduler.models.base import Base
DEPENDENT_ALL_TASK_IN_WORKFLOW = "0"
@@ -165,9 +165,8 @@ class DependentItem(Base):
if self._code:
return self._code
else:
- gateway = launch_gateway()
try:
- self._code = gateway.entry_point.getDependentInfo(*self.code_parameter)
+ self._code = JavaGate().get_dependent_info(*self.code_parameter)
return self._code
except Exception:
raise PyDSJavaGatewayException("Function get_code_from_gateway error.")
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py
index 8ba6b4c64d..c7a9f8bd11 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py
@@ -22,7 +22,7 @@ from typing import Dict
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task
from pydolphinscheduler.exceptions import PyDSProcessDefinitionNotAssignException
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
class SubProcess(Task):
@@ -47,8 +47,7 @@ class SubProcess(Task):
raise PyDSProcessDefinitionNotAssignException(
"ProcessDefinition must be provider for task SubProcess."
)
- gateway = launch_gateway()
- return gateway.entry_point.getProcessDefinitionInfo(
+ return JavaGate().get_process_definition_info(
self.process_definition.user.name,
self.process_definition.project.name,
process_definition_name,