You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/12/22 08:19:44 UTC

[dolphinscheduler-sdk-python] branch main updated: fix: sql task pre post stm typing and add example (#62)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 69d61e3  fix: sql task pre post stm typing and add example (#62)
69d61e3 is described below

commit 69d61e3567649c5eea6c862e257547d41a43ad48
Author: Jay Chung <zh...@gmail.com>
AuthorDate: Thu Dec 22 16:19:38 2022 +0800

    fix: sql task pre post stm typing and add example (#62)
    
    Correct pre and post statement type, and also support
    string type, and also add example of sql type
---
 docs/source/tasks/sql.rst                          | 52 +++++++++++++++++
 src/pydolphinscheduler/examples/ext/example.sql    |  3 +
 .../examples/task_sql_example.py                   | 66 ++++++++++++++++++++++
 src/pydolphinscheduler/tasks/sql.py                | 35 ++++++++++--
 tests/tasks/test_sql.py                            | 24 ++++++++
 tests/testing/constants.py                         |  2 +-
 6 files changed, 176 insertions(+), 6 deletions(-)

diff --git a/docs/source/tasks/sql.rst b/docs/source/tasks/sql.rst
index 52df042..7110046 100644
--- a/docs/source/tasks/sql.rst
+++ b/docs/source/tasks/sql.rst
@@ -18,6 +18,58 @@
 SQL
 ===
 
+An example for SQL task, including how to use it and the details of its parameters.
+
+This task type can execute SQL against multiple types of databases, including
+
+- MySQL
+- PostgreSQL
+- Oracle
+- SQL Server
+- DB2
+- Hive
+- Presto
+- Trino
+- ClickHouse
+
+Example
+-------
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_sql_example.py
+   :start-after: [start workflow_declare]
+   :end-before: [end workflow_declare]
+
+You can see that the SQL task supports two ways to declare SQL, which are
+
+- Bare SQL: Put bare SQL statement in the ``sql`` parameter, such as ``select * from table1``.
+
+   .. literalinclude:: ../../../src/pydolphinscheduler/examples/task_sql_example.py
+      :start-after: [start bare_sql_desc]
+      :end-before: [end bare_sql_desc]
+
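+- SQL Files: Put the path of a SQL file in the ``sql`` parameter, such as ``ext/example.sql``, and set ``resource_plugin`` so the file can be located.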
+
+   .. literalinclude:: ../../../src/pydolphinscheduler/examples/task_sql_example.py
+      :start-after: [start sql_file_desc]
+      :end-before: [end sql_file_desc]
+
+If you want to do some preparation before executing SQL, or do some cleanup after executing SQL, you can use the
+``pre_statements`` and ``post_statements`` parameters to do that. Both ``pre_statements`` and ``post_statements``
+support one or multiple statements: you can assign a sequence of SQL statements to them if you want to execute
+multiple statements, but if you only need to execute one statement, you can assign a string to them.
+
+   .. literalinclude:: ../../../src/pydolphinscheduler/examples/task_sql_example.py
+      :start-after: [start sql_with_pre_post_desc]
+      :end-before: [end sql_with_pre_post_desc]
+
+.. note::
+
+   Parameters ``pre_statements`` and ``post_statements`` currently only support non-query statements, such as
+   ``create table``, ``drop table``, ``update table``. They also only support bare SQL, not SQL files, for now.
+
+Dive Into
+---------
+
 .. automodule:: pydolphinscheduler.tasks.sql
 
 
diff --git a/src/pydolphinscheduler/examples/ext/example.sql b/src/pydolphinscheduler/examples/ext/example.sql
new file mode 100644
index 0000000..dc8348a
--- /dev/null
+++ b/src/pydolphinscheduler/examples/ext/example.sql
@@ -0,0 +1,3 @@
+select *
+from resource_plugin
+where id = 1
\ No newline at end of file
diff --git a/src/pydolphinscheduler/examples/task_sql_example.py b/src/pydolphinscheduler/examples/task_sql_example.py
new file mode 100644
index 0000000..f2a8bd4
--- /dev/null
+++ b/src/pydolphinscheduler/examples/task_sql_example.py
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# [start workflow_declare]
+
+"""An example workflow for task SQL."""
+from pathlib import Path
+
+from pydolphinscheduler.core.workflow import Workflow
+from pydolphinscheduler.resources_plugin import Local
+from pydolphinscheduler.tasks.sql import Sql, SqlType
+
+with Workflow(
+    name="task_sql_example",
+) as workflow:
+    # [start bare_sql_desc]
+    bare_sql = Sql(
+        name="bare_sql",
+        datasource_name="metadata",
+        sql="select * from t_ds_version",
+    )
+    # [end bare_sql_desc]
+    # [start sql_file_desc]
+    sql_file = Sql(
+        name="sql_file",
+        datasource_name="metadata",
+        sql="ext/example.sql",
+        sql_type=SqlType.SELECT,
+        resource_plugin=Local(prefix=str(Path(__file__).parent)),
+    )
+    # [end sql_file_desc]
+    # [start sql_with_pre_post_desc]
+    sql_with_pre_post = Sql(
+        name="sql_with_pre_post",
+        datasource_name="metadata",
+        sql="select * from t_ds_version",
+        pre_statements=[
+            "update table_one set version = '1.3.6'",
+            "delete from table_two where version = '1.3.6'",
+        ],
+        post_statements="update table_one set version = '3.0.0'",
+    )
+    # [end sql_with_pre_post_desc]
+
+    bare_sql >> [
+        sql_file,
+        sql_with_pre_post,
+    ]
+
+    workflow.submit()
+
+# [end workflow_declare]
diff --git a/src/pydolphinscheduler/tasks/sql.py b/src/pydolphinscheduler/tasks/sql.py
index 4bebf83..5a6d890 100644
--- a/src/pydolphinscheduler/tasks/sql.py
+++ b/src/pydolphinscheduler/tasks/sql.py
@@ -19,7 +19,7 @@
 
 import logging
 import re
-from typing import Dict, Optional
+from typing import Dict, List, Optional, Sequence, Union
 
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.database import Database
@@ -49,6 +49,18 @@ class Sql(Task):
     - SQLServer
     You provider datasource_name contain connection information, it decisions which
     database type and database instance would run this sql.
+
+    :param name: SQL task name
+    :param datasource_name: datasource name in dolphinscheduler, the name must exist and must be an ``online``
+        datasource instead of ``test``.
+    :param sql: SQL statement, the sql script you want to run. Support resource plugin in this parameter.
+    :param sql_type: SQL type, whether the sql statement is a select query or not. If not provided, it will be auto
+        detected according to sql statement using :func:`pydolphinscheduler.tasks.sql.Sql.sql_type`, and you
+        can also set it manually, by ``SqlType.SELECT`` for a query statement or ``SqlType.NOT_SELECT`` for a
+        non-query statement.
+    :param pre_statements: SQL statements to be executed before the main SQL statement.
+    :param post_statements: SQL statements to be executed after the main SQL statement.
+    :param display_rows: The number of record rows to be displayed in the SQL task log, default is 10.
     """
 
     _task_custom_attr = {
@@ -68,8 +80,8 @@ class Sql(Task):
         datasource_name: str,
         sql: str,
         sql_type: Optional[str] = None,
-        pre_statements: Optional[str] = None,
-        post_statements: Optional[str] = None,
+        pre_statements: Optional[Union[str, Sequence[str]]] = None,
+        post_statements: Optional[Union[str, Sequence[str]]] = None,
         display_rows: Optional[int] = 10,
         *args,
         **kwargs
@@ -78,10 +90,23 @@ class Sql(Task):
         super().__init__(name, TaskType.SQL, *args, **kwargs)
         self.param_sql_type = sql_type
         self.datasource_name = datasource_name
-        self.pre_statements = pre_statements or []
-        self.post_statements = post_statements or []
+        self.pre_statements = self.get_stm_list(pre_statements)
+        self.post_statements = self.get_stm_list(post_statements)
         self.display_rows = display_rows
 
+    @staticmethod
+    def get_stm_list(stm: Union[str, Sequence[str]]) -> List[str]:
+        """Convert statement to list of str.
+
+        :param stm: a single statement string, a sequence of statement strings, or None
+        :return: statements list
+        """
+        if not stm:
+            return []
+        elif isinstance(stm, str):
+            return [stm]
+        return list(stm)
+
     @property
     def sql_type(self) -> str:
         """Judgement sql type, it will return the SQL type for type `SELECT` or `NOT_SELECT`.
diff --git a/tests/tasks/test_sql.py b/tests/tasks/test_sql.py
index b9de05f..42a168d 100644
--- a/tests/tasks/test_sql.py
+++ b/tests/tasks/test_sql.py
@@ -40,6 +40,30 @@ def setup_crt_first():
     delete_file(file_path)
 
 
+@pytest.mark.parametrize(
+    "stm, expected",
+    [
+        ("select * from t_ds_version", ["select * from t_ds_version"]),
+        (None, []),
+        (
+            ["select * from table1", "select * from table2"],
+            ["select * from table1", "select * from table2"],
+        ),
+        (
+            ("select * from table1", "select * from table2"),
+            ["select * from table1", "select * from table2"],
+        ),
+        (
+            {"select * from table1", "select * from table2"},
+            ["select * from table1", "select * from table2"],
+        ),
+    ],
+)
+def test_get_stm_list(stm, expected) -> None:
+    """Test static function get_stm_list."""
+    assert sorted(Sql.get_stm_list(stm)) == sorted(expected)
+
+
 @pytest.mark.parametrize(
     "sql, param_sql_type, sql_type",
     [
diff --git a/tests/testing/constants.py b/tests/testing/constants.py
index 4dfaa37..998f711 100644
--- a/tests/testing/constants.py
+++ b/tests/testing/constants.py
@@ -22,7 +22,6 @@ import os
 # Record some task without example in directory `example`. Some of them maybe can not write example,
 # but most of them just without adding by mistake, and we should add it later.
 task_without_example = {
-    "sql",
     "http",
     "sub_workflow",
     "python",
@@ -38,6 +37,7 @@ ignore_exec_examples = {
     "task_spark_example",
     # TODO activate it when dolphinscheduler default resource center is local file
     "multi_resources_example",
+    "task_sql_example",
 }
 
 # pydolphinscheduler environment home