You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@superset.apache.org by be...@apache.org on 2024/03/20 20:04:56 UTC

(superset) branch sip-85 updated (aac320c8c7 -> 15b32cae7c)

This is an automated email from the ASF dual-hosted git repository.

beto pushed a change to branch sip-85
in repository https://gitbox.apache.org/repos/asf/superset.git


 discard aac320c8c7 WIP
     new 15b32cae7c WIP

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (aac320c8c7)
            \
             N -- N -- N   refs/heads/sip-85 (15b32cae7c)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 superset/connectors/sqla/utils.py                  |  2 +-
 superset/db_engine_specs/README.md                 |  7 +-
 superset/db_engine_specs/base.py                   | 77 +++++++++++++++++++++-
 superset/db_engine_specs/drill.py                  |  6 +-
 superset/db_engine_specs/gsheets.py                |  1 +
 superset/db_engine_specs/hive.py                   | 11 +++-
 superset/db_engine_specs/impala.py                 |  1 +
 superset/db_engine_specs/trino.py                  | 15 ++++-
 superset/errors.py                                 |  1 +
 superset/exceptions.py                             | 16 +++++
 ...0_16-02_678eefb4ab44_add_access_token_table.py} | 43 ++++++++----
 superset/sql_lab.py                                |  9 ++-
 superset/sql_validators/presto_db.py               |  2 +-
 superset/utils/oauth2.py                           |  8 ++-
 .../unit_tests/db_engine_specs/test_clickhouse.py  |  5 +-
 tests/unit_tests/db_engine_specs/test_databend.py  |  5 +-
 .../db_engine_specs/test_elasticsearch.py          |  4 +-
 tests/unit_tests/sql_lab_test.py                   |  8 ++-
 18 files changed, 185 insertions(+), 36 deletions(-)
 copy superset/migrations/versions/{2020-07-09_17-12_73fd22e742ab_add_dynamic_plugins_py.py => 2024-03-20_16-02_678eefb4ab44_add_access_token_table.py} (61%)


(superset) 01/01: WIP

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

beto pushed a commit to branch sip-85
in repository https://gitbox.apache.org/repos/asf/superset.git

commit 15b32cae7cd056f451bae553b86b84f921288fc6
Author: Beto Dealmeida <ro...@dealmeida.net>
AuthorDate: Wed Mar 20 12:53:07 2024 -0400

    WIP
---
 superset/config.py                                 | 10 +++
 superset/connectors/sqla/utils.py                  |  2 +-
 superset/db_engine_specs/README.md                 |  7 +-
 superset/db_engine_specs/base.py                   | 77 +++++++++++++++++++++-
 superset/db_engine_specs/drill.py                  |  6 +-
 superset/db_engine_specs/gsheets.py                |  1 +
 superset/db_engine_specs/hive.py                   | 11 +++-
 superset/db_engine_specs/impala.py                 |  1 +
 superset/db_engine_specs/trino.py                  | 15 ++++-
 superset/errors.py                                 |  1 +
 superset/exceptions.py                             | 16 +++++
 ...20_16-02_678eefb4ab44_add_access_token_table.py | 76 +++++++++++++++++++++
 superset/models/core.py                            | 33 +++++++++-
 superset/sql_lab.py                                |  9 ++-
 superset/sql_validators/presto_db.py               |  2 +-
 superset/utils/oauth2.py                           | 69 +++++++++++++++++++
 .../unit_tests/db_engine_specs/test_clickhouse.py  |  5 +-
 tests/unit_tests/db_engine_specs/test_databend.py  |  5 +-
 .../db_engine_specs/test_elasticsearch.py          |  4 +-
 tests/unit_tests/sql_lab_test.py                   |  8 ++-
 20 files changed, 335 insertions(+), 23 deletions(-)

diff --git a/superset/config.py b/superset/config.py
index 197e4bac42..0de3f284e5 100644
--- a/superset/config.py
+++ b/superset/config.py
@@ -1392,6 +1392,16 @@ PREFERRED_DATABASES: list[str] = [
 # one here.
 TEST_DATABASE_CONNECTION_TIMEOUT = timedelta(seconds=30)
 
+# Details needed for databases that allow users to authenticate via personal OAuth2
+# tokens. See https://github.com/apache/superset/issues/20300 for details.
+DATABASE_OAUTH2_CREDENTIALS = {
+    "GSheets": {
+        "CLIENT_ID": "XXX.apps.googleusercontent.com",
+        "CLIENT_SECRET": "GOCSPX-YYY",
+        "REDIRECT_URI": "http://localhost:8088/api/v1/database/oauth2/",
+    },
+}
+
 # Enable/disable CSP warning
 CONTENT_SECURITY_POLICY_WARNING = True
 
diff --git a/superset/connectors/sqla/utils.py b/superset/connectors/sqla/utils.py
index 58a90e6eca..d0922e40f3 100644
--- a/superset/connectors/sqla/utils.py
+++ b/superset/connectors/sqla/utils.py
@@ -145,7 +145,7 @@ def get_columns_description(
             cursor = conn.cursor()
             query = database.apply_limit_to_sql(query, limit=1)
             cursor.execute(query)
-            db_engine_spec.execute(cursor, query)
+            db_engine_spec.execute(cursor, query, database.id)
             result = db_engine_spec.fetch_data(cursor, limit=1)
             result_set = SupersetResultSet(result, cursor.description, db_engine_spec)
             return result_set.columns
diff --git a/superset/db_engine_specs/README.md b/superset/db_engine_specs/README.md
index ee4c4ce9e5..3deb18c1b6 100644
--- a/superset/db_engine_specs/README.md
+++ b/superset/db_engine_specs/README.md
@@ -529,6 +529,7 @@ class GSheetsEngineSpec(ShillelaghEngineSpec):
         url: URL,
         impersonate_user: bool,
         username: str | None,
+        access_token: str | None,
     ) -> URL:
         if impersonate_user and username is not None:
             user = security_manager.find_user(username=username)
@@ -542,6 +543,10 @@ The method `get_url_for_impersonation` updates the SQLAlchemy URI before every q
 
 Alternatively, it's also possible to impersonate users by implementing the `update_impersonation_config`. This is a class method which modifies `connect_args` in place. You can use either method, and ideally they [should be consolidated in a single one](https://github.com/apache/superset/issues/24910).
 
+### OAuth2
+
+Some DB engine specs support OAuth2 authentication. See [SIP-85](https://github.com/apache/superset/issues/20300) for more details on how this is implemented.
+
 ### File upload
 
 When a DB engine spec supports file upload it declares so via the `supports_file_upload` class attribute. The base class implementation is very generic and should work for any database that has support for `CREATE TABLE`. It leverages Pandas and the [`df_to_sql`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_sql.html) method.
@@ -615,7 +620,7 @@ SELECT * FROM my_table
 
 The table `my_table` should live in the `dev` schema. In order to do that, it's necessary to modify the SQLAlchemy URI before running the query. Since different databases have different ways of doing that, this functionality is implemented via the `adjust_engine_params` class method. The method receives the SQLAlchemy URI and `connect_args`, as well as the schema in which the query should run. It then returns a potentially modified URI and `connect_args` to ensure that the query runs in  [...]
 
-When a DB engine specs implements `adjust_engine_params` it should have the class attribute `supports_dynamic_schema` set to true. This is critical for security, since **it allows Superset to know to which schema any unqualified table names belong to**. For example, in the query above, if the database supports dynamic schema, Superset would check to see if the user running the query has access to `dev.my_table`. On the other hand, if the database doesn't support dynamic schema, Superset  [...]
+When a DB engine specs implements `adjust_engine_params` it should have the class attribute `supports_dynamic_schema` set to true. This is critical for security, since **it allows Superset to know to which schema any unqualified table names belong to**. For example, in the query above, if the database supports dynamic schema, Superset would check to see if the user running the query has access to `dev.my_table`. On the other hand, if the database doesn't support dynamic schema, Superset  [...]
 
 Implementing this method is also important for usability. When the method is not implemented selecting the schema in SQL Lab has no effect on the schema in which the query runs, resulting in a confusing results when using unqualified table names.
 
diff --git a/superset/db_engine_specs/base.py b/superset/db_engine_specs/base.py
index e8790bdcd4..b358a33ce4 100644
--- a/superset/db_engine_specs/base.py
+++ b/superset/db_engine_specs/base.py
@@ -59,6 +59,7 @@ from superset import security_manager, sql_parse
 from superset.constants import TimeGrain as TimeGrainConstants
 from superset.databases.utils import make_url_safe
 from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
+from superset.exceptions import OAuth2RedirectError
 from superset.sql_parse import ParsedQuery, SQLScript, Table
 from superset.superset_typing import ResultSetColumnType, SQLAColumnType
 from superset.utils import core as utils
@@ -170,6 +171,30 @@ class MetricType(TypedDict, total=False):
     extra: str | None
 
 
+class OAuth2TokenResponse(TypedDict, total=False):
+    """
+    Type for an OAuth2 response when exchanging or refreshing tokens.
+    """
+
+    access_token: str
+    expires_in: int
+    scope: str
+    token_type: str
+
+    # only present when exchanging code for refresh/access tokens
+    refresh_token: str
+
+
+class OAuth2State(TypedDict):
+    """
+    Type for the state passed during OAuth2.
+    """
+
+    database_id: int
+    user_id: int
+    default_redirect_uri: str
+
+
 class BaseEngineSpec:  # pylint: disable=too-many-public-methods
     """Abstract class for database engine specific configurations
 
@@ -397,6 +422,34 @@ class BaseEngineSpec:  # pylint: disable=too-many-public-methods
     # Can the catalog be changed on a per-query basis?
     supports_dynamic_catalog = False
 
+    # Driver-specific exception that should be mapped to OAuth2RedirectError
+    oauth2_exception = OAuth2RedirectError
+
+    @staticmethod
+    def is_oauth2_enabled() -> bool:
+        return False
+
+    @staticmethod
+    def get_oauth2_authorization_uri(database_id: int) -> str:
+        """
+        Return URI for initial OAuth2 request.
+        """
+        raise NotImplementedError()
+
+    @staticmethod
+    def get_oauth2_token(code: str) -> OAuth2TokenResponse:
+        """
+        Exchange authorization code for refresh/access tokens.
+        """
+        raise NotImplementedError()
+
+    @staticmethod
+    def get_oauth2_fresh_token(refresh_token: str) -> OAuth2TokenResponse:
+        """
+        Refresh an access token that has expired.
+        """
+        raise NotImplementedError()
+
     @classmethod
     def get_allows_alias_in_select(
         cls, database: Database  # pylint: disable=unused-argument
@@ -1079,7 +1132,12 @@ class BaseEngineSpec:  # pylint: disable=too-many-public-methods
         # TODO: Fix circular import error caused by importing sql_lab.Query
 
     @classmethod
-    def execute_with_cursor(cls, cursor: Any, sql: str, query: Query) -> None:
+    def execute_with_cursor(
+        cls,
+        cursor: Any,
+        sql: str,
+        query: Query,
+    ) -> None:
         """
         Trigger execution of a query and handle the resulting cursor.
 
@@ -1090,7 +1148,7 @@ class BaseEngineSpec:  # pylint: disable=too-many-public-methods
         in a timely manner and facilitate operations such as query stop
         """
         logger.debug("Query %d: Running query: %s", query.id, sql)
-        cls.execute(cursor, sql, async_=True)
+        cls.execute(cursor, sql, query.database.id, async_=True)
         logger.debug("Query %d: Handling cursor", query.id)
         cls.handle_cursor(cursor, query)
 
@@ -1536,7 +1594,11 @@ class BaseEngineSpec:  # pylint: disable=too-many-public-methods
 
     @classmethod
     def get_url_for_impersonation(
-        cls, url: URL, impersonate_user: bool, username: str | None
+        cls,
+        url: URL,
+        impersonate_user: bool,
+        username: str | None,
+        access_token: str | None,  # pylint: disable=unused-argument
     ) -> URL:
         """
         Return a modified URL with the username set.
@@ -1544,6 +1606,7 @@ class BaseEngineSpec:  # pylint: disable=too-many-public-methods
         :param url: SQLAlchemy URL object
         :param impersonate_user: Flag indicating if impersonation is enabled
         :param username: Effective username
+        :param access_token: Personal access token
         """
         if impersonate_user and username is not None:
             url = url.set(username=username)
@@ -1572,6 +1635,7 @@ class BaseEngineSpec:  # pylint: disable=too-many-public-methods
         cls,
         cursor: Any,
         query: str,
+        database_id: int,
         **kwargs: Any,
     ) -> None:
         """
@@ -1579,6 +1643,7 @@ class BaseEngineSpec:  # pylint: disable=too-many-public-methods
 
         :param cursor: Cursor instance
         :param query: Query to execute
+        :param database_id: ID of the database where the query will run
         :param kwargs: kwargs to be passed to cursor.execute()
         :return:
         """
@@ -1589,6 +1654,12 @@ class BaseEngineSpec:  # pylint: disable=too-many-public-methods
             cursor.arraysize = cls.arraysize
         try:
             cursor.execute(query)
+        except cls.oauth2_exception as ex:
+            if cls.is_oauth2_enabled():
+                oauth_url = cls.get_oauth2_authorization_uri(database_id)
+                raise OAuth2RedirectError(oauth_url) from ex
+
+            raise cls.get_dbapi_mapped_exception(ex) from ex
         except Exception as ex:
             raise cls.get_dbapi_mapped_exception(ex) from ex
 
diff --git a/superset/db_engine_specs/drill.py b/superset/db_engine_specs/drill.py
index 276ff5b185..e99d4a27f4 100644
--- a/superset/db_engine_specs/drill.py
+++ b/superset/db_engine_specs/drill.py
@@ -100,7 +100,11 @@ class DrillEngineSpec(BaseEngineSpec):
 
     @classmethod
     def get_url_for_impersonation(
-        cls, url: URL, impersonate_user: bool, username: str | None
+        cls,
+        url: URL,
+        impersonate_user: bool,
+        username: str | None,
+        access_token: str | None,
     ) -> URL:
         """
         Return a modified URL with the username set.
diff --git a/superset/db_engine_specs/gsheets.py b/superset/db_engine_specs/gsheets.py
index 18349f4314..7319ea71d0 100644
--- a/superset/db_engine_specs/gsheets.py
+++ b/superset/db_engine_specs/gsheets.py
@@ -110,6 +110,7 @@ class GSheetsEngineSpec(ShillelaghEngineSpec):
         url: URL,
         impersonate_user: bool,
         username: str | None,
+        access_token: str | None,
     ) -> URL:
         if impersonate_user and username is not None:
             user = security_manager.find_user(username=username)
diff --git a/superset/db_engine_specs/hive.py b/superset/db_engine_specs/hive.py
index 9222d55db0..a97dd88aef 100644
--- a/superset/db_engine_specs/hive.py
+++ b/superset/db_engine_specs/hive.py
@@ -505,7 +505,11 @@ class HiveEngineSpec(PrestoEngineSpec):
 
     @classmethod
     def get_url_for_impersonation(
-        cls, url: URL, impersonate_user: bool, username: str | None
+        cls,
+        url: URL,
+        impersonate_user: bool,
+        username: str | None,
+        access_token: str | None,
     ) -> URL:
         """
         Return a modified URL with the username set.
@@ -547,7 +551,10 @@ class HiveEngineSpec(PrestoEngineSpec):
 
     @staticmethod
     def execute(  # type: ignore
-        cursor, query: str, async_: bool = False
+        cursor,
+        query: str,
+        database_id: int,
+        async_: bool = False,
     ):  # pylint: disable=arguments-differ
         kwargs = {"async": async_}
         cursor.execute(query, **kwargs)
diff --git a/superset/db_engine_specs/impala.py b/superset/db_engine_specs/impala.py
index 9e5f728a6f..8cda5b5861 100644
--- a/superset/db_engine_specs/impala.py
+++ b/superset/db_engine_specs/impala.py
@@ -93,6 +93,7 @@ class ImpalaEngineSpec(BaseEngineSpec):
         cls,
         cursor: Any,
         query: str,
+        database_id: int,
         **kwargs: Any,
     ) -> None:
         try:
diff --git a/superset/db_engine_specs/trino.py b/superset/db_engine_specs/trino.py
index 6d95f9589e..4513d63c60 100644
--- a/superset/db_engine_specs/trino.py
+++ b/superset/db_engine_specs/trino.py
@@ -132,7 +132,11 @@ class TrinoEngineSpec(PrestoBaseEngineSpec):
 
     @classmethod
     def get_url_for_impersonation(
-        cls, url: URL, impersonate_user: bool, username: str | None
+        cls,
+        url: URL,
+        impersonate_user: bool,
+        username: str | None,
+        access_token: str | None,
     ) -> URL:
         """
         Return a modified URL with the username set.
@@ -191,7 +195,12 @@ class TrinoEngineSpec(PrestoBaseEngineSpec):
         super().handle_cursor(cursor=cursor, query=query)
 
     @classmethod
-    def execute_with_cursor(cls, cursor: Cursor, sql: str, query: Query) -> None:
+    def execute_with_cursor(
+        cls,
+        cursor: Cursor,
+        sql: str,
+        query: Query,
+    ) -> None:
         """
         Trigger execution of a query and handle the resulting cursor.
 
@@ -210,7 +219,7 @@ class TrinoEngineSpec(PrestoBaseEngineSpec):
             logger.debug("Query %d: Running query: %s", query_id, sql)
 
             try:
-                cls.execute(cursor, sql)
+                cls.execute(cursor, sql, query.database.id)
             except Exception as ex:  # pylint: disable=broad-except
                 results["error"] = ex
             finally:
diff --git a/superset/errors.py b/superset/errors.py
index 7c38389167..b17be0cf85 100644
--- a/superset/errors.py
+++ b/superset/errors.py
@@ -67,6 +67,7 @@ class SupersetErrorType(StrEnum):
     USER_ACTIVITY_SECURITY_ACCESS_ERROR = "USER_ACTIVITY_SECURITY_ACCESS_ERROR"
     DASHBOARD_SECURITY_ACCESS_ERROR = "DASHBOARD_SECURITY_ACCESS_ERROR"
     CHART_SECURITY_ACCESS_ERROR = "CHART_SECURITY_ACCESS_ERROR"
+    OAUTH2_REDIRECT = "OAUTH2_REDIRECT"
 
     # Other errors
     BACKEND_TIMEOUT_ERROR = "BACKEND_TIMEOUT_ERROR"
diff --git a/superset/exceptions.py b/superset/exceptions.py
index 0ce72e2e1a..706d316b04 100644
--- a/superset/exceptions.py
+++ b/superset/exceptions.py
@@ -312,3 +312,19 @@ class SupersetParseError(SupersetErrorException):
             extra={"sql": sql, "engine": engine},
         )
         super().__init__(error)
+
+
+class OAuth2RedirectError(SupersetErrorException):
+    """
+    Exception used to start OAuth2 dance for personal tokens.
+    """
+
+    def __init__(self, url: str):
+        super().__init__(
+            SupersetError(
+                message="You don't have permission to access the data.",
+                error_type=SupersetErrorType.OAUTH2_REDIRECT,
+                level=ErrorLevel.WARNING,
+                extra={"url": url},
+            )
+        )
diff --git a/superset/migrations/versions/2024-03-20_16-02_678eefb4ab44_add_access_token_table.py b/superset/migrations/versions/2024-03-20_16-02_678eefb4ab44_add_access_token_table.py
new file mode 100644
index 0000000000..717ac68f06
--- /dev/null
+++ b/superset/migrations/versions/2024-03-20_16-02_678eefb4ab44_add_access_token_table.py
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Add access token table
+
+Revision ID: 678eefb4ab44
+Revises: be1b217cd8cd
+Create Date: 2024-03-20 16:02:58.515915
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = "678eefb4ab44"
+down_revision = "be1b217cd8cd"
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy_utils import EncryptedType
+
+
+def upgrade():
+    op.create_table(
+        "database_user_oauth2_tokens",
+        sa.Column("created_on", sa.DateTime(), nullable=True),
+        sa.Column("changed_on", sa.DateTime(), nullable=True),
+        sa.Column("id", sa.Integer(), nullable=False),
+        sa.Column("user_id", sa.Integer(), nullable=True),
+        sa.Column("database_id", sa.Integer(), nullable=False),
+        sa.Column(
+            "access_token",
+            EncryptedType(),
+            nullable=True,
+        ),
+        sa.Column("access_token_expiration", sa.DateTime(), nullable=True),
+        sa.Column(
+            "refresh_token",
+            EncryptedType(),
+            nullable=True,
+        ),
+        sa.Column("created_by_fk", sa.Integer(), nullable=True),
+        sa.Column("changed_by_fk", sa.Integer(), nullable=True),
+        sa.ForeignKeyConstraint(
+            ["changed_by_fk"],
+            ["ab_user.id"],
+        ),
+        sa.ForeignKeyConstraint(
+            ["created_by_fk"],
+            ["ab_user.id"],
+        ),
+        sa.ForeignKeyConstraint(
+            ["database_id"],
+            ["dbs.id"],
+        ),
+        sa.ForeignKeyConstraint(
+            ["user_id"],
+            ["ab_user.id"],
+        ),
+        sa.PrimaryKeyConstraint("id"),
+    )
+
+
+def downgrade():
+    op.drop_table("database_user_oauth2_tokens")
diff --git a/superset/models/core.py b/superset/models/core.py
index 71a6e9d042..27ee347139 100755
--- a/superset/models/core.py
+++ b/superset/models/core.py
@@ -75,6 +75,7 @@ from superset.superset_typing import ResultSetColumnType
 from superset.utils import cache as cache_util, core as utils
 from superset.utils.backports import StrEnum
 from superset.utils.core import get_username
+from superset.utils.oauth2 import get_oauth2_access_token
 
 config = app.config
 custom_password_store = config["SQLALCHEMY_CUSTOM_PASSWORD_STORE"]
@@ -461,6 +462,11 @@ class Database(
         )
 
         effective_username = self.get_effective_user(sqlalchemy_url)
+        access_token = (
+            get_oauth2_access_token(self.id, g.user.id, self.db_engine_spec)
+            if hasattr(g, "user")
+            else None
+        )
         # If using MySQL or Presto for example, will set url.username
         # If using Hive, will not do anything yet since that relies on a
         # configuration parameter instead.
@@ -468,6 +474,7 @@ class Database(
             sqlalchemy_url,
             self.impersonate_user,
             effective_username,
+            access_token,
         )
 
         masked_url = self.get_password_masked_url(sqlalchemy_url)
@@ -588,7 +595,7 @@ class Database(
                         database=None,
                     )
                 _log_query(sql_)
-                self.db_engine_spec.execute(cursor, sql_)
+                self.db_engine_spec.execute(cursor, sql_, self.id)
                 cursor.fetchall()
 
             if mutate_after_split:
@@ -598,10 +605,10 @@ class Database(
                     database=None,
                 )
                 _log_query(last_sql)
-                self.db_engine_spec.execute(cursor, last_sql)
+                self.db_engine_spec.execute(cursor, last_sql, self.id)
             else:
                 _log_query(sqls[-1])
-                self.db_engine_spec.execute(cursor, sqls[-1])
+                self.db_engine_spec.execute(cursor, sqls[-1], self.id)
 
             data = self.db_engine_spec.fetch_data(cursor)
             result_set = SupersetResultSet(
@@ -978,6 +985,26 @@ sqla.event.listen(Database, "after_update", security_manager.database_after_upda
 sqla.event.listen(Database, "after_delete", security_manager.database_after_delete)
 
 
+class DatabaseUserOAuth2Tokens(Model, AuditMixinNullable):
+    """
+    Store OAuth2 tokens, for authenticating to DBs using user personal tokens.
+    """
+
+    __tablename__ = "database_user_oauth2_tokens"
+
+    id = Column(Integer, primary_key=True)
+
+    user_id = Column(Integer, ForeignKey("ab_user.id"))
+    user = relationship(security_manager.user_model, foreign_keys=[user_id])
+
+    database_id = Column(Integer, ForeignKey("dbs.id"), nullable=False)
+    database = relationship("Database", foreign_keys=[database_id])
+
+    access_token = Column(encrypted_field_factory.create(Text), nullable=True)
+    access_token_expiration = Column(DateTime, nullable=True)
+    refresh_token = Column(encrypted_field_factory.create(Text), nullable=True)
+
+
 class Log(Model):  # pylint: disable=too-few-public-methods
     """ORM object used to log Superset actions to the database"""
 
diff --git a/superset/sql_lab.py b/superset/sql_lab.py
index 1b883a77cf..c75e3c6bbd 100644
--- a/superset/sql_lab.py
+++ b/superset/sql_lab.py
@@ -41,7 +41,11 @@ from superset.constants import QUERY_CANCEL_KEY, QUERY_EARLY_CANCEL_KEY
 from superset.dataframe import df_to_records
 from superset.db_engine_specs import BaseEngineSpec
 from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
-from superset.exceptions import SupersetErrorException, SupersetErrorsException
+from superset.exceptions import (
+    OAuth2RedirectError,
+    SupersetErrorException,
+    SupersetErrorsException,
+)
 from superset.extensions import celery_app
 from superset.models.core import Database
 from superset.models.sql_lab import Query
@@ -308,6 +312,9 @@ def execute_sql_statement(
                 level=ErrorLevel.ERROR,
             )
         ) from ex
+    except OAuth2RedirectError as ex:
+        # user needs to authenticate with OAuth2 in order to run query
+        raise ex
     except Exception as ex:
         # query is stopped in another thread/worker
         # stopping raises expected exceptions which we should skip
diff --git a/superset/sql_validators/presto_db.py b/superset/sql_validators/presto_db.py
index fed1ff3bfa..8c815ad63e 100644
--- a/superset/sql_validators/presto_db.py
+++ b/superset/sql_validators/presto_db.py
@@ -73,7 +73,7 @@ class PrestoDBSQLValidator(BaseSQLValidator):
         from pyhive.exc import DatabaseError
 
         try:
-            db_engine_spec.execute(cursor, sql)
+            db_engine_spec.execute(cursor, sql, database.id)
             polled = cursor.poll()
             while polled:
                 logger.info("polling presto for validation progress")
diff --git a/superset/utils/oauth2.py b/superset/utils/oauth2.py
new file mode 100644
index 0000000000..e391d527a3
--- /dev/null
+++ b/superset/utils/oauth2.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from datetime import datetime, timedelta
+
+from superset import db
+from superset.db_engine_specs.base import BaseEngineSpec
+
+
+def get_oauth2_access_token(
+    database_id: int,
+    user_id: int,
+    db_engine_spec: type[BaseEngineSpec],
+) -> str | None:
+    """
+    Return a valid OAuth2 access token for the user/database pair, or ``None``.
+
+    An unexpired stored token is returned as-is. An expired one is refreshed via the
+    engine spec when a refresh token is available; otherwise the entry is deleted.
+    """
+    # pylint: disable=import-outside-toplevel
+    from superset.models.core import DatabaseUserOAuth2Tokens
+
+    token = (
+        db.session.query(DatabaseUserOAuth2Tokens)
+        .filter_by(user_id=user_id, database_id=database_id)
+        .one_or_none()
+    )
+    if token is None:
+        return None
+
+    expiration = token.access_token_expiration  # nullable column: None means expired
+    if token.access_token and expiration and datetime.now() < expiration:
+        return token.access_token
+
+    if token.refresh_token:
+        token_response = db_engine_spec.get_oauth2_fresh_token(token.refresh_token)
+
+        # store new access token; note that the refresh token might be revoked, in which
+        # case there would be no access token in the response
+        if "access_token" in token_response:
+            token.access_token = token_response["access_token"]
+            token.access_token_expiration = datetime.now() + timedelta(
+                seconds=token_response["expires_in"]
+            )
+            db.session.add(token)
+
+            return token.access_token
+
+    # the access token is expired and could not be refreshed, so delete the entry
+    db.session.delete(token)
+
+    return None
diff --git a/tests/unit_tests/db_engine_specs/test_clickhouse.py b/tests/unit_tests/db_engine_specs/test_clickhouse.py
index 3f28341f26..94b70ba526 100644
--- a/tests/unit_tests/db_engine_specs/test_clickhouse.py
+++ b/tests/unit_tests/db_engine_specs/test_clickhouse.py
@@ -65,8 +65,9 @@ def test_execute_connection_error() -> None:
     cursor.execute.side_effect = NewConnectionError(
         HTTPConnection("localhost"), "Exception with sensitive data"
     )
-    with pytest.raises(SupersetDBAPIDatabaseError) as ex:
-        ClickHouseEngineSpec.execute(cursor, "SELECT col1 from table1")
+    with pytest.raises(SupersetDBAPIDatabaseError) as excinfo:
+        ClickHouseEngineSpec.execute(cursor, "SELECT col1 from table1", 1)
+    assert str(excinfo.value) == "Connection failed"
 
 
 @pytest.mark.parametrize(
diff --git a/tests/unit_tests/db_engine_specs/test_databend.py b/tests/unit_tests/db_engine_specs/test_databend.py
index 9c494492d9..06fab79188 100644
--- a/tests/unit_tests/db_engine_specs/test_databend.py
+++ b/tests/unit_tests/db_engine_specs/test_databend.py
@@ -66,8 +66,9 @@ def test_execute_connection_error() -> None:
     cursor.execute.side_effect = NewConnectionError(
         HTTPConnection("Dummypool"), "Exception with sensitive data"
     )
-    with pytest.raises(SupersetDBAPIDatabaseError) as ex:
-        DatabendEngineSpec.execute(cursor, "SELECT col1 from table1")
+    with pytest.raises(SupersetDBAPIDatabaseError) as excinfo:
+        DatabendEngineSpec.execute(cursor, "SELECT col1 from table1", 1)
+    assert str(excinfo.value) == "Connection failed"
 
 
 @pytest.mark.parametrize(
diff --git a/tests/unit_tests/db_engine_specs/test_elasticsearch.py b/tests/unit_tests/db_engine_specs/test_elasticsearch.py
index 0c15977669..ed80454d3c 100644
--- a/tests/unit_tests/db_engine_specs/test_elasticsearch.py
+++ b/tests/unit_tests/db_engine_specs/test_elasticsearch.py
@@ -101,6 +101,8 @@ def test_opendistro_strip_comments() -> None:
     mock_cursor.execute.return_value = []
 
     OpenDistroEngineSpec.execute(
-        mock_cursor, "-- some comment \nSELECT 1\n --other comment"
+        mock_cursor,
+        "-- some comment \nSELECT 1\n --other comment",
+        1,
     )
     mock_cursor.execute.assert_called_once_with("SELECT 1\n")
diff --git a/tests/unit_tests/sql_lab_test.py b/tests/unit_tests/sql_lab_test.py
index 83e7c373c8..3e5a808815 100644
--- a/tests/unit_tests/sql_lab_test.py
+++ b/tests/unit_tests/sql_lab_test.py
@@ -55,7 +55,9 @@ def test_execute_sql_statement(mocker: MockerFixture, app: None) -> None:
 
     database.apply_limit_to_sql.assert_called_with("SELECT 42 AS answer", 2, force=True)
     db_engine_spec.execute_with_cursor.assert_called_with(
-        cursor, "SELECT 42 AS answer LIMIT 2", query
+        cursor,
+        "SELECT 42 AS answer LIMIT 2",
+        query,
     )
     SupersetResultSet.assert_called_with([(42,)], cursor.description, db_engine_spec)
 
@@ -104,7 +106,9 @@ def test_execute_sql_statement_with_rls(
         force=True,
     )
     db_engine_spec.execute_with_cursor.assert_called_with(
-        cursor, "SELECT * FROM sales WHERE organization_id=42 LIMIT 101", query
+        cursor,
+        "SELECT * FROM sales WHERE organization_id=42 LIMIT 101",
+        query,
     )
     SupersetResultSet.assert_called_with([(42,)], cursor.description, db_engine_spec)