You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/12/22 08:19:44 UTC
[dolphinscheduler-sdk-python] branch main updated: fix: sql task pre post stm typing and add example (#62)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git
The following commit(s) were added to refs/heads/main by this push:
new 69d61e3 fix: sql task pre post stm typing and add example (#62)
69d61e3 is described below
commit 69d61e3567649c5eea6c862e257547d41a43ad48
Author: Jay Chung <zh...@gmail.com>
AuthorDate: Thu Dec 22 16:19:38 2022 +0800
fix: sql task pre post stm typing and add example (#62)
Correct pre and post statement type, and also support
string type, and also add example of sql type
---
docs/source/tasks/sql.rst | 52 +++++++++++++++++
src/pydolphinscheduler/examples/ext/example.sql | 3 +
.../examples/task_sql_example.py | 66 ++++++++++++++++++++++
src/pydolphinscheduler/tasks/sql.py | 35 ++++++++++--
tests/tasks/test_sql.py | 24 ++++++++
tests/testing/constants.py | 2 +-
6 files changed, 176 insertions(+), 6 deletions(-)
diff --git a/docs/source/tasks/sql.rst b/docs/source/tasks/sql.rst
index 52df042..7110046 100644
--- a/docs/source/tasks/sql.rst
+++ b/docs/source/tasks/sql.rst
@@ -18,6 +18,58 @@
SQL
===
+An example for SQL task, including how to use it and the details of its parameters.
+
+This task type can execute SQL on multiple types of databases, which include
+
+- MySQL
+- PostgreSQL
+- Oracle
+- SQL Server
+- DB2
+- Hive
+- Presto
+- Trino
+- ClickHouse
+
+Example
+-------
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_sql_example.py
+ :start-after: [start workflow_declare]
+ :end-before: [end workflow_declare]
+
+You can see that SQL task supports two ways to declare SQL, which are
+
+- Bare SQL: Put bare SQL statement in the ``sql`` parameter, such as ``select * from table1``.
+
+ .. literalinclude:: ../../../src/pydolphinscheduler/examples/task_sql_example.py
+ :start-after: [start bare_sql_desc]
+ :end-before: [end bare_sql_desc]
+
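+- SQL Files: Put the path of a SQL file in the ``sql`` parameter, such as ``ext/example.sql``, together with the ``resource_plugin`` parameter used to locate the file.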
+
+ .. literalinclude:: ../../../src/pydolphinscheduler/examples/task_sql_example.py
+ :start-after: [start sql_file_desc]
+ :end-before: [end sql_file_desc]
+
+If you want to do some preparation before executing SQL, or do some clean up after executing SQL, you can use
+``pre_statements`` and ``post_statements`` parameters to do that. Both ``pre_statements`` and ``post_statements``
+support one or multiple statements, you can assign a sequence of SQL statements to them if you want to execute
+multiple statements. But if you only need to execute one statement, you can assign a string to them.
+
+ .. literalinclude:: ../../../src/pydolphinscheduler/examples/task_sql_example.py
+ :start-after: [start sql_with_pre_post_desc]
+ :end-before: [end sql_with_pre_post_desc]
+
+.. note::
+
+ Parameters ``pre_statements`` and ``post_statements`` only support non-query statements, such as ``create table``,
+ ``drop table``, ``update table`` currently. They also only support bare SQL instead of SQL files for now.
+
+Dive Into
+---------
+
.. automodule:: pydolphinscheduler.tasks.sql
diff --git a/src/pydolphinscheduler/examples/ext/example.sql b/src/pydolphinscheduler/examples/ext/example.sql
new file mode 100644
index 0000000..dc8348a
--- /dev/null
+++ b/src/pydolphinscheduler/examples/ext/example.sql
@@ -0,0 +1,3 @@
+select *
+from resource_plugin
+where id = 1
\ No newline at end of file
diff --git a/src/pydolphinscheduler/examples/task_sql_example.py b/src/pydolphinscheduler/examples/task_sql_example.py
new file mode 100644
index 0000000..f2a8bd4
--- /dev/null
+++ b/src/pydolphinscheduler/examples/task_sql_example.py
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# [start workflow_declare]
+
+"""An example workflow for task SQL."""
+from pathlib import Path
+
+from pydolphinscheduler.core.workflow import Workflow
+from pydolphinscheduler.resources_plugin import Local
+from pydolphinscheduler.tasks.sql import Sql, SqlType
+
+with Workflow(
+ name="task_sql_example",
+) as workflow:
+ # [start bare_sql_desc]
+ bare_sql = Sql(
+ name="bare_sql",
+ datasource_name="metadata",
+ sql="select * from t_ds_version",
+ )
+ # [end bare_sql_desc]
+ # [start sql_file_desc]
+ sql_file = Sql(
+ name="sql_file",
+ datasource_name="metadata",
+ sql="ext/example.sql",
+ sql_type=SqlType.SELECT,
+ resource_plugin=Local(prefix=str(Path(__file__).parent)),
+ )
+ # [end sql_file_desc]
+ # [start sql_with_pre_post_desc]
+ sql_with_pre_post = Sql(
+ name="sql_with_pre_post",
+ datasource_name="metadata",
+ sql="select * from t_ds_version",
+ pre_statements=[
+ "update table_one set version = '1.3.6'",
+ "delete from table_two where version = '1.3.6'",
+ ],
+ post_statements="update table_one set version = '3.0.0'",
+ )
+ # [end sql_with_pre_post_desc]
+
+ bare_sql >> [
+ sql_file,
+ sql_with_pre_post,
+ ]
+
+ workflow.submit()
+
+# [end workflow_declare]
diff --git a/src/pydolphinscheduler/tasks/sql.py b/src/pydolphinscheduler/tasks/sql.py
index 4bebf83..5a6d890 100644
--- a/src/pydolphinscheduler/tasks/sql.py
+++ b/src/pydolphinscheduler/tasks/sql.py
@@ -19,7 +19,7 @@
import logging
import re
-from typing import Dict, Optional
+from typing import Dict, List, Optional, Sequence, Union
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.database import Database
@@ -49,6 +49,18 @@ class Sql(Task):
- SQLServer
You provider datasource_name contain connection information, it decisions which
database type and database instance would run this sql.
+
+ :param name: SQL task name
+ :param datasource_name: datasource name in dolphinscheduler, the name must exist and must be ``online``
+ datasource instead of ``test``.
+ :param sql: SQL statement, the sql script you want to run. Support resource plugin in this parameter.
+ :param sql_type: SQL type, whether sql statement is select query or not. If not provided, it will be auto
+ detected according to sql statement using :func:`pydolphinscheduler.tasks.sql.Sql.sql_type`, and you
+ can also set it manually, using ``SqlType.SELECT`` for query statement or ``SqlType.NOT_SELECT`` for
+ non-query statement.
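+ :param pre_statements: SQL statements to be executed before the main SQL statement, either a single
+ statement string or a sequence of statement strings.
+ :param post_statements: SQL statements to be executed after the main SQL statement, either a single
+ statement string or a sequence of statement strings.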
+ :param display_rows: The number of record rows to be displayed in the SQL task log, default is 10.
"""
_task_custom_attr = {
@@ -68,8 +80,8 @@ class Sql(Task):
datasource_name: str,
sql: str,
sql_type: Optional[str] = None,
- pre_statements: Optional[str] = None,
- post_statements: Optional[str] = None,
+ pre_statements: Optional[Union[str, Sequence[str]]] = None,
+ post_statements: Optional[Union[str, Sequence[str]]] = None,
display_rows: Optional[int] = 10,
*args,
**kwargs
@@ -78,10 +90,23 @@ class Sql(Task):
super().__init__(name, TaskType.SQL, *args, **kwargs)
self.param_sql_type = sql_type
self.datasource_name = datasource_name
- self.pre_statements = pre_statements or []
- self.post_statements = post_statements or []
+ self.pre_statements = self.get_stm_list(pre_statements)
+ self.post_statements = self.get_stm_list(post_statements)
self.display_rows = display_rows
+ @staticmethod
+ def get_stm_list(stm: Union[str, Sequence[str]]) -> List[str]:
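+ """Convert statement(s) to a list of str.
+
+ :param stm: a single statement string, a sequence of statement strings, or an empty value such as ``None``
+ :return: statements list, which is empty when ``stm`` is empty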
+ """
+ if not stm:
+ return []
+ elif isinstance(stm, str):
+ return [stm]
+ return list(stm)
+
@property
def sql_type(self) -> str:
"""Judgement sql type, it will return the SQL type for type `SELECT` or `NOT_SELECT`.
diff --git a/tests/tasks/test_sql.py b/tests/tasks/test_sql.py
index b9de05f..42a168d 100644
--- a/tests/tasks/test_sql.py
+++ b/tests/tasks/test_sql.py
@@ -40,6 +40,30 @@ def setup_crt_first():
delete_file(file_path)
+@pytest.mark.parametrize(
+ "stm, expected",
+ [
+ ("select * from t_ds_version", ["select * from t_ds_version"]),
+ (None, []),
+ (
+ ["select * from table1", "select * from table2"],
+ ["select * from table1", "select * from table2"],
+ ),
+ (
+ ("select * from table1", "select * from table2"),
+ ["select * from table1", "select * from table2"],
+ ),
+ (
+ {"select * from table1", "select * from table2"},
+ ["select * from table1", "select * from table2"],
+ ),
+ ],
+)
+def test_get_stm_list(stm, expected) -> None:
+ """Test static function get_stm_list."""
+ assert sorted(Sql.get_stm_list(stm)) == sorted(expected)
+
+
@pytest.mark.parametrize(
"sql, param_sql_type, sql_type",
[
diff --git a/tests/testing/constants.py b/tests/testing/constants.py
index 4dfaa37..998f711 100644
--- a/tests/testing/constants.py
+++ b/tests/testing/constants.py
@@ -22,7 +22,6 @@ import os
# Record some task without example in directory `example`. Some of them maybe can not write example,
# but most of them just without adding by mistake, and we should add it later.
task_without_example = {
- "sql",
"http",
"sub_workflow",
"python",
@@ -38,6 +37,7 @@ ignore_exec_examples = {
"task_spark_example",
# TODO activate it when dolphinscheduler default resource center is local file
"multi_resources_example",
+ "task_sql_example",
}
# pydolphinscheduler environment home