You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@superset.apache.org by be...@apache.org on 2023/07/25 00:56:42 UTC
[superset] branch db-diagnostics updated: WIP
This is an automated email from the ASF dual-hosted git repository.
beto pushed a commit to branch db-diagnostics
in repository https://gitbox.apache.org/repos/asf/superset.git
The following commit(s) were added to refs/heads/db-diagnostics by this push:
new 584adbaa70 WIP
584adbaa70 is described below
commit 584adbaa70dff548c8b2cd83632eceac40eb047e
Author: Beto Dealmeida <ro...@dealmeida.net>
AuthorDate: Mon Jul 24 17:56:32 2023 -0700
WIP
---
superset/cli/test_db.py | 172 ++++++++++++++++++++++++++++++++++++++++
superset/db_engine_specs/lib.py | 137 ++++++++++++++++++++++++++++++--
2 files changed, 301 insertions(+), 8 deletions(-)
diff --git a/superset/cli/test_db.py b/superset/cli/test_db.py
new file mode 100644
index 0000000000..1a2942ae2c
--- /dev/null
+++ b/superset/cli/test_db.py
@@ -0,0 +1,172 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import sys
+from typing import Any, Dict, Type
+
+import click
+import yaml
+from rich.console import Console
+from sqlalchemy import create_engine
+from sqlalchemy.engine import Engine
+from sqlalchemy.engine.url import make_url
+
+from superset.db_engine_specs import load_engine_specs
+from superset.db_engine_specs.base import BaseEngineSpec
+from superset.db_engine_specs.lib import diagnose
+
# Human-readable descriptions of the strategies a DB engine spec can use to
# enforce a row LIMIT on queries; keys match ``BaseEngineSpec.limit_method``
# (upper-cased by ``diagnose``).
LIMIT_METHODS = {
    "FORCE_LIMIT": "modifies the query, replacing an existing LIMIT or adding a new one",
    "WRAP_SQL": "wraps the original query in a SELECT * with a LIMIT",
    # fixed typo: "fetchs" -> "fetches"
    "FETCH_MANY": "runs the query unmodified but fetches only LIMIT rows from the cursor",
}
+
+
@click.command()
@click.argument("sqlalchemy_uri")
@click.option(
    "--connect-args",
    "-c",
    "raw_connect_args",
    help="Connect args as JSON or YAML",
)
def test_db(sqlalchemy_uri: str, raw_connect_args: str | None = None) -> None:
    """
    Run a series of tests against an analytics database.

    This command tests:

    1. The Superset DB engine spec.
    2. The SQLAlchemy dialect.
    3. The database connectivity and performance.

    It's useful for people developing DB engine specs and/or SQLAlchemy dialects, and
    also to test new DB API 2.0 drivers.

    TODO:

    - implement SSH tunneling
    - implement server certificates

    """
    # NOTE: this docstring doubles as the click ``--help`` text.
    console = Console()
    console.clear()

    # Step 0: parse/collect ``connect_args`` (prompts interactively when the
    # --connect-args option was not given).
    console.print("[bold]Collecting additional connection information...")
    connect_args = collect_connection_info(console, sqlalchemy_uri, raw_connect_args)

    # Step 1: look up the Superset DB engine spec for this URI and print its
    # diagnostics.
    console.print("[bold]\nChecking for a DB engine spec...")
    test_db_engine_spec(console, sqlalchemy_uri)

    # Step 2: build a SQLAlchemy engine with the given URI and connect args.
    console.print("[bold]\nTesting the SQLAlchemy dialect...")
    engine = test_sqlalchemy_dialect(console, sqlalchemy_uri, connect_args)

    # Step 3: actually connect and ping the database (exits with status 1 on
    # failure).
    console.print("[bold]\nTesting the database connectivity...")
    test_database_connectivity(console, engine)
+
+
+def collect_connection_info(
+ console: Console,
+ sqlalchemy_uri: str,
+ raw_connect_args: str | None = None,
+) -> Dict[str, Any]:
+ """
+ Collect ``connect_args`` if needed.
+ """
+ console.print(f"[green]SQLAlchemy URI: [bold]{sqlalchemy_uri}")
+ if raw_connect_args is None:
+ configure_connect_args = input(
+ "> Do you want to configure connection arguments? [y/N] "
+ )
+ if configure_connect_args.strip().lower() == "y":
+ console.print(
+ "Please paste the connect_args as JSON or YAML and press CTRL-D when "
+ "finished"
+ )
+ raw_connect_args = sys.stdin.read()
+ else:
+ raw_connect_args = "{}"
+
+ return yaml.safe_load(raw_connect_args)
+
+
def test_db_engine_spec(
    console: Console,
    sqlalchemy_uri: str,
) -> Type[BaseEngineSpec] | None:
    """
    Test the DB engine spec, if available.

    Finds the first registered DB engine spec that supports the given URI,
    prints its diagnostics, and returns it. Returns ``None`` (after printing
    a warning) when no spec matches.
    """
    # BUG FIX: the original reused the loop variable to detect "not found",
    # but after a for-loop that finishes without ``break`` the variable holds
    # the *last* iterated spec — never ``None`` — so the guard below could
    # not fire and ``diagnose`` ran against an unrelated spec. Track the
    # match in a separate variable instead.
    spec: Type[BaseEngineSpec] | None = None
    for candidate in load_engine_specs():
        if candidate.supports_url(make_url(sqlalchemy_uri)):
            spec = candidate
            break

    if spec is None:
        console.print(
            ":thumbs_down: [red]No DB engine spec found for the SQLAlchemy URI. The "
            "database can still be used with Superset, but some functionality may be "
            "limited."
        )
        return None

    if spec.__module__.startswith("superset.db_engine_specs"):
        console.print(
            f":thumbs_up: [green]Found DB engine spec: [bold]{spec.engine_name}"
        )
    else:
        # Specs living outside ``superset.db_engine_specs`` come from third
        # parties; show the module so the user knows where it was loaded from.
        console.print(
            ":warning: [yellow]Found 3rd party DB engine spec: "
            f"[bold]{spec.engine_name} ({spec.__module__})"
        )

    info = diagnose(spec)

    console.print("About the database:")
    # ``diagnose`` upper-cases ``limit_method`` so it matches LIMIT_METHODS keys.
    console.print(" - Method used to apply LIMIT to queries:", info["limit_method"])
    for k, v in LIMIT_METHODS.items():
        console.print(f" - {k}: {v}")
    console.print(" - Supports JOINs: ", info["joins"])

    console.print("Supported time grains:")
    for k, v in info["time_grains"].items():
        console.print(f" - {k}: {v}")

    # BUG FIX: the declared return type promises the spec, but the original
    # fell off the end and always returned ``None``.
    return spec
+
+
def test_sqlalchemy_dialect(
    console: Console,
    sqlalchemy_uri: str,
    connect_args: Dict[str, Any],
) -> Engine:
    """
    Test the SQLAlchemy dialect, making sure it supports everything Superset needs.

    Currently this only builds the engine; the returned ``Engine`` is used by
    the connectivity test that follows.
    """
    return create_engine(sqlalchemy_uri, connect_args=connect_args)
+
+
def test_database_connectivity(console: Console, engine: Engine) -> None:
    """
    Ping the database once, reporting success or exiting with status 1.

    :param console: Rich console used for user-facing output
    :param engine: SQLAlchemy engine built for the database under test
    """
    with console.status("[bold green]Connecting to database..."):
        try:
            conn = engine.raw_connection()
            try:
                # ``do_ping`` is the dialect-specific liveness check.
                engine.dialect.do_ping(conn)  # pylint: disable=no-member
                console.print(":thumbs_up: [green]Connected successfully!")
            finally:
                # BUG FIX: the raw DB-API connection was leaked before; close
                # it whether or not the ping succeeds.
                conn.close()
        except Exception as ex:  # pylint: disable=broad-except
            # Broad catch is deliberate: any driver/dialect failure should be
            # reported to the user, not traced back.
            console.print(f":thumbs_down: [red]Failed to connect: {ex}")
            sys.exit(1)
diff --git a/superset/db_engine_specs/lib.py b/superset/db_engine_specs/lib.py
index bd9bee0222..dbe1f102e9 100644
--- a/superset/db_engine_specs/lib.py
+++ b/superset/db_engine_specs/lib.py
@@ -15,13 +15,14 @@
# specific language governing permissions and limitations
# under the License.
-from typing import Any, Type
+from typing import Any, List, Tuple, Type
from superset.constants import TimeGrain
+from superset.db_engine_specs import load_engine_specs
from superset.db_engine_specs.base import BaseEngineSpec
-def basic_diagnostics(spec: Type[BaseEngineSpec]) -> dict[str, Any]:
+def diagnose(spec: Type[BaseEngineSpec]) -> dict[str, Any]:
"""
Run basic diagnostics on a given DB engine spec.
"""
@@ -44,7 +45,8 @@ def basic_diagnostics(spec: Type[BaseEngineSpec]) -> dict[str, Any]:
output.update(
{
- "limit_method": spec.limit_method,
+ "module": spec.__module__,
+ "limit_method": spec.limit_method.upper(),
"joins": spec.allows_joins,
"subqueries": spec.allows_subqueries,
"alias_in_select": spec.allows_alias_in_select,
@@ -55,7 +57,7 @@ def basic_diagnostics(spec: Type[BaseEngineSpec]) -> dict[str, Any]:
"order_by_in_select": spec.allows_hidden_orderby_agg,
"expression_in_orderby": spec.allows_hidden_cc_in_orderby,
"cte_in_subquery": spec.allows_cte_in_subquery,
- "limit_clause": spec.allows_limit_clause,
+ "limit_clause": spec.allow_limit_clause,
"max_column_name": spec.max_column_name_length,
"sql_comments": spec.allows_sql_comments,
"escaped_colons": spec.allows_escaped_colons,
@@ -84,21 +86,140 @@ def basic_diagnostics(spec: Type[BaseEngineSpec]) -> dict[str, Any]:
"get_metrics": "get_metrics" in spec.__dict__,
"where_latest_partition": "where_latest_partition" in spec.__dict__,
"expand_data": "expand_data" in spec.__dict__,
- "query_cost_estimation": "estimate_query_cost" in spec.__dict__,
+ "query_cost_estimation": "estimate_query_cost" in spec.__dict__
+ or "estimate_statement_cost" in spec.__dict__,
# SQL validation is implemented in external classes
"sql_validation": spec.engine in sql_validators,
},
)
+ # compute score
+ score = 0
+
+ # each time grain is 1 point
+ score += sum(output["time_grains"][time_grain.name] for time_grain in TimeGrain)
+
+ basic = ["masked_encrypted_extra", "column_type_mapping", "function_names"]
+ nice_to_have = [
+ "user_impersonation",
+ "file_upload",
+ "extra_table_metadata",
+ "dbapi_exception_mapping",
+ "custom_errors",
+ "dynamic_schema",
+ "catalog",
+ "dynamic_catalog",
+ "ssh_tunneling",
+ "query_cancelation",
+ "get_metrics",
+ "where_latest_partition",
+ ]
+ advanced = ["expand_data", "query_cost_estimation", "sql_validation"]
+ score += sum(10 * int(output[key]) for key in basic)
+ score += sum(10 * int(output[key]) for key in nice_to_have)
+ score += sum(10 * int(output[key]) for key in advanced)
+ output["score"] = score
+
return output
def get_name(spec: Type[BaseEngineSpec]) -> str:
    """
    Return a human-readable name for a given DB engine spec.

    Prefers the display name (``engine_name``); falls back to the SQLAlchemy
    engine identifier when no display name is set.
    """
    return spec.engine_name if spec.engine_name else spec.engine
+
+
def generate_table() -> List[Tuple[Any, ...]]:
    """
    Generate a table showing info for all DB engine specs.

    Data is returned as a list of tuples, appropriate to write to a CSV file.
    Rows are: database names, modules, descriptive attributes, time grains,
    then basic/nice-to-have/advanced features, and finally the score.
    """
    info = {
        get_name(spec): diagnose(spec)
        for spec in sorted(load_engine_specs(), key=get_name)
    }
    diagnostics = list(info.values())

    def feature_row(key: str) -> Tuple[Any, ...]:
        # One table row: the value of ``key`` for every DB engine spec.
        return tuple(db_info[key] for db_info in diagnostics)

    # header row (database names), followed by the module of each spec
    rows: List[Tuple[Any, ...]] = [tuple(info), feature_row("module")]

    # descriptive attributes
    descriptive_keys = (
        "limit_method",
        "joins",
        "subqueries",
        "alias_in_select",
        "alias_in_orderby",
        "secondary_time_columns",
        "time_groupby_inline",
        "alias_to_source_column",
        "order_by_in_select",
        "expression_in_orderby",
        "cte_in_subquery",
        "limit_clause",
        "max_column_name",
        "sql_comments",
        "escaped_colons",
    )
    rows.extend(feature_row(key) for key in descriptive_keys)

    # basic: one row per supported time grain, then core features
    rows.extend(
        tuple(db_info["time_grains"][time_grain.name] for db_info in diagnostics)
        for time_grain in TimeGrain
    )
    basic_keys = (
        "masked_encrypted_extra",
        "column_type_mapping",
        "function_names",
    )

    # nice to have
    nice_to_have_keys = (
        "user_impersonation",
        "file_upload",
        "extra_table_metadata",
        "dbapi_exception_mapping",
        "custom_errors",
        "dynamic_schema",
        "catalog",
        "dynamic_catalog",
        "ssh_tunneling",
        "query_cancelation",
        "get_metrics",
        "where_latest_partition",
    )

    # advanced
    advanced_keys = (
        "expand_data",
        "query_cost_estimation",
        "sql_validation",
    )

    rows.extend(
        feature_row(key)
        for key in (*basic_keys, *nice_to_have_keys, *advanced_keys)
    )

    rows.append(feature_row("score"))

    return rows
+
+
if __name__ == "__main__":
- from pprint import pprint
+ import csv
from superset.app import create_app
- from superset.db_engine_specs.shillelagh import ShillelaghEngineSpec
app = create_app()
with app.app_context():
- pprint(basic_diagnostics(ShillelaghEngineSpec))
+ rows = generate_table()
+
+ with open("db_engine_specs.csv", "w", encoding="utf-8") as fp:
+ writer = csv.writer(fp, delimiter="\t")
+ for row in rows:
+ writer.writerow(row)