You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/12/13 11:55:00 UTC

[dolphinscheduler] branch dev updated: [python] Add task base database and procedure (#7279)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1948030  [python] Add task base database and procedure (#7279)
1948030 is described below

commit 1948030151173c72c81c45346e37bba6dc35d44b
Author: Jiajie Zhong <zh...@hotmail.com>
AuthorDate: Mon Dec 13 19:54:24 2021 +0800

    [python] Add task base database and procedure (#7279)
    
    Add a new procedure task, and add a parent class `Database`
    shared by both the SQL task and the procedure task.
    
    fix: #6929
---
 .../src/pydolphinscheduler/constants.py            |   1 +
 .../tasks/{sql.py => database.py}                  |  55 ++-------
 .../src/pydolphinscheduler/tasks/procedure.py      |  43 +++++++
 .../src/pydolphinscheduler/tasks/sql.py            |  44 +------
 .../tests/tasks/test_database.py                   | 127 +++++++++++++++++++++
 .../tests/tasks/{test_sql.py => test_procedure.py} | 107 +++++------------
 .../pydolphinscheduler/tests/tasks/test_sql.py     |   2 +-
 7 files changed, 216 insertions(+), 163 deletions(-)

diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
index 7203459..c2899ab 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
@@ -72,6 +72,7 @@ class TaskType(str):
     PYTHON = "PYTHON"
     SQL = "SQL"
     SUB_PROCESS = "SUB_PROCESS"
+    PROCEDURE = "PROCEDURE"
 
 
 class DefaultTaskCodeNum(str):
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/database.py
similarity index 64%
copy from dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py
copy to dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/database.py
index f16eb10..686f57e 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/database.py
@@ -15,27 +15,19 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Task sql."""
+"""Task database base task."""
 
-import re
-from typing import Dict, Optional
+from typing import Dict
 
-from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.task import Task
 from pydolphinscheduler.java_gateway import launch_gateway
 
 
-class SqlType:
-    """SQL type, for now it just contain `SELECT` and `NO_SELECT`."""
+class Database(Task):
+    """Base task to handle database, declare behavior for the base handler of database.
 
-    SELECT = 0
-    NOT_SELECT = 1
-
-
-class Sql(Task):
-    """Task SQL object, declare behavior for SQL task to dolphinscheduler.
-
-    It should run sql job in multiply sql lik engine, such as:
+    It is the parent class for all database tasks of dolphinscheduler. It runs SQL-like
+    jobs on multiple SQL-like engines, such as:
     - ClickHouse
     - DB2
     - HIVE
@@ -48,31 +40,14 @@ class Sql(Task):
     database type and database instance would run this sql.
     """
 
-    _task_custom_attr = {
-        "sql",
-        "sql_type",
-        "pre_statements",
-        "post_statements",
-        "display_rows",
-    }
+    _task_custom_attr = {"sql"}
 
     def __init__(
-        self,
-        name: str,
-        datasource_name: str,
-        sql: str,
-        pre_statements: Optional[str] = None,
-        post_statements: Optional[str] = None,
-        display_rows: Optional[int] = 10,
-        *args,
-        **kwargs
+        self, task_type: str, name: str, datasource_name: str, sql: str, *args, **kwargs
     ):
-        super().__init__(name, TaskType.SQL, *args, **kwargs)
+        super().__init__(name, task_type, *args, **kwargs)
         self.datasource_name = datasource_name
         self.sql = sql
-        self.pre_statements = pre_statements or []
-        self.post_statements = post_statements or []
-        self.display_rows = display_rows
         self._datasource = {}
 
     def get_datasource_type(self) -> str:
@@ -93,18 +68,6 @@ class Sql(Task):
             return self._datasource
 
     @property
-    def sql_type(self) -> int:
-        """Judgement sql type, use regexp to check which type of the sql is."""
-        pattern_select_str = (
-            "^(?!(.* |)insert |(.* |)delete |(.* |)drop |(.* |)update |(.* |)alter ).*"
-        )
-        pattern_select = re.compile(pattern_select_str, re.IGNORECASE)
-        if pattern_select.match(self.sql) is None:
-            return SqlType.NOT_SELECT
-        else:
-            return SqlType.SELECT
-
-    @property
     def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
         """Override Task.task_params for sql task.
 
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/procedure.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/procedure.py
new file mode 100644
index 0000000..f9497be
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/procedure.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task procedure."""
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.tasks.database import Database
+
+
+class Procedure(Database):
+    """Task Procedure object, declare behavior for Procedure task to dolphinscheduler.
+
+    It runs database procedure jobs on multiple SQL-like engines, such as:
+    - ClickHouse
+    - DB2
+    - HIVE
+    - MySQL
+    - Oracle
+    - Postgresql
+    - Presto
+    - SQLServer
+    The given ``datasource_name`` carries the connection information and decides
+    which database type and database instance will run ``sql``.
+    """
+
+    def __init__(self, name: str, datasource_name: str, sql: str, *args, **kwargs):
+        super().__init__(
+            TaskType.PROCEDURE, name, datasource_name, sql, *args, **kwargs
+        )
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py
index f16eb10..b6ee745 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py
@@ -18,11 +18,10 @@
 """Task sql."""
 
 import re
-from typing import Dict, Optional
+from typing import Optional
 
 from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.tasks.database import Database
 
 
 class SqlType:
@@ -32,7 +31,7 @@ class SqlType:
     NOT_SELECT = 1
 
 
-class Sql(Task):
+class Sql(Database):
     """Task SQL object, declare behavior for SQL task to dolphinscheduler.
 
     It should run sql job in multiply sql lik engine, such as:
@@ -67,30 +66,10 @@ class Sql(Task):
         *args,
         **kwargs
     ):
-        super().__init__(name, TaskType.SQL, *args, **kwargs)
-        self.datasource_name = datasource_name
-        self.sql = sql
+        super().__init__(TaskType.SQL, name, datasource_name, sql, *args, **kwargs)
         self.pre_statements = pre_statements or []
         self.post_statements = post_statements or []
         self.display_rows = display_rows
-        self._datasource = {}
-
-    def get_datasource_type(self) -> str:
-        """Get datasource type from java gateway, a wrapper for :func:`get_datasource_info`."""
-        return self.get_datasource_info(self.datasource_name).get("type")
-
-    def get_datasource_id(self) -> str:
-        """Get datasource id from java gateway, a wrapper for :func:`get_datasource_info`."""
-        return self.get_datasource_info(self.datasource_name).get("id")
-
-    def get_datasource_info(self, name) -> Dict:
-        """Get datasource info from java gateway, contains datasource id, type, name."""
-        if self._datasource:
-            return self._datasource
-        else:
-            gateway = launch_gateway()
-            self._datasource = gateway.entry_point.getDatasourceInfo(name)
-            return self._datasource
 
     @property
     def sql_type(self) -> int:
@@ -103,18 +82,3 @@ class Sql(Task):
             return SqlType.NOT_SELECT
         else:
             return SqlType.SELECT
-
-    @property
-    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
-        """Override Task.task_params for sql task.
-
-        Sql task have some specials attribute for task_params, and is odd if we
-        directly set as python property, so we Override Task.task_params here.
-        """
-        params = super().task_params
-        custom_params = {
-            "type": self.get_datasource_type(),
-            "datasource": self.get_datasource_id(),
-        }
-        params.update(custom_params)
-        return params
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_database.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_database.py
new file mode 100644
index 0000000..dd510a8
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_database.py
@@ -0,0 +1,127 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task Database."""
+
+
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler.tasks.database import Database
+
+TEST_DATABASE_TASK_TYPE = "SQL"  # Database receives its task type from the caller
+TEST_DATABASE_SQL = "select 1"
+TEST_DATABASE_DATASOURCE_NAME = "test_datasource"  # lookups are patched in tests
+
+
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+@patch(
+    "pydolphinscheduler.tasks.database.Database.get_datasource_info",
+    return_value=({"id": 1, "type": "mock_type"}),
+)
+def test_get_datasource_detail(mock_datasource, mock_code_version):
+    """Test :func:`get_datasource_type` and :func:`get_datasource_id` return expected values."""
+    name = "test_get_database_detail"
+    task = Database(
+        TEST_DATABASE_TASK_TYPE, name, TEST_DATABASE_DATASOURCE_NAME, TEST_DATABASE_SQL
+    )
+    assert 1 == task.get_datasource_id()
+    assert "mock_type" == task.get_datasource_type()
+
+
+@pytest.mark.parametrize(
+    "attr, expect",
+    [
+        (
+            {
+                "task_type": TEST_DATABASE_TASK_TYPE,
+                "name": "test-task-params",
+                "datasource_name": TEST_DATABASE_DATASOURCE_NAME,
+                "sql": TEST_DATABASE_SQL,
+            },
+            {
+                "type": "MYSQL",
+                "datasource": 1,
+                "sql": TEST_DATABASE_SQL,
+                "localParams": [],
+                "resourceList": [],
+                "dependence": {},
+                "waitStartTimeout": {},
+                "conditionResult": {"successNode": [""], "failedNode": [""]},
+            },
+        )
+    ],
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+@patch(
+    "pydolphinscheduler.tasks.database.Database.get_datasource_info",
+    return_value=({"id": 1, "type": "MYSQL"}),
+)
+def test_property_task_params(mock_datasource, mock_code_version, attr, expect):
+    """Test Database.task_params includes datasource type/id alongside sql."""
+    task = Database(**attr)
+    assert expect == task.task_params
+
+
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+@patch(
+    "pydolphinscheduler.tasks.database.Database.get_datasource_info",
+    return_value=({"id": 1, "type": "MYSQL"}),
+)
+def test_database_get_define(mock_datasource, mock_code_version):
+    """Test Database.get_define returns the complete task definition dict."""
+    name = "test_database_get_define"
+    expect = {
+        "code": 123,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": TEST_DATABASE_TASK_TYPE,
+        "taskParams": {
+            "type": "MYSQL",
+            "datasource": 1,
+            "sql": TEST_DATABASE_SQL,
+            "localParams": [],
+            "resourceList": [],
+            "dependence": {},
+            "conditionResult": {"successNode": [""], "failedNode": [""]},
+            "waitStartTimeout": {},
+        },
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "default",
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+    task = Database(
+        TEST_DATABASE_TASK_TYPE, name, TEST_DATABASE_DATASOURCE_NAME, TEST_DATABASE_SQL
+    )
+    assert task.get_define() == expect
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py
similarity index 50%
copy from dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
copy to dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py
index 2590100..42c38e1 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py
@@ -15,14 +15,18 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Test Task Sql."""
-
+"""Test Task Procedure."""
 
 from unittest.mock import patch
 
 import pytest
 
-from pydolphinscheduler.tasks.sql import Sql, SqlType
+from pydolphinscheduler.tasks.procedure import Procedure
+
+TEST_PROCEDURE_SQL = (
+    'create procedure HelloWorld() selece "hello world"; call HelloWorld();'
+)
+TEST_PROCEDURE_DATASOURCE_NAME = "test_datasource"
 
 
 @patch(
@@ -30,71 +34,30 @@ from pydolphinscheduler.tasks.sql import Sql, SqlType
     return_value=(123, 1),
 )
 @patch(
-    "pydolphinscheduler.tasks.sql.Sql.get_datasource_info",
+    "pydolphinscheduler.tasks.procedure.Procedure.get_datasource_info",
     return_value=({"id": 1, "type": "mock_type"}),
 )
 def test_get_datasource_detail(mock_datasource, mock_code_version):
     """Test :func:`get_datasource_type` and :func:`get_datasource_id` can return expect value."""
-    name = "test_get_sql_type"
-    datasource_name = "test_datasource"
-    sql = "select 1"
-    task = Sql(name, datasource_name, sql)
+    name = "test_get_datasource_detail"
+    task = Procedure(name, TEST_PROCEDURE_DATASOURCE_NAME, TEST_PROCEDURE_SQL)
     assert 1 == task.get_datasource_id()
     assert "mock_type" == task.get_datasource_type()
 
 
 @pytest.mark.parametrize(
-    "sql, sql_type",
-    [
-        ("select 1", SqlType.SELECT),
-        (" select 1", SqlType.SELECT),
-        (" select 1 ", SqlType.SELECT),
-        (" select 'insert' ", SqlType.SELECT),
-        (" select 'insert ' ", SqlType.SELECT),
-        ("with tmp as (select 1) select * from tmp ", SqlType.SELECT),
-        ("insert into table_name(col1, col2) value (val1, val2)", SqlType.NOT_SELECT),
-        (
-            "insert into table_name(select, col2) value ('select', val2)",
-            SqlType.NOT_SELECT,
-        ),
-        ("update table_name SET col1=val1 where col1=val2", SqlType.NOT_SELECT),
-        ("update table_name SET col1='select' where col1=val2", SqlType.NOT_SELECT),
-        ("delete from table_name where id < 10", SqlType.NOT_SELECT),
-        ("delete from table_name where id < 10", SqlType.NOT_SELECT),
-        ("alter table table_name add column col1 int", SqlType.NOT_SELECT),
-    ],
-)
-@patch(
-    "pydolphinscheduler.core.task.Task.gen_code_and_version",
-    return_value=(123, 1),
-)
-@patch(
-    "pydolphinscheduler.tasks.sql.Sql.get_datasource_info",
-    return_value=({"id": 1, "type": "mock_type"}),
-)
-def test_get_sql_type(mock_datasource, mock_code_version, sql, sql_type):
-    """Test property sql_type could return correct type."""
-    name = "test_get_sql_type"
-    datasource_name = "test_datasource"
-    task = Sql(name, datasource_name, sql)
-    assert (
-        sql_type == task.sql_type
-    ), f"Sql {sql} expect sql type is {sql_type} but got {task.sql_type}"
-
-
-@pytest.mark.parametrize(
     "attr, expect",
     [
         (
-            {"datasource_name": "datasource_name", "sql": "select 1"},
             {
-                "sql": "select 1",
+                "name": "test-procedure-task-params",
+                "datasource_name": TEST_PROCEDURE_DATASOURCE_NAME,
+                "sql": TEST_PROCEDURE_SQL,
+            },
+            {
+                "sql": TEST_PROCEDURE_SQL,
                 "type": "MYSQL",
                 "datasource": 1,
-                "sqlType": SqlType.SELECT,
-                "preStatements": [],
-                "postStatements": [],
-                "displayRows": 10,
                 "localParams": [],
                 "resourceList": [],
                 "dependence": {},
@@ -109,41 +72,37 @@ def test_get_sql_type(mock_datasource, mock_code_version, sql, sql_type):
     return_value=(123, 1),
 )
 @patch(
-    "pydolphinscheduler.tasks.sql.Sql.get_datasource_info",
+    "pydolphinscheduler.tasks.procedure.Procedure.get_datasource_info",
     return_value=({"id": 1, "type": "MYSQL"}),
 )
 def test_property_task_params(mock_datasource, mock_code_version, attr, expect):
     """Test task sql task property."""
-    task = Sql("test-sql-task-params", **attr)
+    task = Procedure(**attr)
     assert expect == task.task_params
 
 
 @patch(
-    "pydolphinscheduler.tasks.sql.Sql.get_datasource_info",
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+@patch(
+    "pydolphinscheduler.tasks.procedure.Procedure.get_datasource_info",
     return_value=({"id": 1, "type": "MYSQL"}),
 )
-def test_sql_get_define(mock_datasource):
-    """Test task sql function get_define."""
-    code = 123
-    version = 1
-    name = "test_sql_dict"
-    command = "select 1"
-    datasource_name = "test_datasource"
+def test_sql_get_define(mock_datasource, mock_code_version):
+    """Test task procedure function get_define."""
+    name = "test_procedure_get_define"
     expect = {
-        "code": code,
+        "code": 123,
         "name": name,
         "version": 1,
         "description": None,
         "delayTime": 0,
-        "taskType": "SQL",
+        "taskType": "PROCEDURE",
         "taskParams": {
             "type": "MYSQL",
             "datasource": 1,
-            "sql": command,
-            "sqlType": SqlType.SELECT,
-            "displayRows": 10,
-            "preStatements": [],
-            "postStatements": [],
+            "sql": TEST_PROCEDURE_SQL,
             "localParams": [],
             "resourceList": [],
             "dependence": {},
@@ -159,9 +118,5 @@ def test_sql_get_define(mock_datasource):
         "timeoutNotifyStrategy": None,
         "timeout": 0,
     }
-    with patch(
-        "pydolphinscheduler.core.task.Task.gen_code_and_version",
-        return_value=(code, version),
-    ):
-        task = Sql(name, datasource_name, command)
-        assert task.get_define() == expect
+    task = Procedure(name, TEST_PROCEDURE_DATASOURCE_NAME, TEST_PROCEDURE_SQL)
+    assert task.get_define() == expect
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
index 2590100..6058cce 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
@@ -126,7 +126,7 @@ def test_sql_get_define(mock_datasource):
     """Test task sql function get_define."""
     code = 123
     version = 1
-    name = "test_sql_dict"
+    name = "test_sql_get_define"
     command = "select 1"
     datasource_name = "test_datasource"
     expect = {