You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@superset.apache.org by be...@apache.org on 2023/07/25 00:56:42 UTC

[superset] branch db-diagnostics updated: WIP

This is an automated email from the ASF dual-hosted git repository.

beto pushed a commit to branch db-diagnostics
in repository https://gitbox.apache.org/repos/asf/superset.git


The following commit(s) were added to refs/heads/db-diagnostics by this push:
     new 584adbaa70 WIP
584adbaa70 is described below

commit 584adbaa70dff548c8b2cd83632eceac40eb047e
Author: Beto Dealmeida <ro...@dealmeida.net>
AuthorDate: Mon Jul 24 17:56:32 2023 -0700

    WIP
---
 superset/cli/test_db.py         | 172 ++++++++++++++++++++++++++++++++++++++++
 superset/db_engine_specs/lib.py | 137 ++++++++++++++++++++++++++++++--
 2 files changed, 301 insertions(+), 8 deletions(-)

diff --git a/superset/cli/test_db.py b/superset/cli/test_db.py
new file mode 100644
index 0000000000..1a2942ae2c
--- /dev/null
+++ b/superset/cli/test_db.py
@@ -0,0 +1,172 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import sys
+from typing import Any, Dict, Type
+
+import click
+import yaml
+from rich.console import Console
+from sqlalchemy import create_engine
+from sqlalchemy.engine import Engine
+from sqlalchemy.engine.url import make_url
+
+from superset.db_engine_specs import load_engine_specs
+from superset.db_engine_specs.base import BaseEngineSpec
+from superset.db_engine_specs.lib import diagnose
+
+LIMIT_METHODS = {
+    "FORCE_LIMIT": "modifies the query, replacing an existing LIMIT or adding a new one",
+    "WRAP_SQL": "wraps the original query in a SELECT * with a LIMIT",
+    "FETCH_MANY": "runs the query unmodified but fetchs only LIMIT rows from the cursor",
+}
+
+
+@click.command()
+@click.argument("sqlalchemy_uri")
+@click.option(
+    "--connect-args",
+    "-c",
+    "raw_connect_args",
+    help="Connect args as JSON or YAML",
+)
+def test_db(sqlalchemy_uri: str, raw_connect_args: str | None = None) -> None:
+    """
+    Run a series of tests against an analytics database.
+
+    This command tests:
+
+      1. The Superset DB engine spec.
+      2. The SQLAlchemy dialect.
+      3. The database connectivity and performance.
+
+    It's useful for people developing DB engine specs and/or SQLAlchemy dialects, and
+    also to test new DB API 2.0 drivers.
+
+    TODO:
+
+      - implement SSH tunneling
+      - implement server certificates
+
+    """
+    console = Console()
+    console.clear()
+
+    console.print("[bold]Collecting additional connection information...")
+    connect_args = collect_connection_info(console, sqlalchemy_uri, raw_connect_args)
+
+    console.print("[bold]\nChecking for a DB engine spec...")
+    test_db_engine_spec(console, sqlalchemy_uri)
+
+    console.print("[bold]\nTesting the SQLAlchemy dialect...")
+    engine = test_sqlalchemy_dialect(console, sqlalchemy_uri, connect_args)
+
+    console.print("[bold]\nTesting the database connectivity...")
+    test_database_connectivity(console, engine)
+
+
+def collect_connection_info(
+    console: Console,
+    sqlalchemy_uri: str,
+    raw_connect_args: str | None = None,
+) -> Dict[str, Any]:
+    """
+    Collect ``connect_args`` if needed.
+    """
+    console.print(f"[green]SQLAlchemy URI: [bold]{sqlalchemy_uri}")
+    if raw_connect_args is None:
+        configure_connect_args = input(
+            "> Do you want to configure connection arguments? [y/N] "
+        )
+        if configure_connect_args.strip().lower() == "y":
+            console.print(
+                "Please paste the connect_args as JSON or YAML and press CTRL-D when "
+                "finished"
+            )
+            raw_connect_args = sys.stdin.read()
+        else:
+            raw_connect_args = "{}"
+
+    return yaml.safe_load(raw_connect_args)
+
+
+def test_db_engine_spec(
+    console: Console,
+    sqlalchemy_uri: str,
+) -> Type[BaseEngineSpec] | None:
+    """
+    Test the DB engine spec, if available.
+    """
+    spec: Type[BaseEngineSpec] | None = None
+    for spec in load_engine_specs():
+        if spec.supports_url(make_url(sqlalchemy_uri)):
+            if spec.__module__.startswith("superset.db_engine_specs"):
+                console.print(
+                    f":thumbs_up: [green]Found DB engine spec: [bold]{spec.engine_name}"
+                )
+            else:
+                console.print(
+                    ":warning: [yellow]Found 3rd party DB engine spec: "
+                    f"[bold]{spec.engine_name} ({spec.__module__})"
+                )
+            break
+    else:
+        console.print(
+            ":thumbs_down: [red]No DB engine spec found for the SQLAlchemy URI. The "
+            "database can still be used with Superset, but some functionality may be "
+            "limited."
+        )
+
+    if spec is None:
+        return None
+
+    info = diagnose(spec)
+
+    console.print("About the database:")
+    console.print("  - Method used to apply LIMIT to queries:", info["limit_method"])
+    for k, v in LIMIT_METHODS.items():
+        console.print(f"    - {k}: {v}")
+    console.print("  - Supports JOINs: ", info["joins"])
+
+    console.print("Supported time grains:")
+    for k, v in info["time_grains"].items():
+        console.print(f"  - {k}: {v}")
+
+
+def test_sqlalchemy_dialect(
+    console: Console,
+    sqlalchemy_uri: str,
+    connect_args: Dict[str, Any],
+) -> Engine:
+    """
+    Test the SQLAlchemy dialect, making sure it supports everything Superset needs.
+    """
+    engine = create_engine(sqlalchemy_uri, connect_args=connect_args)
+    return engine
+
+
+def test_database_connectivity(console: Console, engine: Engine) -> None:
+    with console.status("[bold green]Connecting to database..."):
+        try:
+            conn = engine.raw_connection()
+            engine.dialect.do_ping(conn)  # pylint: disable=attr-defined
+            console.print(":thumbs_up: [green]Connected successfully!")
+        except Exception as ex:  # pylint: disable=broad-except
+            console.print(f":thumbs_down: [red]Failed to connect: {ex}")
+            sys.exit(1)
diff --git a/superset/db_engine_specs/lib.py b/superset/db_engine_specs/lib.py
index bd9bee0222..dbe1f102e9 100644
--- a/superset/db_engine_specs/lib.py
+++ b/superset/db_engine_specs/lib.py
@@ -15,13 +15,14 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import Any, Type
+from typing import Any, List, Tuple, Type
 
 from superset.constants import TimeGrain
+from superset.db_engine_specs import load_engine_specs
 from superset.db_engine_specs.base import BaseEngineSpec
 
 
-def basic_diagnostics(spec: Type[BaseEngineSpec]) -> dict[str, Any]:
+def diagnose(spec: Type[BaseEngineSpec]) -> dict[str, Any]:
     """
     Run basic diagnostics on a given DB engine spec.
     """
@@ -44,7 +45,8 @@ def basic_diagnostics(spec: Type[BaseEngineSpec]) -> dict[str, Any]:
 
     output.update(
         {
-            "limit_method": spec.limit_method,
+            "module": spec.__module__,
+            "limit_method": spec.limit_method.upper(),
             "joins": spec.allows_joins,
             "subqueries": spec.allows_subqueries,
             "alias_in_select": spec.allows_alias_in_select,
@@ -55,7 +57,7 @@ def basic_diagnostics(spec: Type[BaseEngineSpec]) -> dict[str, Any]:
             "order_by_in_select": spec.allows_hidden_orderby_agg,
             "expression_in_orderby": spec.allows_hidden_cc_in_orderby,
             "cte_in_subquery": spec.allows_cte_in_subquery,
-            "limit_clause": spec.allows_limit_clause,
+            "limit_clause": spec.allow_limit_clause,
             "max_column_name": spec.max_column_name_length,
             "sql_comments": spec.allows_sql_comments,
             "escaped_colons": spec.allows_escaped_colons,
@@ -84,21 +86,140 @@ def basic_diagnostics(spec: Type[BaseEngineSpec]) -> dict[str, Any]:
             "get_metrics": "get_metrics" in spec.__dict__,
             "where_latest_partition": "where_latest_partition" in spec.__dict__,
             "expand_data": "expand_data" in spec.__dict__,
-            "query_cost_estimation": "estimate_query_cost" in spec.__dict__,
+            "query_cost_estimation": "estimate_query_cost" in spec.__dict__
+            or "estimate_statement_cost" in spec.__dict__,
             # SQL validation is implemented in external classes
             "sql_validation": spec.engine in sql_validators,
         },
     )
 
+    # compute score
+    score = 0
+
+    # each time grain is 1 point
+    score += sum(output["time_grains"][time_grain.name] for time_grain in TimeGrain)
+
+    basic = ["masked_encrypted_extra", "column_type_mapping", "function_names"]
+    nice_to_have = [
+        "user_impersonation",
+        "file_upload",
+        "extra_table_metadata",
+        "dbapi_exception_mapping",
+        "custom_errors",
+        "dynamic_schema",
+        "catalog",
+        "dynamic_catalog",
+        "ssh_tunneling",
+        "query_cancelation",
+        "get_metrics",
+        "where_latest_partition",
+    ]
+    advanced = ["expand_data", "query_cost_estimation", "sql_validation"]
+    score += sum(10 * int(output[key]) for key in basic)
+    score += sum(10 * int(output[key]) for key in nice_to_have)
+    score += sum(10 * int(output[key]) for key in advanced)
+    output["score"] = score
+
     return output
 
 
+def get_name(spec: Type[BaseEngineSpec]) -> str:
+    """
+    Return a name for a given DB engine spec.
+    """
+    return spec.engine_name or spec.engine
+
+
+def generate_table() -> List[Tuple[Any, ...]]:
+    """
+    Generate a table showing info for all DB engine specs.
+
+    Data is returned as a list of tuples, appropriate to write to a CSV file.
+    """
+    info = {}
+    for spec in sorted(load_engine_specs(), key=get_name):
+        info[get_name(spec)] = diagnose(spec)
+
+    rows = []
+    rows.append(tuple(info))  # header row
+    rows.append(tuple(db_info["module"] for db_info in info.values()))
+
+    # descriptive
+    keys = [
+        "limit_method",
+        "joins",
+        "subqueries",
+        "alias_in_select",
+        "alias_in_orderby",
+        "secondary_time_columns",
+        "time_groupby_inline",
+        "alias_to_source_column",
+        "order_by_in_select",
+        "expression_in_orderby",
+        "cte_in_subquery",
+        "limit_clause",
+        "max_column_name",
+        "sql_comments",
+        "escaped_colons",
+    ]
+    for key in keys:
+        rows.append(tuple(db_info[key] for db_info in info.values()))
+
+    # basic
+    for time_grain in TimeGrain:
+        rows.append(
+            tuple(db_info["time_grains"][time_grain.name] for db_info in info.values())
+        )
+    keys = [
+        "masked_encrypted_extra",
+        "column_type_mapping",
+        "function_names",
+    ]
+    for key in keys:
+        rows.append(tuple(db_info[key] for db_info in info.values()))
+
+    # nice to have
+    keys = [
+        "user_impersonation",
+        "file_upload",
+        "extra_table_metadata",
+        "dbapi_exception_mapping",
+        "custom_errors",
+        "dynamic_schema",
+        "catalog",
+        "dynamic_catalog",
+        "ssh_tunneling",
+        "query_cancelation",
+        "get_metrics",
+        "where_latest_partition",
+    ]
+    for key in keys:
+        rows.append(tuple(db_info[key] for db_info in info.values()))
+
+    # advanced
+    keys = [
+        "expand_data",
+        "query_cost_estimation",
+        "sql_validation",
+    ]
+    for key in keys:
+        rows.append(tuple(db_info[key] for db_info in info.values()))
+
+    rows.append(tuple(db_info["score"] for db_info in info.values()))
+
+    return rows
+
+
 if __name__ == "__main__":
-    from pprint import pprint
+    import csv
 
     from superset.app import create_app
-    from superset.db_engine_specs.shillelagh import ShillelaghEngineSpec
 
     app = create_app()
     with app.app_context():
-        pprint(basic_diagnostics(ShillelaghEngineSpec))
+        rows = generate_table()
+
+    with open("db_engine_specs.csv", "w", encoding="utf-8") as fp:
+        writer = csv.writer(fp, delimiter="\t")
+        for row in rows:
+            writer.writerow(row)