You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@superset.apache.org by be...@apache.org on 2023/08/18 19:21:42 UTC

[superset] branch master updated: feat: a native SQLAlchemy dialect for Superset (#14225)

This is an automated email from the ASF dual-hosted git repository.

beto pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/superset.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b660c86a4 feat: a native SQLAlchemy dialect for Superset (#14225)
6b660c86a4 is described below

commit 6b660c86a4c3b968dfe297443975e0686f05d9da
Author: Beto Dealmeida <ro...@dealmeida.net>
AuthorDate: Fri Aug 18 12:21:36 2023 -0700

    feat: a native SQLAlchemy dialect for Superset (#14225)
---
 docs/docs/databases/meta-database.mdx          |  48 +++
 requirements/base.txt                          |  27 +-
 requirements/development.txt                   |   4 -
 setup.cfg                                      |   2 +-
 setup.py                                       |   9 +-
 superset/config.py                             |   9 +
 superset/db_engine_specs/__init__.py           |   5 +-
 superset/db_engine_specs/superset.py           |  39 +++
 superset/extensions/metadb.py                  | 447 +++++++++++++++++++++++++
 superset/security/analytics_db_safety.py       |   4 +
 tests/unit_tests/extensions/test_sqlalchemy.py | 250 ++++++++++++++
 11 files changed, 833 insertions(+), 11 deletions(-)

diff --git a/docs/docs/databases/meta-database.mdx b/docs/docs/databases/meta-database.mdx
new file mode 100644
index 0000000000..ae5b055470
--- /dev/null
+++ b/docs/docs/databases/meta-database.mdx
@@ -0,0 +1,48 @@
+---
+title: Querying across databases
+hide_title: true
+sidebar_position: 42
+version: 1
+---
+
+## Querying across databases
+
+Superset offers an experimental feature for querying across different databases. This is done via a special database called "Superset meta database" that uses the "superset://" SQLAlchemy URI. When using the database it's possible to query any table in any of the configured databases using the following syntax:
+
+```sql
+SELECT * FROM "database name.[[catalog.].schema].table name";
+```
+
+For example:
+
+```sql
+SELECT * FROM "examples.birth_names";
+```
+
+Spaces are allowed, but periods in the names must be replaced by `%2E`. Eg:
+
+```sql
+SELECT * FROM "Superset meta database.examples%2Ebirth_names";
+```
+
+The query above returns the same rows as `SELECT * FROM "examples.birth_names"`, and also shows that the meta database can query tables from any table — even itself!
+
+## Considerations
+
+Before enabling this feature, there are a few considerations that you should have in mind. First, the meta database enforces permissions on the queried tables, so users should only have access via the database to tables that they originally have access to. Nevertheless, the meta database is a new surface for potential attacks, and bugs could allow users to see data they should not.
+
+Second, there are performance considerations. The meta database will push any filtering, sorting, and limiting to the underlying databases, but any aggregations and joins will happen in memory in the process running the query. Because of this, it's recommended to run the database in async mode, so queries are executed in Celery workers, instead of the web workers. Additionally, it's possible to specify a hard limit on how many rows are returned from the underlying databases.
+
+## Enabling the meta database
+
+To enable the Superset meta database, first you need to set the `ENABLE_SUPERSET_META_DB` feature flag to true. Then, add a new database of type "Superset meta database" with the SQLAlchemy URI "superset://".
+
+If you enable DML in the meta database users will be able to run DML queries on underlying databases **as long as DML is also enabled in them**. This allows users to run queries that move data across databases.
+
+Second, you might want to change the value of `SUPERSET_META_DB_LIMIT`. The default value is 1000, and defines how many are read from each database before any aggregations and joins are executed. You can also set this value `None` if you only have small tables.
+
+Additionally, you might want to restrict the databases to with the meta database has access to. This can be done in the database configuration, under "Advanced" -> "Other" -> "ENGINE PARAMETERS" and adding:
+
+```json
+{"allowed_dbs":["Google Sheets","examples"]}
+```
diff --git a/requirements/base.txt b/requirements/base.txt
index 68cd81ba24..2389071f46 100644
--- a/requirements/base.txt
+++ b/requirements/base.txt
@@ -13,6 +13,8 @@ amqp==5.1.0
     # via kombu
 apispec[yaml]==6.3.0
     # via flask-appbuilder
+apsw==3.42.0.1
+    # via shillelagh
 async-timeout==4.0.2
     # via redis
 attrs==23.1.0
@@ -33,10 +35,14 @@ cachelib==0.4.1
     # via apache-superset
 celery==5.2.2
     # via apache-superset
+certifi==2023.7.22
+    # via requests
 cffi==1.15.1
     # via
     #   cryptography
     #   pynacl
+charset-normalizer==3.2.0
+    # via requests
 click==8.1.3
     # via
     #   apache-superset
@@ -125,7 +131,9 @@ geographiclib==1.52
 geopy==2.2.0
     # via apache-superset
 greenlet==2.0.2
-    # via sqlalchemy
+    # via
+    #   shillelagh
+    #   sqlalchemy
 gunicorn==20.1.0
     # via apache-superset
 hashids==1.3.1
@@ -137,11 +145,14 @@ holidays==0.23
 humanize==3.11.0
     # via apache-superset
 idna==3.2
-    # via email-validator
+    # via
+    #   email-validator
+    #   requests
 importlib-metadata==6.6.0
     # via
     #   apache-superset
     #   flask
+    #   shillelagh
 importlib-resources==5.12.0
     # via limits
 isodate==0.6.0
@@ -211,6 +222,7 @@ packaging==23.1
     #   deprecation
     #   limits
     #   marshmallow
+    #   shillelagh
 pandas[performance]==2.0.3
     # via apache-superset
 paramiko==2.11.0
@@ -252,6 +264,7 @@ python-dateutil==2.8.2
     #   flask-appbuilder
     #   holidays
     #   pandas
+    #   shillelagh
 python-dotenv==0.19.0
     # via apache-superset
 python-editor==1.0.4
@@ -270,10 +283,14 @@ pyyaml==6.0.1
     #   apispec
 redis==4.5.4
     # via apache-superset
+requests==2.31.0
+    # via shillelagh
 rich==13.3.4
     # via flask-limiter
 selenium==3.141.0
     # via apache-superset
+shillelagh==1.2.6
+    # via apache-superset
 shortid==0.1.2
     # via apache-superset
 simplejson==3.17.3
@@ -295,6 +312,7 @@ sqlalchemy==1.4.36
     #   flask-appbuilder
     #   flask-sqlalchemy
     #   marshmallow-sqlalchemy
+    #   shillelagh
     #   sqlalchemy-utils
 sqlalchemy-utils==0.38.3
     # via
@@ -311,10 +329,13 @@ typing-extensions==4.4.0
     #   apache-superset
     #   flask-limiter
     #   limits
+    #   shillelagh
 tzdata==2023.3
     # via pandas
 urllib3==1.26.6
-    # via selenium
+    # via
+    #   requests
+    #   selenium
 vine==5.0.0
     # via
     #   amqp
diff --git a/requirements/development.txt b/requirements/development.txt
index 7155b2e1a5..cab908f5e7 100644
--- a/requirements/development.txt
+++ b/requirements/development.txt
@@ -26,12 +26,8 @@ botocore==1.29.130
     #   s3transfer
 cached-property==1.5.2
     # via tableschema
-certifi==2023.5.7
-    # via requests
 chardet==5.1.0
     # via tabulator
-charset-normalizer==3.1.0
-    # via requests
 decorator==5.1.1
     # via ipython
 dill==0.3.6
diff --git a/setup.cfg b/setup.cfg
index bc704c5b6c..4340cf50c4 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -30,7 +30,7 @@ combine_as_imports = true
 include_trailing_comma = true
 line_length = 88
 known_first_party = superset
-known_third_party =alembic,apispec,backoff,cachelib,celery,click,colorama,cron_descriptor,croniter,cryptography,dateutil,deprecation,flask,flask_appbuilder,flask_babel,flask_caching,flask_compress,flask_jwt_extended,flask_login,flask_migrate,flask_sqlalchemy,flask_talisman,flask_testing,flask_wtf,freezegun,geohash,geopy,holidays,humanize,isodate,jinja2,jwt,markdown,markupsafe,marshmallow,msgpack,nh3,numpy,pandas,parameterized,parsedatetime,pgsanity,pkg_resources,polyline,prison,progress, [...]
+known_third_party =alembic,apispec,backoff,cachelib,celery,click,colorama,cron_descriptor,croniter,cryptography,dateutil,deprecation,flask,flask_appbuilder,flask_babel,flask_caching,flask_compress,flask_jwt_extended,flask_login,flask_migrate,flask_sqlalchemy,flask_talisman,flask_testing,flask_wtf,freezegun,geohash,geopy,holidays,humanize,isodate,jinja2,jwt,markdown,markupsafe,marshmallow,msgpack,nh3,numpy,pandas,parameterized,parsedatetime,pgsanity,pkg_resources,polyline,prison,progress, [...]
 multi_line_output = 3
 order_by_type = false
 
diff --git a/setup.py b/setup.py
index d6eb1d68a6..d26253e164 100644
--- a/setup.py
+++ b/setup.py
@@ -67,6 +67,10 @@ setup(
         "sqlalchemy.dialects": [
             "postgres.psycopg2 = sqlalchemy.dialects.postgresql:dialect",
             "postgres = sqlalchemy.dialects.postgresql:dialect",
+            "superset = superset.extensions.metadb:SupersetAPSWDialect",
+        ],
+        "shillelagh.adapter": [
+            "superset=superset.extensions.metadb:SupersetShillelaghAdapter"
         ],
     },
     install_requires=[
@@ -115,6 +119,7 @@ setup(
         "PyJWT>=2.4.0, <3.0",
         "redis>=4.5.4, <5.0",
         "selenium>=3.141.0, <4.10.0",
+        "shillelagh>=1.2.6,<2.0",
         "shortid",
         "sshtunnel>=0.4.0, <0.5",
         "simplejson>=3.15.0",
@@ -158,7 +163,7 @@ setup(
         "excel": ["xlrd>=1.2.0, <1.3"],
         "firebird": ["sqlalchemy-firebird>=0.7.0, <0.8"],
         "firebolt": ["firebolt-sqlalchemy>=0.0.1"],
-        "gsheets": ["shillelagh[gsheetsapi]>=1.0.14, <2"],
+        "gsheets": ["shillelagh[gsheetsapi]>=1.2.6, <2"],
         "hana": ["hdbcli==2.4.162", "sqlalchemy_hana==0.4.0"],
         "hive": ["pyhive[hive]>=0.6.5", "tableschema", "thrift>=0.14.1, <1.0.0"],
         "impala": ["impyla>0.16.2, <0.17"],
@@ -181,7 +186,7 @@ setup(
         "redshift": ["sqlalchemy-redshift>=0.8.1, < 0.9"],
         "rockset": ["rockset-sqlalchemy>=0.0.1, <1.0.0"],
         "shillelagh": [
-            "shillelagh[datasetteapi,gsheetsapi,socrata,weatherapi]>=1.1.1, <2"
+            "shillelagh[datasetteapi,gsheetsapi,socrata,weatherapi]>=1.2.6,<2"
         ],
         "snowflake": ["snowflake-sqlalchemy>=1.2.4, <2"],
         "spark": ["pyhive[hive]>=0.6.5", "tableschema", "thrift>=0.14.1, <1.0.0"],
diff --git a/superset/config.py b/superset/config.py
index 21f275e42d..62c367c883 100644
--- a/superset/config.py
+++ b/superset/config.py
@@ -494,6 +494,12 @@ DEFAULT_FEATURE_FLAGS: dict[str, bool] = {
     # or to disallow users from viewing other users profile page
     # Do not show user info or profile in the menu
     "MENU_HIDE_USER_INFO": False,
+    # Allows users to add a ``superset://`` DB that can query across databases. This is
+    # an experimental feature with potential security and performance risks, so use with
+    # caution. If the feature is enabled you can also set a limit for how much data is
+    # returned from each database in the ``SUPERSET_META_DB_LIMIT`` configuration value
+    # in this file.
+    "ENABLE_SUPERSET_META_DB": False,
 }
 
 # ------------------------------
@@ -885,6 +891,9 @@ DISPLAY_MAX_ROW = 10000
 # the SQL Lab UI
 DEFAULT_SQLLAB_LIMIT = 1000
 
+# The limit for the Superset Meta DB when the feature flag ENABLE_SUPERSET_META_DB is on
+SUPERSET_META_DB_LIMIT: int | None = 1000
+
 # Adds a warning message on sqllab save query and schedule query modals.
 SQLLAB_SAVE_WARNING_MESSAGE = None
 SQLLAB_SCHEDULE_WARNING_MESSAGE = None
diff --git a/superset/db_engine_specs/__init__.py b/superset/db_engine_specs/__init__.py
index 16f44ade7a..d4ec199133 100644
--- a/superset/db_engine_specs/__init__.py
+++ b/superset/db_engine_specs/__init__.py
@@ -41,7 +41,7 @@ from importlib_metadata import entry_points
 from sqlalchemy.engine.default import DefaultDialect
 from sqlalchemy.engine.url import URL
 
-from superset import app
+from superset import app, feature_flag_manager
 from superset.db_engine_specs.base import BaseEngineSpec
 
 logger = logging.getLogger(__name__)
@@ -120,6 +120,7 @@ backend_replacements = {
 }
 
 
+# pylint: disable=too-many-branches
 def get_available_engine_specs() -> dict[type[BaseEngineSpec], set[str]]:
     """
     Return available engine specs and installed drivers for them.
@@ -172,6 +173,8 @@ def get_available_engine_specs() -> dict[type[BaseEngineSpec], set[str]]:
 
         # do not add denied db engine specs to available list
         dbs_denylist = app.config["DBS_AVAILABLE_DENYLIST"]
+        if not feature_flag_manager.is_feature_enabled("ENABLE_SUPERSET_META_DB"):
+            dbs_denylist["superset"] = {""}
         dbs_denylist_engines = dbs_denylist.keys()
 
         if (
diff --git a/superset/db_engine_specs/superset.py b/superset/db_engine_specs/superset.py
new file mode 100644
index 0000000000..318af04d13
--- /dev/null
+++ b/superset/db_engine_specs/superset.py
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+A native Superset database.
+"""
+
+from superset.db_engine_specs.shillelagh import ShillelaghEngineSpec
+
+
+class SupersetEngineSpec(ShillelaghEngineSpec):
+    """
+    Internal engine for Superset
+
+    This DB engine spec is a meta-database. It uses the shillelagh library
+    to build a DB that can operate across different Superset databases.
+    """
+
+    engine = "superset"
+    engine_name = "Superset meta database"
+    drivers = {"": "Native driver"}
+    default_driver = ""
+    sqlalchemy_uri_placeholder = "superset://"
+
+    supports_file_upload = False
diff --git a/superset/extensions/metadb.py b/superset/extensions/metadb.py
new file mode 100644
index 0000000000..79a3c446c4
--- /dev/null
+++ b/superset/extensions/metadb.py
@@ -0,0 +1,447 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+A SQLAlchemy dialect for querying across Superset databases.
+
+The dialect ``superset://`` allows users to query any table in any database that has been
+configured in Superset, eg:
+
+    > SELECT * FROM "examples.birth_names";
+
+The syntax for tables is:
+
+    database[[.catalog].schema].table
+
+The dialect is built on top of Shillelagh, a framework for building DB API 2.0 libraries
+and SQLAlchemy dialects based on SQLite. SQLite will parse the SQL, and pass the filters
+to the adapter. The adapter builds a SQLAlchemy query object reading data from the table
+and applying any filters (as well as sorting, limiting, and offsetting).
+
+Note that no aggregation is done on the database. Aggregations and other operations like
+joins and unions are done in memory, using the SQLite engine.
+"""
+
+from __future__ import annotations
+
+import datetime
+import operator
+import urllib.parse
+from collections.abc import Iterator
+from functools import partial, wraps
+from typing import Any, Callable, cast, TypeVar
+
+from flask import current_app
+from shillelagh.adapters.base import Adapter
+from shillelagh.backends.apsw.dialects.base import APSWDialect
+from shillelagh.exceptions import ProgrammingError
+from shillelagh.fields import (
+    Blob,
+    Boolean,
+    Date,
+    DateTime,
+    Field,
+    Float,
+    Integer,
+    Order,
+    String,
+    Time,
+)
+from shillelagh.filters import Equal, Filter, Range
+from shillelagh.typing import RequestedOrder, Row
+from sqlalchemy import MetaData, Table
+from sqlalchemy.engine.url import URL
+from sqlalchemy.exc import NoSuchTableError
+from sqlalchemy.sql import Select, select
+
+from superset import db, feature_flag_manager, security_manager, sql_parse
+
+
+# pylint: disable=abstract-method
+class SupersetAPSWDialect(APSWDialect):
+
+    """
+    A SQLAlchemy dialect for an internal Superset engine.
+
+    This dialect allows query to be executed across different Superset
+    databases. For example, to read data from the `birth_names` table in the
+    `examples` databases:
+
+        >>> engine = create_engine('superset://')
+        >>> conn = engine.connect()
+        >>> results = conn.execute('SELECT * FROM "examples.birth_names"')
+
+    Queries can also join data across different Superset databases.
+
+    The dialect is built in top of the shillelagh library, leveraging SQLite to
+    create virtual tables on-the-fly proxying Superset tables. The
+    `SupersetShillelaghAdapter` adapter is responsible for returning data when a
+    Superset table is accessed.
+    """
+
+    name = "superset"
+
+    def __init__(self, allowed_dbs: list[str] | None = None, **kwargs: Any) -> None:
+        super().__init__(**kwargs)
+
+        self.allowed_dbs = allowed_dbs
+
+    def create_connect_args(self, url: URL) -> tuple[tuple[()], dict[str, Any]]:
+        """
+        A custom Shillelagh SQLAlchemy dialect with a single adapter configured.
+        """
+        return (
+            (),
+            {
+                "path": ":memory:",
+                "adapters": ["superset"],
+                "adapter_kwargs": {
+                    "superset": {
+                        "prefix": None,
+                        "allowed_dbs": self.allowed_dbs,
+                    }
+                },
+                "safe": True,
+                "isolation_level": self.isolation_level,
+            },
+        )
+
+
+F = TypeVar("F", bound=Callable[..., Any])
+
+
+def check_dml(method: F) -> F:
+    """
+    Decorator that prevents DML against databases where it's not allowed.
+    """
+
+    @wraps(method)
+    def wrapper(self: SupersetShillelaghAdapter, *args: Any, **kwargs: Any) -> Any:
+        # pylint: disable=protected-access
+        if not self._allow_dml:
+            raise ProgrammingError(f'DML not enabled in database "{self.database}"')
+        return method(self, *args, **kwargs)
+
+    return cast(F, wrapper)
+
+
+def has_rowid(method: F) -> F:
+    """
+    Decorator that prevents updates/deletes on tables without a rowid.
+    """
+
+    @wraps(method)
+    def wrapper(self: SupersetShillelaghAdapter, *args: Any, **kwargs: Any) -> Any:
+        # pylint: disable=protected-access
+        if not self._rowid:
+            raise ProgrammingError(
+                "Can only modify data in a table with a single, integer, primary key"
+            )
+        return method(self, *args, **kwargs)
+
+    return cast(F, wrapper)
+
+
+# pylint: disable=too-many-instance-attributes
+class SupersetShillelaghAdapter(Adapter):
+
+    """
+    A shillelagh adapter for Superset tables.
+
+    Shillelagh adapters are responsible for fetching data from a given resource,
+    allowing it to be represented as a virtual table in SQLite. This one works
+    as a proxy to Superset tables.
+    """
+
+    # no access to the filesystem
+    safe = True
+
+    supports_limit = True
+    supports_offset = True
+
+    type_map: dict[Any, type[Field]] = {
+        bool: Boolean,
+        float: Float,
+        int: Integer,
+        str: String,
+        datetime.date: Date,
+        datetime.datetime: DateTime,
+        datetime.time: Time,
+    }
+
+    @staticmethod
+    def supports(
+        uri: str,
+        fast: bool = True,
+        prefix: str | None = "superset",
+        allowed_dbs: list[str] | None = None,
+        **kwargs: Any,
+    ) -> bool:
+        """
+        Return if a table is supported by the adapter.
+
+        An URL for a table has the format [prefix.]database[[.catalog].schema].table,
+        eg, `superset.examples.birth_names`.
+
+        When using the Superset SQLAlchemy and DB engine spec the prefix is dropped, so
+        that tables should have the format database[[.catalog].schema].table.
+        """
+        parts = [urllib.parse.unquote(part) for part in uri.split(".")]
+
+        if prefix is not None:
+            if parts.pop(0) != prefix:
+                return False
+
+        if allowed_dbs is not None and parts[0] not in allowed_dbs:
+            return False
+
+        return 2 <= len(parts) <= 4
+
+    @staticmethod
+    def parse_uri(uri: str) -> tuple[str]:
+        """
+        Pass URI through unmodified.
+        """
+        return (uri,)
+
+    def __init__(
+        self,
+        uri: str,
+        prefix: str | None = "superset",
+        **kwargs: Any,
+    ):
+        if not feature_flag_manager.is_feature_enabled("ENABLE_SUPERSET_META_DB"):
+            raise ProgrammingError("Superset meta database is disabled")
+
+        super().__init__(**kwargs)
+
+        parts = [urllib.parse.unquote(part) for part in uri.split(".")]
+
+        if prefix is not None:
+            if prefix != parts.pop(0):
+                raise ProgrammingError("Invalid prefix")
+            self.prefix = prefix
+
+        self.database = parts.pop(0)
+        self.table = parts.pop(-1)
+        self.schema = parts.pop(-1) if parts else None
+        self.catalog = parts.pop(-1) if parts else None
+
+        if self.catalog:
+            raise NotImplementedError("Catalogs are not currently supported")
+
+        # If the table has a single integer primary key we use that as the row ID in order
+        # to perform updates and deletes. Otherwise we can only do inserts and selects.
+        self._rowid: str | None = None
+
+        # Does the database allow DML?
+        self._allow_dml: bool = False
+
+        # Read column information from the database, and store it for later.
+        self._set_columns()
+
+    @classmethod
+    def get_field(cls, python_type: Any) -> Field:
+        """
+        Convert a Python type into a Shillelagh field.
+        """
+        class_ = cls.type_map.get(python_type, Blob)
+        return class_(filters=[Equal, Range], order=Order.ANY, exact=True)
+
+    def _set_columns(self) -> None:
+        """
+        Inspect the table and get its columns.
+
+        This is done on initialization because it's expensive.
+        """
+        # pylint: disable=import-outside-toplevel
+        from superset.models.core import Database
+
+        database = (
+            db.session.query(Database).filter_by(database_name=self.database).first()
+        )
+        if database is None:
+            raise ProgrammingError(f"Database not found: {self.database}")
+        self._allow_dml = database.allow_dml
+
+        # verify permissions
+        table = sql_parse.Table(self.table, self.schema, self.catalog)
+        security_manager.raise_for_access(database=database, table=table)
+
+        # store this callable for later whenever we need an engine
+        self.engine_context = partial(
+            database.get_sqla_engine_with_context,
+            self.schema,
+        )
+
+        # fetch column names and types
+        metadata = MetaData()
+        with self.engine_context() as engine:
+            try:
+                self._table = Table(
+                    self.table,
+                    metadata,
+                    schema=self.schema,
+                    autoload=True,
+                    autoload_with=engine,
+                )
+            except NoSuchTableError as ex:
+                raise ProgrammingError(f"Table does not exist: {self.table}") from ex
+
+        # find row ID column; we can only update/delete data into a table with a
+        # single integer primary key
+        primary_keys = [
+            column for column in list(self._table.primary_key) if column.primary_key
+        ]
+        if len(primary_keys) == 1 and primary_keys[0].type.python_type == int:
+            self._rowid = primary_keys[0].name
+
+        self.columns = {
+            column.name: self.get_field(column.type.python_type)
+            for column in self._table.c
+        }
+
+    def get_columns(self) -> dict[str, Field]:
+        """
+        Return table columns.
+        """
+        return self.columns
+
+    def _build_sql(
+        self,
+        bounds: dict[str, Filter],
+        order: list[tuple[str, RequestedOrder]],
+        limit: int | None = None,
+        offset: int | None = None,
+    ) -> Select:
+        """
+        Build SQLAlchemy query object.
+        """
+        query = select([self._table])
+
+        for column_name, filter_ in bounds.items():
+            column = self._table.c[column_name]
+            if isinstance(filter_, Equal):
+                query = query.where(column == filter_.value)
+            elif isinstance(filter_, Range):
+                if filter_.start is not None:
+                    op = operator.ge if filter_.include_start else operator.gt
+                    query = query.where(op(column, filter_.start))
+                if filter_.end is not None:
+                    op = operator.le if filter_.include_end else operator.lt
+                    query = query.where(op(column, filter_.end))
+            else:
+                raise ProgrammingError(f"Invalid filter: {filter_}")
+
+        for column_name, requested_order in order:
+            column = self._table.c[column_name]
+            if requested_order == Order.DESCENDING:
+                column = column.desc()
+            query = query.order_by(column)
+
+        if limit is not None:
+            query = query.limit(limit)
+        if offset is not None:
+            query = query.offset(offset)
+
+        return query
+
+    def get_data(
+        self,
+        bounds: dict[str, Filter],
+        order: list[tuple[str, RequestedOrder]],
+        limit: int | None = None,
+        offset: int | None = None,
+        **kwargs: Any,
+    ) -> Iterator[Row]:
+        """
+        Return data for a `SELECT` statement.
+        """
+        app_limit: int | None = current_app.config["SUPERSET_META_DB_LIMIT"]
+        if limit is None:
+            limit = app_limit
+        elif app_limit is not None:
+            limit = min(limit, app_limit)
+
+        query = self._build_sql(bounds, order, limit, offset)
+
+        with self.engine_context() as engine:
+            connection = engine.connect()
+            rows = connection.execute(query)
+            for i, row in enumerate(rows):
+                data = dict(zip(self.columns, row))
+                data["rowid"] = data[self._rowid] if self._rowid else i
+                yield data
+
+    @check_dml
+    def insert_row(self, row: Row) -> int:
+        """
+        Insert a single row.
+        """
+        row_id: int | None = row.pop("rowid")
+        if row_id and self._rowid:
+            if row.get(self._rowid) != row_id:
+                raise ProgrammingError(f"Invalid rowid specified: {row_id}")
+            row[self._rowid] = row_id
+
+        query = self._table.insert().values(**row)
+
+        with self.engine_context() as engine:
+            connection = engine.connect()
+            result = connection.execute(query)
+
+            # return rowid
+            if self._rowid:
+                return result.inserted_primary_key[0]
+
+            query = self._table.count()
+            return connection.execute(query).scalar()
+
+    @check_dml
+    @has_rowid
+    def delete_row(self, row_id: int) -> None:
+        """
+        Delete a single row given its row ID.
+        """
+        query = self._table.delete().where(self._table.c[self._rowid] == row_id)
+
+        with self.engine_context() as engine:
+            connection = engine.connect()
+            connection.execute(query)
+
+    @check_dml
+    @has_rowid
+    def update_row(self, row_id: int, row: Row) -> None:
+        """
+        Update a single row given its row ID.
+
+        Note that the updated row might have a new row ID.
+        """
+        new_row_id: int | None = row.pop("rowid")
+        if new_row_id:
+            if row.get(self._rowid) != new_row_id:
+                raise ProgrammingError(f"Invalid rowid specified: {new_row_id}")
+            row[self._rowid] = new_row_id
+
+        query = (
+            self._table.update()
+            .where(self._table.c[self._rowid] == row_id)
+            .values(**row)
+        )
+
+        with self.engine_context() as engine:
+            connection = engine.connect()
+            connection.execute(query)
diff --git a/superset/security/analytics_db_safety.py b/superset/security/analytics_db_safety.py
index 29583b5255..73db564c60 100644
--- a/superset/security/analytics_db_safety.py
+++ b/superset/security/analytics_db_safety.py
@@ -20,6 +20,7 @@ from flask_babel import lazy_gettext as _
 from sqlalchemy.engine.url import URL
 from sqlalchemy.exc import NoSuchModuleError
 
+from superset import feature_flag_manager
 from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
 from superset.exceptions import SupersetSecurityException
 
@@ -34,6 +35,9 @@ BLOCKLIST = {
 
 
 def check_sqlalchemy_uri(uri: URL) -> None:
+    if not feature_flag_manager.is_feature_enabled("ENABLE_SUPERSET_META_DB"):
+        BLOCKLIST.add(re.compile(r"superset$"))
+
     for blocklist_regex in BLOCKLIST:
         if not re.match(blocklist_regex, uri.drivername):
             continue
diff --git a/tests/unit_tests/extensions/test_sqlalchemy.py b/tests/unit_tests/extensions/test_sqlalchemy.py
new file mode 100644
index 0000000000..cc738fd6c6
--- /dev/null
+++ b/tests/unit_tests/extensions/test_sqlalchemy.py
@@ -0,0 +1,250 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint: disable=redefined-outer-name, import-outside-toplevel, unused-argument
+
+import os
+from collections.abc import Iterator
+from typing import TYPE_CHECKING
+
+import pytest
+from pytest_mock import MockFixture
+from sqlalchemy.engine import create_engine
+from sqlalchemy.exc import ProgrammingError
+from sqlalchemy.orm.session import Session
+
+from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
+from superset.exceptions import SupersetSecurityException
+from tests.unit_tests.conftest import with_feature_flags
+
+if TYPE_CHECKING:
+    from superset.models.core import Database
+
+
+@pytest.fixture
+def database1(session: Session) -> Iterator["Database"]:
+    from superset.models.core import Database
+
+    engine = session.connection().engine
+    Database.metadata.create_all(engine)  # pylint: disable=no-member
+
+    database = Database(
+        database_name="database1",
+        sqlalchemy_uri="sqlite:///database1.db",
+        allow_dml=True,
+    )
+    session.add(database)
+    session.commit()
+
+    yield database
+
+    session.delete(database)
+    session.commit()
+    os.unlink("database1.db")
+
+
+@pytest.fixture
+def table1(session: Session, database1: "Database") -> Iterator[None]:
+    with database1.get_sqla_engine_with_context() as engine:
+        conn = engine.connect()
+        conn.execute("CREATE TABLE table1 (a INTEGER NOT NULL PRIMARY KEY, b INTEGER)")
+        conn.execute("INSERT INTO table1 (a, b) VALUES (1, 10), (2, 20)")
+        session.commit()
+
+        yield
+
+        conn.execute("DROP TABLE table1")
+        session.commit()
+
+
+@pytest.fixture
+def database2(session: Session) -> Iterator["Database"]:
+    from superset.models.core import Database
+
+    database = Database(
+        database_name="database2",
+        sqlalchemy_uri="sqlite:///database2.db",
+        allow_dml=False,
+    )
+    session.add(database)
+    session.commit()
+
+    yield database
+
+    session.delete(database)
+    session.commit()
+    os.unlink("database2.db")
+
+
+@pytest.fixture
+def table2(session: Session, database2: "Database") -> Iterator[None]:
+    with database2.get_sqla_engine_with_context() as engine:
+        conn = engine.connect()
+        conn.execute("CREATE TABLE table2 (a INTEGER NOT NULL PRIMARY KEY, b TEXT)")
+        conn.execute("INSERT INTO table2 (a, b) VALUES (1, 'ten'), (2, 'twenty')")
+        session.commit()
+
+        yield
+
+        conn.execute("DROP TABLE table2")
+        session.commit()
+
+
+@with_feature_flags(ENABLE_SUPERSET_META_DB=True)
+def test_superset(mocker: MockFixture, app_context: None, table1: None) -> None:
+    """
+    Simple test querying a table.
+    """
+    mocker.patch("superset.extensions.metadb.security_manager")
+
+    engine = create_engine("superset://")
+    conn = engine.connect()
+    results = conn.execute('SELECT * FROM "database1.table1"')
+    assert list(results) == [(1, 10), (2, 20)]
+
+
+@with_feature_flags(ENABLE_SUPERSET_META_DB=True)
+def test_superset_limit(mocker: MockFixture, app_context: None, table1: None) -> None:
+    """
+    Simple that limit is applied when querying a table.
+    """
+    mocker.patch(
+        "superset.extensions.metadb.current_app.config",
+        {"SUPERSET_META_DB_LIMIT": 1},
+    )
+    mocker.patch("superset.extensions.metadb.security_manager")
+
+    engine = create_engine("superset://")
+    conn = engine.connect()
+    results = conn.execute('SELECT * FROM "database1.table1"')
+    assert list(results) == [(1, 10)]
+
+
+@with_feature_flags(ENABLE_SUPERSET_META_DB=True)
+def test_superset_joins(
+    mocker: MockFixture,
+    app_context: None,
+    table1: None,
+    table2: None,
+) -> None:
+    """
+    A test joining across databases.
+    """
+    mocker.patch("superset.extensions.metadb.security_manager")
+
+    engine = create_engine("superset://")
+    conn = engine.connect()
+    results = conn.execute(
+        """
+        SELECT t1.b, t2.b
+        FROM "database1.table1" AS t1
+        JOIN "database2.table2" AS t2
+        ON t1.a = t2.a
+        """
+    )
+    assert list(results) == [(10, "ten"), (20, "twenty")]
+
+
+@with_feature_flags(ENABLE_SUPERSET_META_DB=True)
+def test_dml(
+    mocker: MockFixture,
+    app_context: None,
+    table1: None,
+    table2: None,
+) -> None:
+    """
+    DML tests.
+
+    Test that we can update/delete data, only if DML is enabled.
+    """
+    mocker.patch("superset.extensions.metadb.security_manager")
+
+    engine = create_engine("superset://")
+    conn = engine.connect()
+
+    conn.execute('INSERT INTO "database1.table1" (a, b) VALUES (3, 30)')
+    results = conn.execute('SELECT * FROM "database1.table1"')
+    assert list(results) == [(1, 10), (2, 20), (3, 30)]
+    conn.execute('UPDATE "database1.table1" SET b=35 WHERE a=3')
+    results = conn.execute('SELECT * FROM "database1.table1"')
+    assert list(results) == [(1, 10), (2, 20), (3, 35)]
+    conn.execute('DELETE FROM "database1.table1" WHERE b>20')
+    results = conn.execute('SELECT * FROM "database1.table1"')
+    assert list(results) == [(1, 10), (2, 20)]
+
+    with pytest.raises(ProgrammingError) as excinfo:
+        conn.execute("""INSERT INTO "database2.table2" (a, b) VALUES (3, 'thirty')""")
+    assert str(excinfo.value).strip() == (
+        "(shillelagh.exceptions.ProgrammingError) DML not enabled in database "
+        '"database2"\n[SQL: INSERT INTO "database2.table2" (a, b) '
+        "VALUES (3, 'thirty')]\n(Background on this error at: "
+        "https://sqlalche.me/e/14/f405)"
+    )
+
+
+@with_feature_flags(ENABLE_SUPERSET_META_DB=True)
+def test_security_manager(mocker: MockFixture, app_context: None, table1: None) -> None:
+    """
+    Test that we use the security manager to check for permissions.
+    """
+    security_manager = mocker.MagicMock()
+    mocker.patch(
+        "superset.extensions.metadb.security_manager",
+        new=security_manager,
+    )
+    security_manager.raise_for_access.side_effect = SupersetSecurityException(
+        SupersetError(
+            error_type=SupersetErrorType.TABLE_SECURITY_ACCESS_ERROR,
+            message=(
+                "You need access to the following tables: `table1`,\n            "
+                "`all_database_access` or `all_datasource_access` permission"
+            ),
+            level=ErrorLevel.ERROR,
+        )
+    )
+
+    engine = create_engine("superset://")
+    conn = engine.connect()
+    with pytest.raises(SupersetSecurityException) as excinfo:
+        conn.execute('SELECT * FROM "database1.table1"')
+    assert str(excinfo.value) == (
+        "You need access to the following tables: `table1`,\n            "
+        "`all_database_access` or `all_datasource_access` permission"
+    )
+
+
+@with_feature_flags(ENABLE_SUPERSET_META_DB=True)
+def test_allowed_dbs(mocker: MockFixture, app_context: None, table1: None) -> None:
+    """
+    Test that DBs can be restricted.
+    """
+    mocker.patch("superset.extensions.metadb.security_manager")
+
+    engine = create_engine("superset://", allowed_dbs=["database1"])
+    conn = engine.connect()
+
+    results = conn.execute('SELECT * FROM "database1.table1"')
+    assert list(results) == [(1, 10), (2, 20)]
+
+    with pytest.raises(ProgrammingError) as excinfo:
+        conn.execute('SELECT * FROM "database2.table2"')
+    assert str(excinfo.value) == (
+        """
+(shillelagh.exceptions.ProgrammingError) Unsupported table: database2.table2
+[SQL: SELECT * FROM "database2.table2"]
+(Background on this error at: https://sqlalche.me/e/14/f405)
+        """.strip()
+    )