You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@superset.apache.org by be...@apache.org on 2023/03/16 22:07:44 UTC

[superset] branch postgres_set_schema updated (cc516c27ee -> 15a33414c1)

This is an automated email from the ASF dual-hosted git repository.

beto pushed a change to branch postgres_set_schema
in repository https://gitbox.apache.org/repos/asf/superset.git


 discard cc516c27ee feat(postgresql): dynamic schema
     new 15a33414c1 feat(postgresql): dynamic schema

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (git push --force) and generates
a repository containing something like this:

 * -- * -- B -- O -- O -- O   (cc516c27ee)
            \
             N -- N -- N   refs/heads/postgres_set_schema (15a33414c1)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 superset/db_engine_specs/snowflake.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)


[superset] 01/01: feat(postgresql): dynamic schema

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

beto pushed a commit to branch postgres_set_schema
in repository https://gitbox.apache.org/repos/asf/superset.git

commit 15a33414c116be739f65a38161070026163e2007
Author: Beto Dealmeida <ro...@dealmeida.net>
AuthorDate: Thu Mar 16 15:04:46 2023 -0700

    feat(postgresql): dynamic schema
---
 superset/db_engine_specs/base.py      |  7 +++--
 superset/db_engine_specs/drill.py     | 17 ++++++-----
 superset/db_engine_specs/hive.py      | 13 ++++----
 superset/db_engine_specs/mysql.py     | 11 +++----
 superset/db_engine_specs/postgres.py  | 56 +++++++++++++++++++++++++++--------
 superset/db_engine_specs/presto.py    | 17 ++++++-----
 superset/db_engine_specs/snowflake.py | 21 +++++++------
 superset/models/core.py               | 32 +++++++++++++-------
 8 files changed, 115 insertions(+), 59 deletions(-)

diff --git a/superset/db_engine_specs/base.py b/superset/db_engine_specs/base.py
index 26dd169dc0..f4ef9bdd4c 100644
--- a/superset/db_engine_specs/base.py
+++ b/superset/db_engine_specs/base.py
@@ -1060,8 +1060,9 @@ class BaseEngineSpec:  # pylint: disable=too-many-public-methods
     def adjust_database_uri(  # pylint: disable=unused-argument
         cls,
         uri: URL,
-        selected_schema: Optional[str],
-    ) -> URL:
+        connect_args: Dict[str, Any],
+        schema: Optional[str],
+    ) -> Tuple[URL, Dict[str, Any]]:
         """
         Return a modified URL with a new database component.
 
@@ -1080,7 +1081,7 @@ class BaseEngineSpec:  # pylint: disable=too-many-public-methods
         Some database drivers like Presto accept '{catalog}/{schema}' in
         the database component of the URL, that can be handled here.
         """
-        return uri
+        return uri, connect_args
 
     @classmethod
     def patch(cls) -> None:
diff --git a/superset/db_engine_specs/drill.py b/superset/db_engine_specs/drill.py
index 4ae5ae59b3..cb90e84715 100644
--- a/superset/db_engine_specs/drill.py
+++ b/superset/db_engine_specs/drill.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 from datetime import datetime
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Optional, Tuple
 from urllib import parse
 
 from sqlalchemy import types
@@ -71,13 +71,16 @@ class DrillEngineSpec(BaseEngineSpec):
         return None
 
     @classmethod
-    def adjust_database_uri(cls, uri: URL, selected_schema: Optional[str]) -> URL:
-        if selected_schema:
-            uri = uri.set(
-                database=parse.quote(selected_schema.replace(".", "/"), safe="")
-            )
+    def adjust_database_uri(
+        cls,
+        uri: URL,
+        connect_args: Dict[str, Any],
+        schema: Optional[str],
+    ) -> Tuple[URL, Dict[str, Any]]:
+        if schema:
+            uri = uri.set(database=parse.quote(schema.replace(".", "/"), safe=""))
 
-        return uri
+        return uri, connect_args
 
     @classmethod
     def get_schema_from_engine_params(
diff --git a/superset/db_engine_specs/hive.py b/superset/db_engine_specs/hive.py
index f90d889f8c..aac98085eb 100644
--- a/superset/db_engine_specs/hive.py
+++ b/superset/db_engine_specs/hive.py
@@ -261,12 +261,15 @@ class HiveEngineSpec(PrestoEngineSpec):
 
     @classmethod
     def adjust_database_uri(
-        cls, uri: URL, selected_schema: Optional[str] = None
-    ) -> URL:
-        if selected_schema:
-            uri = uri.set(database=parse.quote(selected_schema, safe=""))
+        cls,
+        uri: URL,
+        connect_args: Dict[str, Any],
+        schema: Optional[str] = None,
+    ) -> Tuple[URL, Dict[str, Any]]:
+        if schema:
+            uri = uri.set(database=parse.quote(schema, safe=""))
 
-        return uri
+        return uri, connect_args
 
     @classmethod
     def get_schema_from_engine_params(
diff --git a/superset/db_engine_specs/mysql.py b/superset/db_engine_specs/mysql.py
index 04b8c68dd7..008fc6ba6b 100644
--- a/superset/db_engine_specs/mysql.py
+++ b/superset/db_engine_specs/mysql.py
@@ -194,12 +194,13 @@ class MySQLEngineSpec(BaseEngineSpec, BasicParametersMixin):
     def adjust_database_uri(
         cls,
         uri: URL,
-        selected_schema: Optional[str] = None,
-    ) -> URL:
-        if selected_schema:
-            uri = uri.set(database=parse.quote(selected_schema, safe=""))
+        connect_args: Dict[str, Any],
+        schema: Optional[str] = None,
+    ) -> Tuple[URL, Dict[str, Any]]:
+        if schema:
+            uri = uri.set(database=parse.quote(schema, safe=""))
 
-        return uri
+        return uri, connect_args
 
     @classmethod
     def get_schema_from_engine_params(
diff --git a/superset/db_engine_specs/postgres.py b/superset/db_engine_specs/postgres.py
index 84ddf56e10..2607066305 100644
--- a/superset/db_engine_specs/postgres.py
+++ b/superset/db_engine_specs/postgres.py
@@ -78,6 +78,8 @@ class PostgresBaseEngineSpec(BaseEngineSpec):
     engine = ""
     engine_name = "PostgreSQL"
 
+    supports_dynamic_schema = True
+
     _time_grain_expressions = {
         None: "{col}",
         "PT1S": "DATE_TRUNC('second', {col})",
@@ -147,6 +149,30 @@ class PostgresBaseEngineSpec(BaseEngineSpec):
         ),
     }
 
+    @classmethod
+    def adjust_database_uri(
+        cls,
+        uri: URL,
+        connect_args: Dict[str, Any],
+        schema: Optional[str] = None,
+    ) -> Tuple[URL, Dict[str, Any]]:
+        if not schema:
+            return uri, connect_args
+
+        options = dict(
+            [
+                tuple(token.strip() for token in option.strip().split("=", 1))
+                for option in re.split(r"-c\s?", connect_args.get("options", ""))
+                if "=" in option
+            ]
+        )
+        options["search_path"] = schema
+        connect_args["options"] = " ".join(
+            f"-c{key}={value}" for key, value in options.items()
+        )
+
+        return uri, connect_args
+
     @classmethod
     def get_schema_from_engine_params(
         cls,
@@ -166,19 +192,23 @@ class PostgresBaseEngineSpec(BaseEngineSpec):
         to determine the schema for a non-qualified table in a query. In cases like
         that we raise an exception.
         """
-        options = re.split(r"-c\s?", connect_args.get("options", ""))
-        for option in options:
-            if "=" not in option:
-                continue
-            key, value = option.strip().split("=", 1)
-            if key.strip() == "search_path":
-                if "," in value:
-                    raise Exception(
-                        "Multiple schemas are configured in the search path, which means "
-                        "Superset is unable to determine the schema of unqualified table "
-                        "names and enforce permissions."
-                    )
-                return value.strip()
+        options = dict(
+            [
+                tuple(token.strip() for token in option.strip().split("=", 1))
+                for option in re.split(r"-c\s?", connect_args.get("options", ""))
+                if "=" in option
+            ]
+        )
+
+        if search_path := options.get("search_path"):
+            schemas = search_path.split(",")
+            if len(schemas) > 1:
+                raise Exception(
+                    "Multiple schemas are configured in the search path, which means "
+                    "Superset is unable to determine the schema of unqualified table "
+                    "names and enforce permissions."
+                )
+            return schemas[0]
 
         return None
 
diff --git a/superset/db_engine_specs/presto.py b/superset/db_engine_specs/presto.py
index dd7bd88cdb..8bac325d41 100644
--- a/superset/db_engine_specs/presto.py
+++ b/superset/db_engine_specs/presto.py
@@ -302,18 +302,21 @@ class PrestoBaseEngineSpec(BaseEngineSpec, metaclass=ABCMeta):
 
     @classmethod
     def adjust_database_uri(
-        cls, uri: URL, selected_schema: Optional[str] = None
-    ) -> URL:
+        cls,
+        uri: URL,
+        connect_args: Dict[str, Any],
+        schema: Optional[str] = None,
+    ) -> Tuple[URL, Dict[str, Any]]:
         database = uri.database
-        if selected_schema and database:
-            selected_schema = parse.quote(selected_schema, safe="")
+        if schema and database:
+            schema = parse.quote(schema, safe="")
             if "/" in database:
-                database = database.split("/")[0] + "/" + selected_schema
+                database = database.split("/")[0] + "/" + schema
             else:
-                database += "/" + selected_schema
+                database += "/" + schema
             uri = uri.set(database=database)
 
-        return uri
+        return uri, connect_args
 
     @classmethod
     def get_schema_from_engine_params(
diff --git a/superset/db_engine_specs/snowflake.py b/superset/db_engine_specs/snowflake.py
index ba15eea7fb..e02dc29c33 100644
--- a/superset/db_engine_specs/snowflake.py
+++ b/superset/db_engine_specs/snowflake.py
@@ -1,4 +1,4 @@
-# Licensed to the Apache Software Foundation (ASF) under one
+# Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
 # regarding copyright ownership.  The ASF licenses this file
@@ -136,16 +136,19 @@ class SnowflakeEngineSpec(PostgresBaseEngineSpec):
 
     @classmethod
     def adjust_database_uri(
-        cls, uri: URL, selected_schema: Optional[str] = None
-    ) -> URL:
+        cls,
+        uri: URL,
+        connect_args: Dict[str, Any],
+        schema: Optional[str] = None,
+    ) -> Tuple[URL, Dict[str, Any]]:
         database = uri.database
-        if "/" in uri.database:
-            database = uri.database.split("/")[0]
-        if selected_schema:
-            selected_schema = parse.quote(selected_schema, safe="")
-            uri = uri.set(database=f"{database}/{selected_schema}")
+        if "/" in database:
+            database = database.split("/")[0]
+        if schema:
+            schema = parse.quote(schema, safe="")
+            uri = uri.set(database=f"{database}/{schema}")
 
-        return uri
+        return uri, connect_args
 
     @classmethod
     def get_schema_from_engine_params(
diff --git a/superset/models/core.py b/superset/models/core.py
index 5717726edc..d063f7a0eb 100755
--- a/superset/models/core.py
+++ b/superset/models/core.py
@@ -421,32 +421,40 @@ class Database(
         source: Optional[utils.QuerySource] = None,
         sqlalchemy_uri: Optional[str] = None,
     ) -> Engine:
-        extra = self.get_extra()
         sqlalchemy_url = make_url_safe(
             sqlalchemy_uri if sqlalchemy_uri else self.sqlalchemy_uri_decrypted
         )
         self.db_engine_spec.validate_database_uri(sqlalchemy_url)
 
-        sqlalchemy_url = self.db_engine_spec.adjust_database_uri(sqlalchemy_url, schema)
+        extra = self.get_extra()
+        params = extra.get("engine_params", {})
+        if nullpool:
+            params["poolclass"] = NullPool
+        connect_args = params.get("connect_args", {})
+
+        sqlalchemy_url, connect_args = self.db_engine_spec.adjust_database_uri(
+            sqlalchemy_url,
+            connect_args,
+            schema,
+        )
         effective_username = self.get_effective_user(sqlalchemy_url)
         # If using MySQL or Presto for example, will set url.username
         # If using Hive, will not do anything yet since that relies on a
         # configuration parameter instead.
         sqlalchemy_url = self.db_engine_spec.get_url_for_impersonation(
-            sqlalchemy_url, self.impersonate_user, effective_username
+            sqlalchemy_url,
+            self.impersonate_user,
+            effective_username,
         )
 
         masked_url = self.get_password_masked_url(sqlalchemy_url)
         logger.debug("Database._get_sqla_engine(). Masked URL: %s", str(masked_url))
 
-        params = extra.get("engine_params", {})
-        if nullpool:
-            params["poolclass"] = NullPool
-
-        connect_args = params.get("connect_args", {})
         if self.impersonate_user:
             self.db_engine_spec.update_impersonation_config(
-                connect_args, str(sqlalchemy_url), effective_username
+                connect_args,
+                str(sqlalchemy_url),
+                effective_username,
             )
 
         if connect_args:
@@ -464,7 +472,11 @@ class Database(
                     source = utils.QuerySource.SQL_LAB
 
             sqlalchemy_url, params = DB_CONNECTION_MUTATOR(
-                sqlalchemy_url, params, effective_username, security_manager, source
+                sqlalchemy_url,
+                params,
+                effective_username,
+                security_manager,
+                source,
             )
         try:
             return create_engine(sqlalchemy_url, **params)