You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/12/13 11:55:00 UTC
[dolphinscheduler] branch dev updated: [python] Add task base database and procedure (#7279)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 1948030 [python] Add task base database and procedure (#7279)
1948030 is described below
commit 1948030151173c72c81c45346e37bba6dc35d44b
Author: Jiajie Zhong <zh...@hotmail.com>
AuthorDate: Mon Dec 13 19:54:24 2021 +0800
[python] Add task base database and procedure (#7279)
Add a new task type, Procedure, and add a parent class, Database,
shared by both the Sql task and the Procedure task.
fix: #6929
---
.../src/pydolphinscheduler/constants.py | 1 +
.../tasks/{sql.py => database.py} | 55 ++-------
.../src/pydolphinscheduler/tasks/procedure.py | 43 +++++++
.../src/pydolphinscheduler/tasks/sql.py | 44 +------
.../tests/tasks/test_database.py | 127 +++++++++++++++++++++
.../tests/tasks/{test_sql.py => test_procedure.py} | 107 +++++------------
.../pydolphinscheduler/tests/tasks/test_sql.py | 2 +-
7 files changed, 216 insertions(+), 163 deletions(-)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
index 7203459..c2899ab 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
@@ -72,6 +72,7 @@ class TaskType(str):
PYTHON = "PYTHON"
SQL = "SQL"
SUB_PROCESS = "SUB_PROCESS"
+ PROCEDURE = "PROCEDURE"
class DefaultTaskCodeNum(str):
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/database.py
similarity index 64%
copy from dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py
copy to dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/database.py
index f16eb10..686f57e 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/database.py
@@ -15,27 +15,19 @@
# specific language governing permissions and limitations
# under the License.
-"""Task sql."""
+"""Task database base task."""
-import re
-from typing import Dict, Optional
+from typing import Dict
-from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task
from pydolphinscheduler.java_gateway import launch_gateway
-class SqlType:
- """SQL type, for now it just contain `SELECT` and `NO_SELECT`."""
+class Database(Task):
+ """Base task to handle database, declare behavior for the base handler of database.
- SELECT = 0
- NOT_SELECT = 1
-
-
-class Sql(Task):
- """Task SQL object, declare behavior for SQL task to dolphinscheduler.
-
- It should run sql job in multiply sql lik engine, such as:
+ It is a parent class for all database tasks of dolphinscheduler. It should run sql-like
+ jobs in multiple sql-like engines, such as:
- ClickHouse
- DB2
- HIVE
@@ -48,31 +40,14 @@ class Sql(Task):
database type and database instance would run this sql.
"""
- _task_custom_attr = {
- "sql",
- "sql_type",
- "pre_statements",
- "post_statements",
- "display_rows",
- }
+ _task_custom_attr = {"sql"}
def __init__(
- self,
- name: str,
- datasource_name: str,
- sql: str,
- pre_statements: Optional[str] = None,
- post_statements: Optional[str] = None,
- display_rows: Optional[int] = 10,
- *args,
- **kwargs
+ self, task_type: str, name: str, datasource_name: str, sql: str, *args, **kwargs
):
- super().__init__(name, TaskType.SQL, *args, **kwargs)
+ super().__init__(name, task_type, *args, **kwargs)
self.datasource_name = datasource_name
self.sql = sql
- self.pre_statements = pre_statements or []
- self.post_statements = post_statements or []
- self.display_rows = display_rows
self._datasource = {}
def get_datasource_type(self) -> str:
@@ -93,18 +68,6 @@ class Sql(Task):
return self._datasource
@property
- def sql_type(self) -> int:
- """Judgement sql type, use regexp to check which type of the sql is."""
- pattern_select_str = (
- "^(?!(.* |)insert |(.* |)delete |(.* |)drop |(.* |)update |(.* |)alter ).*"
- )
- pattern_select = re.compile(pattern_select_str, re.IGNORECASE)
- if pattern_select.match(self.sql) is None:
- return SqlType.NOT_SELECT
- else:
- return SqlType.SELECT
-
- @property
def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
"""Override Task.task_params for sql task.
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/procedure.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/procedure.py
new file mode 100644
index 0000000..f9497be
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/procedure.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task procedure."""
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.tasks.database import Database
+
+
+class Procedure(Database):
+ """Task Procedure object, declare behavior for Procedure task to dolphinscheduler.
+
+ It should run database procedure jobs in multiple sql-like engines, such as:
+ - ClickHouse
+ - DB2
+ - HIVE
+ - MySQL
+ - Oracle
+ - Postgresql
+ - Presto
+ - SQLServer
+ The datasource_name you provide contains connection information; it decides which
+ database type and database instance will run this sql.
+ """
+
+ def __init__(self, name: str, datasource_name: str, sql: str, *args, **kwargs):
+ super().__init__(
+ TaskType.PROCEDURE, name, datasource_name, sql, *args, **kwargs
+ )
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py
index f16eb10..b6ee745 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py
@@ -18,11 +18,10 @@
"""Task sql."""
import re
-from typing import Dict, Optional
+from typing import Optional
from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.tasks.database import Database
class SqlType:
@@ -32,7 +31,7 @@ class SqlType:
NOT_SELECT = 1
-class Sql(Task):
+class Sql(Database):
"""Task SQL object, declare behavior for SQL task to dolphinscheduler.
It should run sql job in multiply sql lik engine, such as:
@@ -67,30 +66,10 @@ class Sql(Task):
*args,
**kwargs
):
- super().__init__(name, TaskType.SQL, *args, **kwargs)
- self.datasource_name = datasource_name
- self.sql = sql
+ super().__init__(TaskType.SQL, name, datasource_name, sql, *args, **kwargs)
self.pre_statements = pre_statements or []
self.post_statements = post_statements or []
self.display_rows = display_rows
- self._datasource = {}
-
- def get_datasource_type(self) -> str:
- """Get datasource type from java gateway, a wrapper for :func:`get_datasource_info`."""
- return self.get_datasource_info(self.datasource_name).get("type")
-
- def get_datasource_id(self) -> str:
- """Get datasource id from java gateway, a wrapper for :func:`get_datasource_info`."""
- return self.get_datasource_info(self.datasource_name).get("id")
-
- def get_datasource_info(self, name) -> Dict:
- """Get datasource info from java gateway, contains datasource id, type, name."""
- if self._datasource:
- return self._datasource
- else:
- gateway = launch_gateway()
- self._datasource = gateway.entry_point.getDatasourceInfo(name)
- return self._datasource
@property
def sql_type(self) -> int:
@@ -103,18 +82,3 @@ class Sql(Task):
return SqlType.NOT_SELECT
else:
return SqlType.SELECT
-
- @property
- def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
- """Override Task.task_params for sql task.
-
- Sql task have some specials attribute for task_params, and is odd if we
- directly set as python property, so we Override Task.task_params here.
- """
- params = super().task_params
- custom_params = {
- "type": self.get_datasource_type(),
- "datasource": self.get_datasource_id(),
- }
- params.update(custom_params)
- return params
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_database.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_database.py
new file mode 100644
index 0000000..dd510a8
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_database.py
@@ -0,0 +1,127 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task Database."""
+
+
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler.tasks.database import Database
+
+TEST_DATABASE_TASK_TYPE = "SQL"
+TEST_DATABASE_SQL = "select 1"
+TEST_DATABASE_DATASOURCE_NAME = "test_datasource"
+
+
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+@patch(
+ "pydolphinscheduler.tasks.database.Database.get_datasource_info",
+ return_value=({"id": 1, "type": "mock_type"}),
+)
+def test_get_datasource_detail(mock_datasource, mock_code_version):
+ """Test :func:`get_datasource_type` and :func:`get_datasource_id` can return expect value."""
+ name = "test_get_database_detail"
+ task = Database(
+ TEST_DATABASE_TASK_TYPE, name, TEST_DATABASE_DATASOURCE_NAME, TEST_DATABASE_SQL
+ )
+ assert 1 == task.get_datasource_id()
+ assert "mock_type" == task.get_datasource_type()
+
+
+@pytest.mark.parametrize(
+ "attr, expect",
+ [
+ (
+ {
+ "task_type": TEST_DATABASE_TASK_TYPE,
+ "name": "test-task-params",
+ "datasource_name": TEST_DATABASE_DATASOURCE_NAME,
+ "sql": TEST_DATABASE_SQL,
+ },
+ {
+ "type": "MYSQL",
+ "datasource": 1,
+ "sql": TEST_DATABASE_SQL,
+ "localParams": [],
+ "resourceList": [],
+ "dependence": {},
+ "waitStartTimeout": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ },
+ )
+ ],
+)
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+@patch(
+ "pydolphinscheduler.tasks.database.Database.get_datasource_info",
+ return_value=({"id": 1, "type": "MYSQL"}),
+)
+def test_property_task_params(mock_datasource, mock_code_version, attr, expect):
+ """Test task database task property."""
+ task = Database(**attr)
+ assert expect == task.task_params
+
+
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+@patch(
+ "pydolphinscheduler.tasks.database.Database.get_datasource_info",
+ return_value=({"id": 1, "type": "MYSQL"}),
+)
+def test_database_get_define(mock_datasource, mock_code_version):
+ """Test task database function get_define."""
+ name = "test_database_get_define"
+ expect = {
+ "code": 123,
+ "name": name,
+ "version": 1,
+ "description": None,
+ "delayTime": 0,
+ "taskType": TEST_DATABASE_TASK_TYPE,
+ "taskParams": {
+ "type": "MYSQL",
+ "datasource": 1,
+ "sql": TEST_DATABASE_SQL,
+ "localParams": [],
+ "resourceList": [],
+ "dependence": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
+ },
+ "flag": "YES",
+ "taskPriority": "MEDIUM",
+ "workerGroup": "default",
+ "failRetryTimes": 0,
+ "failRetryInterval": 1,
+ "timeoutFlag": "CLOSE",
+ "timeoutNotifyStrategy": None,
+ "timeout": 0,
+ }
+ task = Database(
+ TEST_DATABASE_TASK_TYPE, name, TEST_DATABASE_DATASOURCE_NAME, TEST_DATABASE_SQL
+ )
+ assert task.get_define() == expect
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py
similarity index 50%
copy from dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
copy to dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py
index 2590100..42c38e1 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py
@@ -15,14 +15,18 @@
# specific language governing permissions and limitations
# under the License.
-"""Test Task Sql."""
-
+"""Test Task Procedure."""
from unittest.mock import patch
import pytest
-from pydolphinscheduler.tasks.sql import Sql, SqlType
+from pydolphinscheduler.tasks.procedure import Procedure
+
+TEST_PROCEDURE_SQL = (
+ 'create procedure HelloWorld() selece "hello world"; call HelloWorld();'
+)
+TEST_PROCEDURE_DATASOURCE_NAME = "test_datasource"
@patch(
@@ -30,71 +34,30 @@ from pydolphinscheduler.tasks.sql import Sql, SqlType
return_value=(123, 1),
)
@patch(
- "pydolphinscheduler.tasks.sql.Sql.get_datasource_info",
+ "pydolphinscheduler.tasks.procedure.Procedure.get_datasource_info",
return_value=({"id": 1, "type": "mock_type"}),
)
def test_get_datasource_detail(mock_datasource, mock_code_version):
"""Test :func:`get_datasource_type` and :func:`get_datasource_id` can return expect value."""
- name = "test_get_sql_type"
- datasource_name = "test_datasource"
- sql = "select 1"
- task = Sql(name, datasource_name, sql)
+ name = "test_get_datasource_detail"
+ task = Procedure(name, TEST_PROCEDURE_DATASOURCE_NAME, TEST_PROCEDURE_SQL)
assert 1 == task.get_datasource_id()
assert "mock_type" == task.get_datasource_type()
@pytest.mark.parametrize(
- "sql, sql_type",
- [
- ("select 1", SqlType.SELECT),
- (" select 1", SqlType.SELECT),
- (" select 1 ", SqlType.SELECT),
- (" select 'insert' ", SqlType.SELECT),
- (" select 'insert ' ", SqlType.SELECT),
- ("with tmp as (select 1) select * from tmp ", SqlType.SELECT),
- ("insert into table_name(col1, col2) value (val1, val2)", SqlType.NOT_SELECT),
- (
- "insert into table_name(select, col2) value ('select', val2)",
- SqlType.NOT_SELECT,
- ),
- ("update table_name SET col1=val1 where col1=val2", SqlType.NOT_SELECT),
- ("update table_name SET col1='select' where col1=val2", SqlType.NOT_SELECT),
- ("delete from table_name where id < 10", SqlType.NOT_SELECT),
- ("delete from table_name where id < 10", SqlType.NOT_SELECT),
- ("alter table table_name add column col1 int", SqlType.NOT_SELECT),
- ],
-)
-@patch(
- "pydolphinscheduler.core.task.Task.gen_code_and_version",
- return_value=(123, 1),
-)
-@patch(
- "pydolphinscheduler.tasks.sql.Sql.get_datasource_info",
- return_value=({"id": 1, "type": "mock_type"}),
-)
-def test_get_sql_type(mock_datasource, mock_code_version, sql, sql_type):
- """Test property sql_type could return correct type."""
- name = "test_get_sql_type"
- datasource_name = "test_datasource"
- task = Sql(name, datasource_name, sql)
- assert (
- sql_type == task.sql_type
- ), f"Sql {sql} expect sql type is {sql_type} but got {task.sql_type}"
-
-
-@pytest.mark.parametrize(
"attr, expect",
[
(
- {"datasource_name": "datasource_name", "sql": "select 1"},
{
- "sql": "select 1",
+ "name": "test-procedure-task-params",
+ "datasource_name": TEST_PROCEDURE_DATASOURCE_NAME,
+ "sql": TEST_PROCEDURE_SQL,
+ },
+ {
+ "sql": TEST_PROCEDURE_SQL,
"type": "MYSQL",
"datasource": 1,
- "sqlType": SqlType.SELECT,
- "preStatements": [],
- "postStatements": [],
- "displayRows": 10,
"localParams": [],
"resourceList": [],
"dependence": {},
@@ -109,41 +72,37 @@ def test_get_sql_type(mock_datasource, mock_code_version, sql, sql_type):
return_value=(123, 1),
)
@patch(
- "pydolphinscheduler.tasks.sql.Sql.get_datasource_info",
+ "pydolphinscheduler.tasks.procedure.Procedure.get_datasource_info",
return_value=({"id": 1, "type": "MYSQL"}),
)
def test_property_task_params(mock_datasource, mock_code_version, attr, expect):
"""Test task sql task property."""
- task = Sql("test-sql-task-params", **attr)
+ task = Procedure(**attr)
assert expect == task.task_params
@patch(
- "pydolphinscheduler.tasks.sql.Sql.get_datasource_info",
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+@patch(
+ "pydolphinscheduler.tasks.procedure.Procedure.get_datasource_info",
return_value=({"id": 1, "type": "MYSQL"}),
)
-def test_sql_get_define(mock_datasource):
- """Test task sql function get_define."""
- code = 123
- version = 1
- name = "test_sql_dict"
- command = "select 1"
- datasource_name = "test_datasource"
+def test_sql_get_define(mock_datasource, mock_code_version):
+ """Test task procedure function get_define."""
+ name = "test_procedure_get_define"
expect = {
- "code": code,
+ "code": 123,
"name": name,
"version": 1,
"description": None,
"delayTime": 0,
- "taskType": "SQL",
+ "taskType": "PROCEDURE",
"taskParams": {
"type": "MYSQL",
"datasource": 1,
- "sql": command,
- "sqlType": SqlType.SELECT,
- "displayRows": 10,
- "preStatements": [],
- "postStatements": [],
+ "sql": TEST_PROCEDURE_SQL,
"localParams": [],
"resourceList": [],
"dependence": {},
@@ -159,9 +118,5 @@ def test_sql_get_define(mock_datasource):
"timeoutNotifyStrategy": None,
"timeout": 0,
}
- with patch(
- "pydolphinscheduler.core.task.Task.gen_code_and_version",
- return_value=(code, version),
- ):
- task = Sql(name, datasource_name, command)
- assert task.get_define() == expect
+ task = Procedure(name, TEST_PROCEDURE_DATASOURCE_NAME, TEST_PROCEDURE_SQL)
+ assert task.get_define() == expect
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
index 2590100..6058cce 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
@@ -126,7 +126,7 @@ def test_sql_get_define(mock_datasource):
"""Test task sql function get_define."""
code = 123
version = 1
- name = "test_sql_dict"
+ name = "test_sql_get_define"
command = "select 1"
datasource_name = "test_datasource"
expect = {