Posted to notifications@superset.apache.org by GitBox <gi...@apache.org> on 2022/04/19 03:12:41 UTC

[GitHub] [superset] betodealmeida commented on a diff in pull request #19421: perf: refactor SIP-68 db migrations with INSERT SELECT FROM

betodealmeida commented on code in PR #19421:
URL: https://github.com/apache/superset/pull/19421#discussion_r852544625


##########
superset/connectors/sqla/models.py:
##########
@@ -417,6 +426,59 @@ def data(self) -> Dict[str, Any]:
 
         return attr_dict
 
+    def to_sl_column(
+        self, known_columns: Optional[Dict[str, NewColumn]] = None
+    ) -> NewColumn:
+        """Convert a TableColumn to NewColumn"""
+        column = known_columns.get(self.uuid) if known_columns else None
+        if not column:
+            column = NewColumn()
+
+        extra_json = self.get_extra_dict()
+        for attr in {
+            "verbose_name",
+            "python_date_format",
+        }:
+            value = getattr(self, attr)
+            if value:
+                extra_json[attr] = value
+
+        column.uuid = self.uuid
+        column.created_on = self.created_on
+        column.changed_on = self.changed_on
+        column.created_by = self.created_by
+        column.changed_by = self.changed_by
+        column.name = self.column_name
+        column.type = self.type or "Unknown"

Review Comment:
   ```suggestion
           column.type = self.type or "UNKNOWN"
   ```
   
   (For consistency with the default value.)



##########
superset/connectors/sqla/models.py:
##########
@@ -479,6 +541,58 @@ def data(self) -> Dict[str, Any]:
         attr_dict.update(super().data)
         return attr_dict
 
+    def to_sl_column(
+        self, known_columns: Optional[Dict[str, NewColumn]] = None
+    ) -> NewColumn:
+        """Convert a SqlMetric to NewColumn. Find and update existing or
+        create a new one."""
+        column = known_columns.get(self.uuid) if known_columns else None
+        if not column:
+            column = NewColumn()
+
+        extra_json = self.get_extra_dict()
+        for attr in {"verbose_name", "metric_type", "d3format"}:
+            value = getattr(self, attr)
+            if value is not None:
+                extra_json[attr] = value
+        is_additive = (
+            self.metric_type and self.metric_type.lower() in ADDITIVE_METRIC_TYPES_LOWER
+        )
+
+        column.uuid = self.uuid
+        column.name = self.metric_name
+        column.created_on = self.created_on
+        column.changed_on = self.changed_on
+        column.created_by = self.created_by
+        column.changed_by = self.changed_by
+        column.type = "Unknown"

Review Comment:
   ```suggestion
           column.type = "UNKNOWN"
   ```



##########
superset/connectors/sqla/models.py:
##########
@@ -2237,29 +2245,34 @@ def after_update(  # pylint: disable=too-many-branches, too-many-locals, too-man
                 column.is_physical = False
 
             # update referenced tables if SQL changed
-            if inspector.attrs.sql.history.has_changes():
-                parsed = ParsedQuery(target.sql)
-                referenced_tables = parsed.tables
-
-                predicate = or_(
-                    *[
-                        and_(
-                            NewTable.schema == (table.schema or target.schema),
-                            NewTable.name == table.table,
-                        )
-                        for table in referenced_tables
-                    ]
+            if sqla_table.sql and inspector.attrs.sql.history.has_changes():
+                referenced_tables = extract_table_references(
+                    sqla_table.sql, sqla_table.database.get_dialect().name
+                )
+                dataset.tables = NewTable.load_or_create(

Review Comment:
   This should be inside a list, no?
   
   ```python
   dataset.tables = [NewTable.load_or_create(...)]
   ```



##########
superset/connectors/sqla/models.py:
##########
@@ -2370,26 +2342,39 @@ def write_shadow_dataset(  # pylint: disable=too-many-locals
                 column.is_physical = False
 
             # find referenced tables
-            parsed = ParsedQuery(dataset.sql)
-            referenced_tables = parsed.tables
-            tables = load_or_create_tables(
-                session,
-                database,
-                dataset.schema,
+            referenced_tables = extract_table_references(
+                self.sql, self.database.get_dialect().name
+            )
+            tables = NewTable.load_or_create(

Review Comment:
   Same here?



##########
superset/connectors/sqla/models.py:
##########
@@ -2063,172 +2131,72 @@ def after_update(  # pylint: disable=too-many-branches, too-many-locals, too-man
 
         For more context: https://github.com/apache/superset/issues/14909
         """
-        inspector = inspect(target)
+        # set permissions
+        security_manager.set_perm(mapper, connection, sqla_table)
+
+        inspector = inspect(sqla_table)
         session = inspector.session
 
         # double-check that ``UPDATE``s are actually pending (this method is called even
         # for instances that have no net changes to their column-based attributes)
-        if not session.is_modified(target, include_collections=True):
+        if not session.is_modified(sqla_table, include_collections=True):
             return
 
-        # set permissions
-        security_manager.set_perm(mapper, connection, target)
-
         dataset = (
-            session.query(NewDataset).filter_by(sqlatable_id=target.id).one_or_none()
+            session.query(NewDataset).filter_by(uuid=sqla_table.uuid).one_or_none()
         )
         if not dataset:
+            sqla_table.write_shadow_dataset()
             return
 
-        # get DB-specific conditional quoter for expressions that point to columns or
-        # table names
-        database = (
-            target.database
-            or session.query(Database).filter_by(id=target.database_id).one()
-        )
-        engine = database.get_sqla_engine(schema=target.schema)
-        conditional_quote = engine.dialect.identifier_preparer.quote
-
-        # update columns
-        if inspector.attrs.columns.history.has_changes():
-            # handle deleted columns
-            if inspector.attrs.columns.history.deleted:
-                column_names = {
-                    column.column_name
-                    for column in inspector.attrs.columns.history.deleted
-                }
-                dataset.columns = [
-                    column
-                    for column in dataset.columns
-                    if column.name not in column_names
-                ]
-
-            # handle inserted columns
-            for column in inspector.attrs.columns.history.added:
-                # ``is_active`` might be ``None``, but it defaults to ``True``.
-                if column.is_active is False:
-                    continue
-
-                extra_json = json.loads(column.extra or "{}")
-                for attr in {
-                    "groupby",
-                    "filterable",
-                    "verbose_name",
-                    "python_date_format",
-                }:
-                    value = getattr(column, attr)
-                    if value:
-                        extra_json[attr] = value
-
-                dataset.columns.append(
-                    NewColumn(
-                        name=column.column_name,
-                        type=column.type or "Unknown",
-                        expression=column.expression
-                        or conditional_quote(column.column_name),
-                        description=column.description,
-                        is_temporal=column.is_dttm,
-                        is_aggregation=False,
-                        is_physical=column.expression is None,
-                        is_spatial=False,
-                        is_partition=False,
-                        is_increase_desired=True,
-                        extra_json=json.dumps(extra_json) if extra_json else None,
-                        is_managed_externally=target.is_managed_externally,
-                        external_url=target.external_url,
-                    )

Review Comment:
   Nice!



##########
superset/connectors/sqla/models.py:
##########
@@ -2273,95 +2286,54 @@ def write_shadow_dataset(  # pylint: disable=too-many-locals
 
         For more context: https://github.com/apache/superset/issues/14909
         """
-
-        engine = database.get_sqla_engine(schema=dataset.schema)
-        conditional_quote = engine.dialect.identifier_preparer.quote
+        session = inspect(self).session
+        if self.database_id and (
+            not self.database or self.database.id != self.database_id
+        ):
+            self.database = session.query(Database).filter_by(id=self.database_id).one()

Review Comment:
   Nice!



##########
superset/connectors/sqla/models.py:
##########
@@ -1745,13 +1859,16 @@ def fetch_metadata(self, commit: bool = True) -> MetadataResult:
         )
 
         # clear old columns before adding modified columns back
-        self.columns = []
+        columns = []
         for col in new_columns:
             old_column = old_columns_by_name.pop(col["name"], None)
             if not old_column:
                 results.added.append(col["name"])
                 new_column = TableColumn(
-                    column_name=col["name"], type=col["type"], table=self
+                    column_name=col["name"],
+                    type=col["type"],
+                    table_id=self.id,
+                    table=self,

Review Comment:
   Do you need to add both `table_id` and `table` here? I thought only one was necessary.
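   
   In plain SQLAlchemy, assigning the relationship alone should be enough: the FK gets populated on flush. A minimal sketch with toy models (`Parent`/`Child` are just for illustration, not the Superset classes):
   
   ```python
   from sqlalchemy import Column, ForeignKey, Integer, create_engine
   from sqlalchemy.ext.declarative import declarative_base
   from sqlalchemy.orm import relationship, Session
   
   Base = declarative_base()
   
   
   class Parent(Base):
       __tablename__ = "parent"
       id = Column(Integer, primary_key=True)
   
   
   class Child(Base):
       __tablename__ = "child"
       id = Column(Integer, primary_key=True)
       parent_id = Column(Integer, ForeignKey("parent.id"))
       parent = relationship(Parent)
   
   
   engine = create_engine("sqlite://")
   Base.metadata.create_all(engine)
   session = Session(bind=engine)
   
   parent = Parent()
   child = Child(parent=parent)  # no explicit parent_id
   session.add(child)  # parent is cascaded into the session via the relationship
   session.flush()
   assert child.parent_id == parent.id  # FK filled in from the relationship on flush
   ```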



##########
superset/connectors/sqla/models.py:
##########
@@ -417,6 +426,59 @@ def data(self) -> Dict[str, Any]:
 
         return attr_dict
 
+    def to_sl_column(
+        self, known_columns: Optional[Dict[str, NewColumn]] = None
+    ) -> NewColumn:
+        """Convert a TableColumn to NewColumn"""
+        column = known_columns.get(self.uuid) if known_columns else None
+        if not column:
+            column = NewColumn()
+
+        extra_json = self.get_extra_dict()
+        for attr in {
+            "verbose_name",
+            "python_date_format",
+        }:
+            value = getattr(self, attr)
+            if value:
+                extra_json[attr] = value
+
+        column.uuid = self.uuid
+        column.created_on = self.created_on
+        column.changed_on = self.changed_on
+        column.created_by = self.created_by
+        column.changed_by = self.changed_by
+        column.name = self.column_name
+        column.type = self.type or "Unknown"
+        column.expression = self.expression or self.table.quote_identifier(
+            self.column_name
+        )
+        column.description = self.description
+        column.is_aggregation = False
+        column.is_dimensional = self.groupby
+        column.is_filterable = self.filterable
+        column.is_increase_desired = True
+        column.is_managed_externally = self.table.is_managed_externally
+        column.is_partition = False
+        column.is_physical = not self.expression
+        column.is_spatial = False
+        column.is_temporal = self.is_dttm
+        column.extra_json = json.dumps(extra_json) if extra_json else None
+        column.external_url = self.table.external_url
+
+        return column
+
+    @staticmethod
+    def after_delete(  # pylint: disable=unused-argument
+        mapper: Mapper,
+        connection: Connection,
+        target: "TableColumn",
+    ) -> None:
+        session = inspect(target).session
+        dataset = session.query(NewColumn).filter_by(uuid=target.uuid).one_or_none()
+        if dataset:
+            session.delete(dataset)

Review Comment:
   ```suggestion
           column = session.query(NewColumn).filter_by(uuid=target.uuid).one_or_none()
           if column:
               session.delete(column)
   ```
   
   Leveraging the uuid is a really nice idea!



##########
superset/tables/models.py:
##########
@@ -80,13 +107,96 @@ class Table(Model, AuditMixinNullable, ExtraJSONMixin, ImportExportMixin):
     schema = sa.Column(sa.Text)
     name = sa.Column(sa.Text)
 
-    # The relationship between tables and columns is 1:n, but we use a many-to-many
-    # association to differentiate between the relationship between datasets and
-    # columns.
-    columns: List[Column] = relationship(
-        "Column", secondary=association_table, cascade="all, delete"
-    )
-
     # Column is managed externally and should be read-only inside Superset
     is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
     external_url = sa.Column(sa.Text, nullable=True)
+
+    @property
+    def fullname(self) -> str:
+        return str(TableName(table=self.name, schema=self.schema, catalog=self.catalog))
+
+    def __repr__(self) -> str:
+        return f"<Table id={self.id} database_id={self.database_id} {self.fullname}>"
+
+    def sync_columns(self) -> None:
+        """Sync table columns with the database. Keep metadata for existing columns"""
+        try:
+            column_metadata = get_physical_table_metadata(
+                self.database, self.name, self.schema
+            )
+        except Exception:  # pylint: disable=broad-except
+            column_metadata = []
+
+        existing_columns = {column.name: column for column in self.columns}
+        quote_identifier = self.database.quote_identifier
+
+        def update_or_create_column(column_meta: Dict[str, Any]) -> Column:
+            column_name: str = column_meta["name"]
+            if column_name in existing_columns:
+                column = existing_columns[column_name]
+            else:
+                column = Column(name=column_name)
+            column.type = column_meta["type"]
+            column.is_temporal = column_meta["is_dttm"]
+            column.expression = quote_identifier(column_name)
+            column.is_aggregation = False
+            column.is_physical = True
+            column.is_spatial = False
+            column.is_partition = False  # TODO: update with accurate is_partition
+            return column
+
+        self.columns = list(map(update_or_create_column, column_metadata))
+
+    @staticmethod
+    def load_or_create(
+        database: Database,
+        table_names: Iterable[TableName],
+        default_schema: Optional[str] = None,
+        sync_columns: Optional[bool] = False,
+        default_props: Optional[Dict[str, Any]] = None,
+    ) -> List["Table"]:
+        """
+        Load or create new table model instances.
+        """
+        if not table_names:
+            return []
+
+        if not database.id:
+            raise Exception("Database must be already committed")

Review Comment:
   You get an `id` by calling `session.flush()` as well, I think that's still fine, right?
   
   ```suggestion
               raise Exception("Database must be already flushed or committed")
   ```
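   
   For instance (a minimal sketch, assuming a plain SQLAlchemy `session`; the name and URI are made up):
   
   ```python
   database = Database(database_name="example", sqlalchemy_uri="sqlite://")
   session.add(database)
   session.flush()  # emits the INSERT without committing
   assert database.id is not None  # autoincrement id is now assigned
   ```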



##########
superset/tables/models.py:
##########
@@ -80,13 +107,96 @@ class Table(Model, AuditMixinNullable, ExtraJSONMixin, ImportExportMixin):
     schema = sa.Column(sa.Text)
     name = sa.Column(sa.Text)
 
-    # The relationship between tables and columns is 1:n, but we use a many-to-many
-    # association to differentiate between the relationship between datasets and
-    # columns.
-    columns: List[Column] = relationship(
-        "Column", secondary=association_table, cascade="all, delete"
-    )
-
     # Column is managed externally and should be read-only inside Superset
     is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
     external_url = sa.Column(sa.Text, nullable=True)
+
+    @property
+    def fullname(self) -> str:
+        return str(TableName(table=self.name, schema=self.schema, catalog=self.catalog))
+
+    def __repr__(self) -> str:
+        return f"<Table id={self.id} database_id={self.database_id} {self.fullname}>"
+
+    def sync_columns(self) -> None:
+        """Sync table columns with the database. Keep metadata for existing columns"""
+        try:
+            column_metadata = get_physical_table_metadata(
+                self.database, self.name, self.schema
+            )
+        except Exception:  # pylint: disable=broad-except
+            column_metadata = []
+
+        existing_columns = {column.name: column for column in self.columns}
+        quote_identifier = self.database.quote_identifier
+
+        def update_or_create_column(column_meta: Dict[str, Any]) -> Column:
+            column_name: str = column_meta["name"]
+            if column_name in existing_columns:
+                column = existing_columns[column_name]
+            else:
+                column = Column(name=column_name)
+            column.type = column_meta["type"]
+            column.is_temporal = column_meta["is_dttm"]
+            column.expression = quote_identifier(column_name)
+            column.is_aggregation = False
+            column.is_physical = True
+            column.is_spatial = False
+            column.is_partition = False  # TODO: update with accurate is_partition
+            return column
+
+        self.columns = list(map(update_or_create_column, column_metadata))

Review Comment:
   Smallest nit:
   ```suggestion
           self.columns = [update_or_create_column(column) for column in column_metadata]
   ```



##########
superset/connectors/sqla/models.py:
##########
@@ -1895,105 +2030,88 @@ def before_update(
         ):
             raise Exception(get_dataset_exist_error_msg(target.full_name))
 
+    def get_sl_columns(self) -> List[NewColumn]:
+        """
+        Convert `SqlaTable.columns` and `SqlaTable.metrics` to the new Column model
+        """
+        session: Session = inspect(self).session
+
+        uuids = set()
+        for column_or_metric in self.columns + self.metrics:
+            # pre-assign uuid after new columns or metrics are inserted so
+            # the related `NewColumn` can have a deterministic uuid, too
+            if not column_or_metric.uuid:
+                column_or_metric.uuid = uuid4()
+            else:
+                uuids.add(column_or_metric.uuid)
+
+        # load existing columns from cached session states first
+        existing_columns = set(
+            find_cached_objects_in_session(session, NewColumn, uuids=uuids)
+        )
+        for column in existing_columns:
+            uuids.remove(column.uuid)
+
+        if uuids:
+            # load those not found from db
+            existing_columns |= set(
+                session.query(NewColumn).filter(NewColumn.uuid.in_(uuids)).all()
+            )
+
+        known_columns = {column.uuid: column for column in existing_columns}
+        return [
+            item.to_sl_column(known_columns) for item in self.columns + self.metrics
+        ]
+
     @staticmethod
     def update_table(  # pylint: disable=unused-argument
         mapper: Mapper, connection: Connection, target: Union[SqlMetric, TableColumn]
     ) -> None:
         """
-        Forces an update to the table's changed_on value when a metric or column on the
-        table is updated. This busts the cache key for all charts that use the table.
-
         :param mapper: Unused.
         :param connection: Unused.
         :param target: The metric or column that was updated.
         """
         inspector = inspect(target)
         session = inspector.session
 
-        # get DB-specific conditional quoter for expressions that point to columns or
-        # table names
-        database = (
-            target.table.database
-            or session.query(Database).filter_by(id=target.database_id).one()
-        )
-        engine = database.get_sqla_engine(schema=target.table.schema)
-        conditional_quote = engine.dialect.identifier_preparer.quote
-
+        # Forces an update to the table's changed_on value when a metric or column on the
+        # table is updated. This busts the cache key for all charts that use the table.
         session.execute(update(SqlaTable).where(SqlaTable.id == target.table.id))
 
-        dataset = (
-            session.query(NewDataset)
-            .filter_by(sqlatable_id=target.table.id)
-            .one_or_none()
-        )
-
-        if not dataset:
-            # if dataset is not found create a new copy
-            # of the dataset instead of updating the existing
-
-            SqlaTable.write_shadow_dataset(target.table, database, session)
-            return
-
-        # update ``Column`` model as well
-        if isinstance(target, TableColumn):
-            columns = [
-                column
-                for column in dataset.columns
-                if column.name == target.column_name
-            ]
-            if not columns:
-                return
-
-            column = columns[0]
-            extra_json = json.loads(target.extra or "{}")
-            for attr in {"groupby", "filterable", "verbose_name", "python_date_format"}:
-                value = getattr(target, attr)
-                if value:
-                    extra_json[attr] = value
-
-            column.name = target.column_name
-            column.type = target.type or "Unknown"
-            column.expression = target.expression or conditional_quote(
-                target.column_name
+        # if the table itself has changed, shadow-writing will happen in `after_update` anyway
+        if target.table not in session.dirty:
+            dataset: NewDataset = (
+                session.query(NewDataset)
+                .filter_by(uuid=target.table.uuid)
+                .one_or_none()
             )
-            column.description = target.description
-            column.is_temporal = target.is_dttm
-            column.is_physical = target.expression is None
-            column.extra_json = json.dumps(extra_json) if extra_json else None
-
-        else:  # SqlMetric
-            columns = [
-                column
-                for column in dataset.columns
-                if column.name == target.metric_name
-            ]
-            if not columns:
+            # Update shadow dataset and columns
+            # did we find the dataset?
+            if not dataset:
+                # if dataset is not found create a new copy
+                target.table.write_shadow_dataset()
                 return
 
-            column = columns[0]
-            extra_json = json.loads(target.extra or "{}")
-            for attr in {"verbose_name", "metric_type", "d3format"}:
-                value = getattr(target, attr)
-                if value:
-                    extra_json[attr] = value
-
-            is_additive = (
-                target.metric_type
-                and target.metric_type.lower() in ADDITIVE_METRIC_TYPES
+            # update changed_on timestamp
+            session.execute(update(NewDataset).where(NewDataset.id == dataset.id))
+
+            # update `Column` model as well
+            session.add(
+                target.to_sl_column(
+                    {
+                        target.uuid: session.query(NewColumn)
+                        .filter_by(uuid=target.uuid)
+                        .one_or_none()
+                    }
+                )
             )
 
-            column.name = target.metric_name
-            column.expression = target.expression
-            column.warning_text = target.warning_text
-            column.description = target.description
-            column.is_additive = is_additive
-            column.extra_json = json.dumps(extra_json) if extra_json else None
-
     @staticmethod
     def after_insert(
         mapper: Mapper,
         connection: Connection,
-        target: "SqlaTable",
+        table: "SqlaTable",

Review Comment:
   Nit: `sqla_table` for consistency with the next method (`after_update`).
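   
   e.g. (the usages inside the method body would need the same rename):
   
   ```suggestion
           sqla_table: "SqlaTable",
   ```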



##########
superset/migrations/versions/a9422eeaae74_new_dataset_models_take_2.py:
##########
@@ -0,0 +1,858 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""new_dataset_models_take_2
+
+Revision ID: a9422eeaae74
+Revises: cecc6bf46990
+Create Date: 2022-04-01 14:38:09.499483
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = "a9422eeaae74"
+down_revision = "cecc6bf46990"
+
+import json
+import os
+from datetime import datetime
+from typing import List, Optional, Set, Type, Union
+from uuid import uuid4
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy import select
+from sqlalchemy.ext.declarative import declarative_base, declared_attr
+from sqlalchemy.orm import backref, relationship, Session
+from sqlalchemy.schema import UniqueConstraint
+from sqlalchemy.sql import functions as func
+from sqlalchemy.sql.expression import and_, or_
+from sqlalchemy_utils import UUIDType
+
+from superset import app, db
+from superset.connectors.sqla.models import ADDITIVE_METRIC_TYPES_LOWER
+from superset.connectors.sqla.utils import get_dialect_name, get_identifier_quoter
+from superset.extensions import encrypted_field_factory
+from superset.migrations.shared.utils import assign_uuids
+from superset.sql_parse import extract_table_references, Table
+from superset.utils.core import MediumText
+
+Base = declarative_base()
+custom_password_store = app.config["SQLALCHEMY_CUSTOM_PASSWORD_STORE"]
+DB_CONNECTION_MUTATOR = app.config["DB_CONNECTION_MUTATOR"]
+SHOW_PROGRESS = os.environ.get("SHOW_PROGRESS") == "1"
+
+TEMPORAL_TYPES = {"DATETIME", "DATE", "TIME", "TIMEDELTA"}
+
+
+user_table = sa.Table(
+    "ab_user", Base.metadata, sa.Column("id", sa.Integer(), primary_key=True)
+)
+
+
+class UUIDMixin:
+    uuid = sa.Column(
+        UUIDType(binary=True), primary_key=False, unique=True, default=uuid4
+    )
+
+
+class AuxiliaryColumnsMixin(UUIDMixin):
+    """
+    Auxiliary columns, a combination of columns added by
+       AuditMixinNullable + ImportExportMixin
+    """
+
+    created_on = sa.Column(sa.DateTime, default=datetime.now, nullable=True)
+    changed_on = sa.Column(
+        sa.DateTime, default=datetime.now, onupdate=datetime.now, nullable=True
+    )
+
+    @declared_attr
+    def created_by_fk(cls):
+        return sa.Column(sa.Integer, sa.ForeignKey("ab_user.id"), nullable=True)
+
+    @declared_attr
+    def changed_by_fk(cls):
+        return sa.Column(sa.Integer, sa.ForeignKey("ab_user.id"), nullable=True)
+
+
+def insert_from_select(
+    target: Union[str, sa.Table, Type[Base]], source: sa.sql.expression.Select
+) -> None:
+    """
+    Execute INSERT FROM SELECT to copy data from a SELECT query to the target table.
+    """
+    if isinstance(target, sa.Table):
+        target_table = target
+    elif hasattr(target, "__tablename__"):
+        target_table: sa.Table = Base.metadata.tables[target.__tablename__]
+    else:
+        target_table: sa.Table = Base.metadata.tables[target]
+    cols = [col.name for col in source.columns if col.name in target_table.columns]
+    query = target_table.insert().from_select(cols, source)
+    return op.execute(query)
+
+
+class Database(Base):
+
+    __tablename__ = "dbs"
+    __table_args__ = (UniqueConstraint("database_name"),)
+
+    id = sa.Column(sa.Integer, primary_key=True)
+    database_name = sa.Column(sa.String(250), unique=True, nullable=False)
+    sqlalchemy_uri = sa.Column(sa.String(1024), nullable=False)
+    password = sa.Column(encrypted_field_factory.create(sa.String(1024)))
+    impersonate_user = sa.Column(sa.Boolean, default=False)
+    encrypted_extra = sa.Column(encrypted_field_factory.create(sa.Text), nullable=True)
+    extra = sa.Column(sa.Text)
+    server_cert = sa.Column(encrypted_field_factory.create(sa.Text), nullable=True)
+
+
+class TableColumn(AuxiliaryColumnsMixin, Base):
+
+    __tablename__ = "table_columns"
+    __table_args__ = (UniqueConstraint("table_id", "column_name"),)
+
+    id = sa.Column(sa.Integer, primary_key=True)
+    table_id = sa.Column(sa.Integer, sa.ForeignKey("tables.id"))
+    is_active = sa.Column(sa.Boolean, default=True)
+    extra = sa.Column(sa.Text)
+    column_name = sa.Column(sa.String(255), nullable=False)
+    type = sa.Column(sa.String(32))
+    expression = sa.Column(MediumText())
+    description = sa.Column(MediumText())
+    is_dttm = sa.Column(sa.Boolean, default=False)
+    filterable = sa.Column(sa.Boolean, default=True)
+    groupby = sa.Column(sa.Boolean, default=True)
+    verbose_name = sa.Column(sa.String(1024))
+    python_date_format = sa.Column(sa.String(255))
+
+
+class SqlMetric(AuxiliaryColumnsMixin, Base):
+
+    __tablename__ = "sql_metrics"
+    __table_args__ = (UniqueConstraint("table_id", "metric_name"),)
+
+    id = sa.Column(sa.Integer, primary_key=True)
+    table_id = sa.Column(sa.Integer, sa.ForeignKey("tables.id"))
+    extra = sa.Column(sa.Text)
+    metric_type = sa.Column(sa.String(32))
+    metric_name = sa.Column(sa.String(255), nullable=False)
+    expression = sa.Column(MediumText(), nullable=False)
+    warning_text = sa.Column(MediumText())
+    description = sa.Column(MediumText())
+    d3format = sa.Column(sa.String(128))
+    verbose_name = sa.Column(sa.String(1024))
+
+
+sqlatable_user_table = sa.Table(
+    "sqlatable_user",
+    Base.metadata,
+    sa.Column("id", sa.Integer, primary_key=True),
+    sa.Column("user_id", sa.Integer, sa.ForeignKey("ab_user.id")),
+    sa.Column("table_id", sa.Integer, sa.ForeignKey("tables.id")),
+)
+
+
+class SqlaTable(AuxiliaryColumnsMixin, Base):
+
+    __tablename__ = "tables"
+    __table_args__ = (UniqueConstraint("database_id", "schema", "table_name"),)
+
+    id = sa.Column(sa.Integer, primary_key=True)
+    extra = sa.Column(sa.Text)
+    database_id = sa.Column(sa.Integer, sa.ForeignKey("dbs.id"), nullable=False)
+    database: Database = relationship(
+        "Database",
+        backref=backref("tables", cascade="all, delete-orphan"),
+        foreign_keys=[database_id],
+    )
+    schema = sa.Column(sa.String(255))
+    table_name = sa.Column(sa.String(250), nullable=False)
+    sql = sa.Column(MediumText())
+    is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
+    external_url = sa.Column(sa.Text, nullable=True)
+
+
+table_column_association_table = sa.Table(
+    "sl_table_columns",
+    Base.metadata,
+    sa.Column("table_id", sa.ForeignKey("sl_tables.id"), primary_key=True),
+    sa.Column("column_id", sa.ForeignKey("sl_columns.id"), primary_key=True),
+)
+
+dataset_column_association_table = sa.Table(
+    "sl_dataset_columns",
+    Base.metadata,
+    sa.Column("dataset_id", sa.ForeignKey("sl_datasets.id"), primary_key=True),
+    sa.Column("column_id", sa.ForeignKey("sl_columns.id"), primary_key=True),
+)
+
+dataset_table_association_table = sa.Table(
+    "sl_dataset_tables",
+    Base.metadata,
+    sa.Column("dataset_id", sa.ForeignKey("sl_datasets.id"), primary_key=True),
+    sa.Column("table_id", sa.ForeignKey("sl_tables.id"), primary_key=True),
+)
+
+dataset_user_association_table = sa.Table(
+    "sl_dataset_users",
+    Base.metadata,
+    sa.Column("dataset_id", sa.ForeignKey("sl_datasets.id"), primary_key=True),
+    sa.Column("user_id", sa.ForeignKey("ab_user.id"), primary_key=True),
+)
+
+
+class NewColumn(AuxiliaryColumnsMixin, Base):
+
+    __tablename__ = "sl_columns"
+
+    id = sa.Column(sa.Integer, primary_key=True)
+    is_aggregation = sa.Column(sa.Boolean, nullable=False, default=False)
+    is_additive = sa.Column(sa.Boolean, nullable=False, default=False)
+    is_dimensional = sa.Column(sa.Boolean, nullable=False, default=False)
+    is_filterable = sa.Column(sa.Boolean, nullable=False, default=True)
+    is_increase_desired = sa.Column(sa.Boolean, nullable=False, default=True)
+    is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
+    is_partition = sa.Column(sa.Boolean, nullable=False, default=False)
+    is_physical = sa.Column(sa.Boolean, nullable=False, default=False)
+    is_temporal = sa.Column(sa.Boolean, nullable=False, default=False)
+    is_spatial = sa.Column(sa.Boolean, nullable=False, default=False)
+
+    name = sa.Column(sa.Text)
+    type = sa.Column(sa.Text)
+    unit = sa.Column(sa.Text)
+    expression = sa.Column(MediumText())
+    description = sa.Column(MediumText())
+    warning_text = sa.Column(MediumText())
+    external_url = sa.Column(sa.Text, nullable=True)
+    extra_json = sa.Column(MediumText(), default="{}")
+
+
+class NewTable(AuxiliaryColumnsMixin, Base):
+
+    __tablename__ = "sl_tables"
+
+    id = sa.Column(sa.Integer, primary_key=True)
+    # A temporary column to keep the link between NewTable and SqlaTable
+    sqlatable_id = sa.Column(sa.Integer, primary_key=False, nullable=True, unique=True)
+    database_id = sa.Column(sa.Integer, sa.ForeignKey("dbs.id"), nullable=False)
+    is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
+    catalog = sa.Column(sa.Text)
+    schema = sa.Column(sa.Text)
+    name = sa.Column(sa.Text)
+    external_url = sa.Column(sa.Text, nullable=True)
+    extra_json = sa.Column(MediumText(), default="{}")
+    database: Database = relationship(
+        "Database",
+        backref=backref("new_tables", cascade="all, delete-orphan"),
+        foreign_keys=[database_id],
+    )
+    columns: List[NewColumn] = relationship(
+        "NewColumn", secondary=table_column_association_table, cascade="all, delete"
+    )
+
+
+class NewDataset(Base, AuxiliaryColumnsMixin):
+
+    __tablename__ = "sl_datasets"
+
+    id = sa.Column(sa.Integer, primary_key=True)
+    database_id = sa.Column(sa.Integer, sa.ForeignKey("dbs.id"), nullable=False)
+    is_physical = sa.Column(sa.Boolean, default=False)
+    is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
+    name = sa.Column(sa.Text)
+    expression = sa.Column(MediumText())
+    external_url = sa.Column(sa.Text, nullable=True)
+    extra_json = sa.Column(MediumText(), default="{}")
+    tables: List[NewTable] = relationship(
+        "NewTable", secondary=dataset_table_association_table
+    )
+    columns: List[NewColumn] = relationship(
+        "NewColumn", secondary=dataset_column_association_table, cascade="all, delete"
+    )
+
+
+def find_tables(
+    session: Session,
+    database_id: int,
+    default_schema: Optional[str],
+    tables: Set[Table],
+) -> List[int]:
+    """
+    Look for ``NewTable``s from a specific database
+    """
+    if not tables:
+        return []
+
+    predicate = or_(
+        *[
+            and_(
+                NewTable.database_id == database_id,
+                NewTable.schema == (table.schema or default_schema),
+                NewTable.name == table.table,
+            )
+            for table in tables
+        ]
+    )
+    return session.query(NewTable.id).filter(predicate).all()
+
+
+# helper SQLA elements for easier querying
+is_physical_table = or_(SqlaTable.sql.is_(None), SqlaTable.sql == "")
+
+# keep only table columns with a valid associated SqlaTable
+active_table_columns = sa.join(
+    TableColumn,
+    SqlaTable,
+    TableColumn.table_id == SqlaTable.id,
+)
+active_metrics = sa.join(SqlMetric, SqlaTable, SqlMetric.table_id == SqlaTable.id)
+
+
+def copy_tables(session: Session) -> None:
+    """Copy Physical tables"""
+    count = session.query(SqlaTable).filter(is_physical_table).count()
+    if not count:
+        return
+    print(f">> Copy {count:,} physical tables to sl_tables...")
+    insert_from_select(
+        NewTable,
+        select(
+            [
+                # Tables need a different uuid than datasets, since they are
+                # different entities. When using INSERT FROM SELECT, we must provide
+                # a value for `uuid`, otherwise it'd use the default generated on the
+                # Python side, which would cause duplicate values.
+                SqlaTable.uuid,
+                SqlaTable.id.label("sqlatable_id"),
+                SqlaTable.created_on,
+                SqlaTable.changed_on,
+                SqlaTable.created_by_fk,
+                SqlaTable.changed_by_fk,
+                SqlaTable.table_name.label("name"),
+                SqlaTable.schema,
+                SqlaTable.database_id,
+                SqlaTable.is_managed_externally,
+                SqlaTable.external_url,
+            ]
+        )
+        # use an inner join to keep only tables with valid database ids
+        .select_from(
+            sa.join(SqlaTable, Database, SqlaTable.database_id == Database.id)
+        ).where(is_physical_table),
+    )
+
+    # Assign new uuids to tables, so they are different than datasets'
+    assign_uuids(NewTable, session)
+
+
+def copy_datasets(session: Session) -> None:
+    """Copy all datasets"""
+    count = session.query(SqlaTable).count()
+    if not count:
+        return
+    print(f">> Copy {count:,} SqlaTable to sl_datasets...")
+    insert_from_select(
+        NewDataset,
+        select(
+            [
+                SqlaTable.uuid,
+                SqlaTable.created_on,
+                SqlaTable.changed_on,
+                SqlaTable.created_by_fk,
+                SqlaTable.changed_by_fk,
+                SqlaTable.database_id,
+                SqlaTable.table_name.label("name"),
+                func.coalesce(SqlaTable.sql, SqlaTable.table_name).label("expression"),
+                is_physical_table.label("is_physical"),
+                SqlaTable.is_managed_externally,
+                SqlaTable.external_url,
+                SqlaTable.extra.label("extra_json"),
+            ]
+        ),
+    )
+
+    print("   Copy dataset owners...")
+    insert_from_select(
+        dataset_user_association_table,
+        select(
+            [NewDataset.id.label("dataset_id"), sqlatable_user_table.c.user_id]
+        ).select_from(
+            sqlatable_user_table.join(
+                SqlaTable, SqlaTable.id == sqlatable_user_table.c.table_id
+            ).join(NewDataset, NewDataset.uuid == SqlaTable.uuid)
+        ),
+    )
+
+    print("   Link physical datasets with tables...")
+    # Physical datasets (tables) have the same dataset.id and table.id
+    # as both are from SqlaTable.id
+    insert_from_select(
+        dataset_table_association_table,
+        select(
+            [
+                NewDataset.id.label("dataset_id"),
+                NewTable.id.label("table_id"),
+            ]
+        ).select_from(
+            sa.join(SqlaTable, NewTable, NewTable.sqlatable_id == SqlaTable.id).join(
+                NewDataset, NewDataset.uuid == SqlaTable.uuid
+            )
+        ),
+    )
+
+
+def copy_columns(session: Session) -> None:
+    """Copy columns with active associated SqlTable"""
+    count = session.query(TableColumn).select_from(active_table_columns).count()
+    if not count:
+        return
+    print(f">> Copy {count:,} active table columns to sl_columns...")
+    insert_from_select(
+        NewColumn,
+        select(
+            [
+                TableColumn.uuid,
+                TableColumn.created_on,
+                TableColumn.changed_on,
+                TableColumn.created_by_fk,
+                TableColumn.changed_by_fk,
+                TableColumn.groupby.label("is_dimensional"),
+                TableColumn.filterable.label("is_filterable"),
+                TableColumn.column_name.label("name"),
+                TableColumn.description,
+                func.coalesce(TableColumn.expression, TableColumn.column_name).label(
+                    "expression"
+                ),
+                sa.literal(False).label("is_aggregation"),
+                or_(
+                    TableColumn.expression.is_(None), (TableColumn.expression == "")
+                ).label("is_physical"),
+                TableColumn.is_dttm.label("is_temporal"),
+                func.coalesce(TableColumn.type, "UNKNOWN").label("type"),
+                TableColumn.extra.label("extra_json"),
+            ]
+        ).select_from(active_table_columns),
+    )
+
+    print("   Link physical table columns to sl_tables...")
+    joined_columns_table = active_table_columns.join(
+        NewColumn, TableColumn.uuid == NewColumn.uuid
+    )
+    insert_from_select(
+        table_column_association_table,
+        select(
+            [
+                NewTable.id.label("table_id"),
+                NewColumn.id.label("column_id"),
+            ]
+        ).select_from(
+            joined_columns_table.join(
+                NewTable, TableColumn.table_id == NewTable.sqlatable_id
+            )
+        ),
+    )
+
+    print("   Link all columns to sl_datasets...")
+    insert_from_select(
+        dataset_column_association_table,
+        select(
+            [
+                NewDataset.id.label("dataset_id"),
+                NewColumn.id.label("column_id"),
+            ],
+        ).select_from(
+            joined_columns_table.join(NewDataset, NewDataset.uuid == SqlaTable.uuid)
+        ),
+    )
+
+
+def copy_metrics(session: Session) -> None:
+    """Copy metrics as virtual columns"""
+    metrics_count = session.query(SqlMetric).select_from(active_metrics).count()
+    if not metrics_count:
+        return
+
+    print(f">> Copy {metrics_count:,} metrics to sl_columns...")
+    insert_from_select(
+        NewColumn,
+        select(
+            [
+                SqlMetric.uuid,
+                SqlMetric.created_on,
+                SqlMetric.changed_on,
+                SqlMetric.created_by_fk,
+                SqlMetric.changed_by_fk,
+                SqlMetric.metric_name.label("name"),
+                SqlMetric.expression,
+                SqlMetric.description,
+                sa.literal("UNKNOWN").label("type"),
+                (
+                    func.coalesce(
+                        sa.func.lower(SqlMetric.metric_type).in_(
+                            ADDITIVE_METRIC_TYPES_LOWER
+                        ),
+                        sa.literal(False),
+                    ).label("is_additive")
+                ),
+                sa.literal(True).label("is_aggregation"),
+                # metrics are by default not filterable
+                sa.literal(False).label("is_filterable"),
+                sa.literal(False).label("is_dimensional"),
+                sa.literal(False).label("is_physical"),
+                sa.literal(False).label("is_temporal"),
+                SqlMetric.extra.label("extra_json"),
+                SqlMetric.warning_text,
+            ]
+        ).select_from(active_metrics),
+    )
+
+    print("   Link metric columns to datasets...")
+    insert_from_select(
+        dataset_column_association_table,
+        select(
+            [
+                NewDataset.id.label("dataset_id"),
+                NewColumn.id.label("column_id"),
+            ],
+        ).select_from(
+            active_metrics.join(NewDataset, NewDataset.uuid == SqlaTable.uuid).join(
+                NewColumn, NewColumn.uuid == SqlMetric.uuid
+            )
+        ),
+    )
+
+
+def postprocess_datasets(session: Session) -> None:
+    """
+    Postprocess datasets after insertion to
+      - Quote table names for physical datasets (if needed)
+      - Link referenced tables to virtual datasets
+    """
+    total = session.query(SqlaTable).count()
+    if not total:
+        return
+
+    offset = 0
+    limit = 10000
+
+    joined_tables = sa.join(
+        NewDataset,
+        SqlaTable,
+        NewDataset.uuid == SqlaTable.uuid,
+    ).join(
+        Database,
+        Database.id == SqlaTable.database_id,
+        isouter=True,
+    )
+    assert session.query(func.count()).select_from(joined_tables).scalar() == total
+
+    print(f">> Run postprocessing on {total} datasets")
+
+    update_count = 0
+
+    def print_update_count():
+        if SHOW_PROGRESS:
+            print(
+                f"   Will update {update_count} datasets" + " " * 20,
+                end="\r",
+            )
+
+    while offset < total:
+        print(
+            f"   Process dataset {offset + 1}~{min(total, offset + limit)}..."
+            + " " * 30
+        )
+        for (
+            database_id,
+            dataset_id,
+            expression,
+            extra,
+            is_physical,
+            schema,
+            sqlalchemy_uri,
+        ) in session.execute(
+            select(
+                [
+                    NewDataset.database_id,
+                    NewDataset.id.label("dataset_id"),
+                    NewDataset.expression,
+                    SqlaTable.extra,
+                    NewDataset.is_physical,
+                    SqlaTable.schema,
+                    Database.sqlalchemy_uri,
+                ]
+            )
+            .select_from(joined_tables)
+            .offset(offset)
+            .limit(limit)
+        ):
+            drivername = (sqlalchemy_uri or "").split("://")[0]
+            updates = {}
+            updated = False
+            if is_physical and drivername:
+                quoted_expression = get_identifier_quoter(drivername)(expression)
+                if quoted_expression != expression:
+                    updates["expression"] = quoted_expression
+
+            # add schema name to `extra_json`
+            if schema:
+                try:
+                    extra_json = json.loads(extra) if extra else {}
+                except json.decoder.JSONDecodeError:
+                    extra_json = {}
+                extra_json["schema"] = schema
+                updates["extra_json"] = json.dumps(extra_json)
+
+            if updates:
+                session.execute(
+                    sa.update(NewDataset)
+                    .where(NewDataset.id == dataset_id)
+                    .values(**updates)
+                )
+                updated = True
+
+            if not is_physical and expression:
+                table_references = extract_table_references(
+                    expression, get_dialect_name(drivername), show_warning=False
+                )
+                found_tables = find_tables(
+                    session, database_id, schema, table_references
+                )
+                if found_tables:
+                    op.bulk_insert(
+                        dataset_table_association_table,
+                        [
+                            {"dataset_id": dataset_id, "table_id": table.id}
+                            for table in found_tables
+                        ],
+                    )
+                    updated = True
+
+            if updated:
+                update_count += 1
+                print_update_count()
+
+        session.flush()
+        offset += limit
+
+    if SHOW_PROGRESS:
+        print("")
+
+
+def postprocess_columns(session: Session) -> None:
+    """
+    At this step, we will
+      - Add engine-specific quotes to the `expression` of physical columns
+      - Tuck some extra metadata into `extra_json`
+    """
+    total = session.query(NewColumn).count()
+    if not total:
+        return
+
+    def get_joined_tables(offset, limit):
+        return (
+            sa.join(
+                session.query(NewColumn)
+                .offset(offset)
+                .limit(limit)
+                .subquery("sl_columns"),
+                TableColumn,
+                TableColumn.uuid == NewColumn.uuid,
+                isouter=True,
+            )
+            .join(
+                SqlMetric,
+                SqlMetric.uuid == NewColumn.uuid,
+                isouter=True,
+            )
+            .join(
+                SqlaTable,
+                SqlaTable.id == func.coalesce(TableColumn.table_id, SqlMetric.table_id),
+                isouter=True,
+            )
+            .join(Database, Database.id == SqlaTable.database_id, isouter=True)
+        )
+
+    offset = 0
+    limit = 100000
+
+    print(f">> Run postprocessing on {total:,} columns")
+
+    update_count = 0
+
+    def print_update_count():
+        if SHOW_PROGRESS:
+            print(
+                f"   Will update {update_count} columns" + " " * 20,
+                end="\r",
+            )
+
+    while offset < total:
+        query = (
+            select(
+                # sorted alphabetically
+                [
+                    NewColumn.id.label("column_id"),
+                    TableColumn.column_name,
+                    SqlMetric.d3format,
+                    SqlaTable.external_url,
+                    NewColumn.extra_json,
+                    SqlaTable.is_managed_externally,
+                    NewColumn.is_physical,
+                    SqlMetric.metric_type,
+                    TableColumn.python_date_format,
+                    Database.sqlalchemy_uri,
+                    func.coalesce(
+                        TableColumn.verbose_name, SqlMetric.verbose_name
+                    ).label("verbose_name"),
+                ]
+            )
+            .select_from(get_joined_tables(offset, limit))
+            .where(
+                # pre-filter to columns with potential updates
+                or_(
+                    NewColumn.is_physical,
+                    TableColumn.verbose_name.isnot(None),
+                    SqlMetric.verbose_name.isnot(None),
+                    SqlMetric.d3format.isnot(None),
+                    SqlMetric.metric_type.isnot(None),
+                )
+            )
+        )
+
+        start = offset + 1
+        end = min(total, offset + limit)
+        count = session.query(func.count()).select_from(query).scalar()
+        print(f"   [Column {start:,} to {end:,}] {count:,} may be updated")
+
+        for (
+            column_id,
+            column_name,
+            d3format,
+            external_url,
+            extra,
+            is_managed_externally,
+            is_physical,
+            metric_type,
+            python_date_format,
+            sqlalchemy_uri,
+            verbose_name,
+        ) in session.execute(query):
+            try:
+                extra_json = json.loads(extra) if extra else {}
+            except json.decoder.JSONDecodeError:
+                extra_json = {}
+            updated_extra_json = {**extra_json}
+            updates = {}
+
+            # update expression for physical table columns
+            if is_physical and column_name and sqlalchemy_uri:
+                drivername = sqlalchemy_uri.split("://")[0]
+                if is_physical and drivername:
+                    quoted_expression = get_identifier_quoter(drivername)(column_name)
+                    if quoted_expression != column_name:
+                        updates["expression"] = quoted_expression
+
+            if is_managed_externally:
+                updates["is_managed_externally"] = True
+            if external_url:
+                updates["external_url"] = external_url
+
+            # update extra json
+            for (key, val) in (
+                {
+                    "verbose_name": verbose_name,
+                    "python_date_format": python_date_format,
+                    "d3format": d3format,
+                    "metric_type": metric_type,
+                }
+            ).items():
+                # save the original val, including if it's `false`
+                if val is not None:
+                    updated_extra_json[key] = val
+
+            if updated_extra_json != extra_json:
+                updates["extra_json"] = json.dumps(updated_extra_json)
+
+            if updates:
+                session.execute(
+                    sa.update(NewColumn)
+                    .where(NewColumn.id == column_id)
+                    .values(**updates)
+                )
+                update_count += 1
+                print_update_count()
+        session.flush()
+        offset += limit
+    if SHOW_PROGRESS:
+        print("")
+    print(">> Done.")
+
+
+new_tables: List[sa.Table] = [
+    NewTable.__table__,
+    NewDataset.__table__,
+    NewColumn.__table__,
+    table_column_association_table,
+    dataset_column_association_table,
+    dataset_table_association_table,
+    dataset_user_association_table,
+]
+
+
+def reset_postgres_id_sequence(table: str) -> None:
+    op.execute(
+        f"""
+        SELECT setval(
+            pg_get_serial_sequence('{table}', 'id'),
+            COALESCE(max(id) + 1, 1),
+            false
+        )
+        FROM {table};
+    """
+    )
+
+
+def upgrade() -> None:
+    bind = op.get_bind()
+    session: Session = db.Session(bind=bind)
+    Base.metadata.drop_all(bind=bind, tables=new_tables)
+    Base.metadata.create_all(bind=bind, tables=new_tables)

Review Comment:
   Nice, I was going to ask about this.



##########
superset/migrations/versions/a9422eeaae74_new_dataset_models_take_2.py:
##########
@@ -0,0 +1,858 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""new_dataset_models_take_2
+
+Revision ID: a9422eeaae74
+Revises: cecc6bf46990
+Create Date: 2022-04-01 14:38:09.499483
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = "a9422eeaae74"
+down_revision = "cecc6bf46990"
+
+import json
+import os
+from datetime import datetime
+from typing import List, Optional, Set, Type, Union
+from uuid import uuid4
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy import select
+from sqlalchemy.ext.declarative import declarative_base, declared_attr
+from sqlalchemy.orm import backref, relationship, Session
+from sqlalchemy.schema import UniqueConstraint
+from sqlalchemy.sql import functions as func
+from sqlalchemy.sql.expression import and_, or_
+from sqlalchemy_utils import UUIDType
+
+from superset import app, db
+from superset.connectors.sqla.models import ADDITIVE_METRIC_TYPES_LOWER
+from superset.connectors.sqla.utils import get_dialect_name, get_identifier_quoter
+from superset.extensions import encrypted_field_factory
+from superset.migrations.shared.utils import assign_uuids
+from superset.sql_parse import extract_table_references, Table
+from superset.utils.core import MediumText
+
+Base = declarative_base()
+custom_password_store = app.config["SQLALCHEMY_CUSTOM_PASSWORD_STORE"]
+DB_CONNECTION_MUTATOR = app.config["DB_CONNECTION_MUTATOR"]
+SHOW_PROGRESS = os.environ.get("SHOW_PROGRESS") == "1"
+
+TEMPORAL_TYPES = {"DATETIME", "DATE", "TIME", "TIMEDELTA"}
+
+
+user_table = sa.Table(
+    "ab_user", Base.metadata, sa.Column("id", sa.Integer(), primary_key=True)
+)
+
+
+class UUIDMixin:
+    uuid = sa.Column(
+        UUIDType(binary=True), primary_key=False, unique=True, default=uuid4
+    )
+
+
+class AuxiliaryColumnsMixin(UUIDMixin):
+    """
+    Auxiliary columns, a combination of columns added by
+       AuditMixinNullable + ImportExportMixin
+    """
+
+    created_on = sa.Column(sa.DateTime, default=datetime.now, nullable=True)
+    changed_on = sa.Column(
+        sa.DateTime, default=datetime.now, onupdate=datetime.now, nullable=True
+    )
+
+    @declared_attr
+    def created_by_fk(cls):
+        return sa.Column(sa.Integer, sa.ForeignKey("ab_user.id"), nullable=True)
+
+    @declared_attr
+    def changed_by_fk(cls):
+        return sa.Column(sa.Integer, sa.ForeignKey("ab_user.id"), nullable=True)
+
+
+def insert_from_select(
+    target: Union[str, sa.Table, Type[Base]], source: sa.sql.expression.Select
+) -> None:
+    """
+    Execute INSERT FROM SELECT to copy data from a SELECT query to the target table.
+    """
+    if isinstance(target, sa.Table):
+        target_table = target
+    elif hasattr(target, "__tablename__"):
+        target_table: sa.Table = Base.metadata.tables[target.__tablename__]
+    else:
+        target_table: sa.Table = Base.metadata.tables[target]
+    cols = [col.name for col in source.columns if col.name in target_table.columns]
+    query = target_table.insert().from_select(cols, source)
+    return op.execute(query)
+
+
+class Database(Base):
+
+    __tablename__ = "dbs"
+    __table_args__ = (UniqueConstraint("database_name"),)
+
+    id = sa.Column(sa.Integer, primary_key=True)
+    database_name = sa.Column(sa.String(250), unique=True, nullable=False)
+    sqlalchemy_uri = sa.Column(sa.String(1024), nullable=False)
+    password = sa.Column(encrypted_field_factory.create(sa.String(1024)))
+    impersonate_user = sa.Column(sa.Boolean, default=False)
+    encrypted_extra = sa.Column(encrypted_field_factory.create(sa.Text), nullable=True)
+    extra = sa.Column(sa.Text)
+    server_cert = sa.Column(encrypted_field_factory.create(sa.Text), nullable=True)
+
+
+class TableColumn(AuxiliaryColumnsMixin, Base):
+
+    __tablename__ = "table_columns"
+    __table_args__ = (UniqueConstraint("table_id", "column_name"),)
+
+    id = sa.Column(sa.Integer, primary_key=True)
+    table_id = sa.Column(sa.Integer, sa.ForeignKey("tables.id"))
+    is_active = sa.Column(sa.Boolean, default=True)
+    extra = sa.Column(sa.Text)
+    column_name = sa.Column(sa.String(255), nullable=False)
+    type = sa.Column(sa.String(32))
+    expression = sa.Column(MediumText())
+    description = sa.Column(MediumText())
+    is_dttm = sa.Column(sa.Boolean, default=False)
+    filterable = sa.Column(sa.Boolean, default=True)
+    groupby = sa.Column(sa.Boolean, default=True)
+    verbose_name = sa.Column(sa.String(1024))
+    python_date_format = sa.Column(sa.String(255))
+
+
+class SqlMetric(AuxiliaryColumnsMixin, Base):
+
+    __tablename__ = "sql_metrics"
+    __table_args__ = (UniqueConstraint("table_id", "metric_name"),)
+
+    id = sa.Column(sa.Integer, primary_key=True)
+    table_id = sa.Column(sa.Integer, sa.ForeignKey("tables.id"))
+    extra = sa.Column(sa.Text)
+    metric_type = sa.Column(sa.String(32))
+    metric_name = sa.Column(sa.String(255), nullable=False)
+    expression = sa.Column(MediumText(), nullable=False)
+    warning_text = sa.Column(MediumText())
+    description = sa.Column(MediumText())
+    d3format = sa.Column(sa.String(128))
+    verbose_name = sa.Column(sa.String(1024))
+
+
+sqlatable_user_table = sa.Table(
+    "sqlatable_user",
+    Base.metadata,
+    sa.Column("id", sa.Integer, primary_key=True),
+    sa.Column("user_id", sa.Integer, sa.ForeignKey("ab_user.id")),
+    sa.Column("table_id", sa.Integer, sa.ForeignKey("tables.id")),
+)
+
+
+class SqlaTable(AuxiliaryColumnsMixin, Base):
+
+    __tablename__ = "tables"
+    __table_args__ = (UniqueConstraint("database_id", "schema", "table_name"),)
+
+    id = sa.Column(sa.Integer, primary_key=True)
+    extra = sa.Column(sa.Text)
+    database_id = sa.Column(sa.Integer, sa.ForeignKey("dbs.id"), nullable=False)
+    database: Database = relationship(
+        "Database",
+        backref=backref("tables", cascade="all, delete-orphan"),
+        foreign_keys=[database_id],
+    )
+    schema = sa.Column(sa.String(255))
+    table_name = sa.Column(sa.String(250), nullable=False)
+    sql = sa.Column(MediumText())
+    is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
+    external_url = sa.Column(sa.Text, nullable=True)
+
+
+table_column_association_table = sa.Table(
+    "sl_table_columns",
+    Base.metadata,
+    sa.Column("table_id", sa.ForeignKey("sl_tables.id"), primary_key=True),
+    sa.Column("column_id", sa.ForeignKey("sl_columns.id"), primary_key=True),
+)
+
+dataset_column_association_table = sa.Table(
+    "sl_dataset_columns",
+    Base.metadata,
+    sa.Column("dataset_id", sa.ForeignKey("sl_datasets.id"), primary_key=True),
+    sa.Column("column_id", sa.ForeignKey("sl_columns.id"), primary_key=True),
+)
+
+dataset_table_association_table = sa.Table(
+    "sl_dataset_tables",
+    Base.metadata,
+    sa.Column("dataset_id", sa.ForeignKey("sl_datasets.id"), primary_key=True),
+    sa.Column("table_id", sa.ForeignKey("sl_tables.id"), primary_key=True),
+)
+
+dataset_user_association_table = sa.Table(
+    "sl_dataset_users",
+    Base.metadata,
+    sa.Column("dataset_id", sa.ForeignKey("sl_datasets.id"), primary_key=True),
+    sa.Column("user_id", sa.ForeignKey("ab_user.id"), primary_key=True),
+)
+
+
+class NewColumn(AuxiliaryColumnsMixin, Base):
+
+    __tablename__ = "sl_columns"
+
+    id = sa.Column(sa.Integer, primary_key=True)
+    is_aggregation = sa.Column(sa.Boolean, nullable=False, default=False)
+    is_additive = sa.Column(sa.Boolean, nullable=False, default=False)
+    is_dimensional = sa.Column(sa.Boolean, nullable=False, default=False)
+    is_filterable = sa.Column(sa.Boolean, nullable=False, default=True)
+    is_increase_desired = sa.Column(sa.Boolean, nullable=False, default=True)
+    is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
+    is_partition = sa.Column(sa.Boolean, nullable=False, default=False)
+    is_physical = sa.Column(sa.Boolean, nullable=False, default=False)
+    is_temporal = sa.Column(sa.Boolean, nullable=False, default=False)
+    is_spatial = sa.Column(sa.Boolean, nullable=False, default=False)
+
+    name = sa.Column(sa.Text)
+    type = sa.Column(sa.Text)
+    unit = sa.Column(sa.Text)
+    expression = sa.Column(MediumText())
+    description = sa.Column(MediumText())
+    warning_text = sa.Column(MediumText())
+    external_url = sa.Column(sa.Text, nullable=True)
+    extra_json = sa.Column(MediumText(), default="{}")
+
+
+class NewTable(AuxiliaryColumnsMixin, Base):
+
+    __tablename__ = "sl_tables"
+
+    id = sa.Column(sa.Integer, primary_key=True)
+    # A temporary column to keep the link between NewTable and SqlaTable
+    sqlatable_id = sa.Column(sa.Integer, primary_key=False, nullable=True, unique=True)
+    database_id = sa.Column(sa.Integer, sa.ForeignKey("dbs.id"), nullable=False)
+    is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
+    catalog = sa.Column(sa.Text)
+    schema = sa.Column(sa.Text)
+    name = sa.Column(sa.Text)
+    external_url = sa.Column(sa.Text, nullable=True)
+    extra_json = sa.Column(MediumText(), default="{}")
+    database: Database = relationship(
+        "Database",
+        backref=backref("new_tables", cascade="all, delete-orphan"),
+        foreign_keys=[database_id],
+    )
+    columns: List[NewColumn] = relationship(
+        "NewColumn", secondary=table_column_association_table, cascade="all, delete"
+    )
+
+
+class NewDataset(Base, AuxiliaryColumnsMixin):
+
+    __tablename__ = "sl_datasets"
+
+    id = sa.Column(sa.Integer, primary_key=True)
+    database_id = sa.Column(sa.Integer, sa.ForeignKey("dbs.id"), nullable=False)
+    is_physical = sa.Column(sa.Boolean, default=False)
+    is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
+    name = sa.Column(sa.Text)
+    expression = sa.Column(MediumText())
+    external_url = sa.Column(sa.Text, nullable=True)
+    extra_json = sa.Column(MediumText(), default="{}")
+    tables: List[NewTable] = relationship(
+        "NewTable", secondary=dataset_table_association_table
+    )
+    columns: List[NewColumn] = relationship(
+        "NewColumn", secondary=dataset_column_association_table, cascade="all, delete"
+    )
+
+
+def find_tables(
+    session: Session,
+    database_id: int,
+    default_schema: Optional[str],
+    tables: Set[Table],
+) -> List[int]:
+    """
+    Look for NewTable records from a specific database
+    """
+    if not tables:
+        return []
+
+    predicate = or_(
+        *[
+            and_(
+                NewTable.database_id == database_id,
+                NewTable.schema == (table.schema or default_schema),
+                NewTable.name == table.table,
+            )
+            for table in tables
+        ]
+    )
+    return session.query(NewTable.id).filter(predicate).all()
+
+
+# helper SQLA elements for easier querying
+is_physical_table = or_(SqlaTable.sql.is_(None), SqlaTable.sql == "")

Review Comment:
   Nice!
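
   As an aside for readers following the migration: below is a minimal sketch (not code from this PR) of how `insert_from_select` and the `is_physical_table` helper compose inside `upgrade()`. The model names follow this migration file, but the column list is simplified and the uuid handling is omitted (the real migration imports `assign_uuids` for that).

   ```python
   # Hypothetical call inside upgrade(): copy physical datasets from the
   # legacy `tables` table into `sl_tables`, keeping the temporary
   # `sqlatable_id` link. Column list simplified for illustration.
   copy_physical_tables = select(
       [
           SqlaTable.id.label("sqlatable_id"),
           SqlaTable.database_id,
           SqlaTable.schema,
           SqlaTable.table_name.label("name"),
       ]
   ).where(is_physical_table)

   insert_from_select(NewTable, copy_physical_tables)
   ```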



##########
superset/connectors/sqla/models.py:
##########
@@ -479,6 +541,58 @@ def data(self) -> Dict[str, Any]:
         attr_dict.update(super().data)
         return attr_dict
 
+    def to_sl_column(
+        self, known_columns: Optional[Dict[str, NewColumn]] = None
+    ) -> NewColumn:
+        """Convert a SqlMetric to NewColumn. Find and update existing or
+        create a new one."""
+        column = known_columns.get(self.uuid) if known_columns else None
+        if not column:
+            column = NewColumn()
+
+        extra_json = self.get_extra_dict()
+        for attr in {"verbose_name", "metric_type", "d3format"}:
+            value = getattr(self, attr)
+            if value is not None:
+                extra_json[attr] = value
+        is_additive = (
+            self.metric_type and self.metric_type.lower() in ADDITIVE_METRIC_TYPES_LOWER
+        )
+
+        column.uuid = self.uuid
+        column.name = self.metric_name
+        column.created_on = self.created_on
+        column.changed_on = self.changed_on
+        column.created_by = self.created_by
+        column.changed_by = self.changed_by
+        column.type = "Unknown"
+        column.expression = self.expression
+        column.warning_text = self.warning_text
+        column.description = self.description
+        column.is_aggregation = True
+        column.is_additive = is_additive
+        column.is_filterable = False
+        column.is_increase_desired = True
+        column.is_managed_externally = self.table.is_managed_externally
+        column.is_partition = False
+        column.is_physical = False
+        column.is_spatial = False
+        column.extra_json = json.dumps(extra_json) if extra_json else None
+        column.external_url = self.table.external_url
+
+        return column
+
+    @staticmethod
+    def after_delete(  # pylint: disable=unused-argument
+        mapper: Mapper,
+        connection: Connection,
+        target: "SqlMetric",
+    ) -> None:
+        session = inspect(target).session
+        dataset = session.query(NewColumn).filter_by(uuid=target.uuid).one_or_none()
+        if dataset:
+            session.delete(dataset)

Review Comment:
   ```suggestion
           column = session.query(NewColumn).filter_by(uuid=target.uuid).one_or_none()
           if column:
               session.delete(column)
   ```



##########
superset/connectors/sqla/models.py:
##########
@@ -2273,95 +2286,54 @@ def write_shadow_dataset(  # pylint: disable=too-many-locals
 
         For more context: https://github.com/apache/superset/issues/14909
         """
-
-        engine = database.get_sqla_engine(schema=dataset.schema)
-        conditional_quote = engine.dialect.identifier_preparer.quote
+        session = inspect(self).session
+        if self.database_id and (
+            not self.database or self.database.id != self.database_id
+        ):
+            self.database = session.query(Database).filter_by(id=self.database_id).one()
 
         # create columns
         columns = []
-        for column in dataset.columns:
-            # ``is_active`` might be ``None`` at this point, but it defaults to ``True``.
-            if column.is_active is False:
-                continue
-
-            try:
-                extra_json = json.loads(column.extra or "{}")
-            except json.decoder.JSONDecodeError:
-                extra_json = {}
-            for attr in {"groupby", "filterable", "verbose_name", "python_date_format"}:
-                value = getattr(column, attr)
-                if value:
-                    extra_json[attr] = value
-
-            columns.append(
-                NewColumn(
-                    name=column.column_name,
-                    type=column.type or "Unknown",
-                    expression=column.expression
-                    or conditional_quote(column.column_name),
-                    description=column.description,
-                    is_temporal=column.is_dttm,
-                    is_aggregation=False,
-                    is_physical=column.expression is None,
-                    is_spatial=False,
-                    is_partition=False,
-                    is_increase_desired=True,
-                    extra_json=json.dumps(extra_json) if extra_json else None,
-                    is_managed_externally=dataset.is_managed_externally,
-                    external_url=dataset.external_url,
-                ),
-            )
-
-        # create metrics
-        for metric in dataset.metrics:
-            try:
-                extra_json = json.loads(metric.extra or "{}")
-            except json.decoder.JSONDecodeError:
-                extra_json = {}
-            for attr in {"verbose_name", "metric_type", "d3format"}:
-                value = getattr(metric, attr)
-                if value:
-                    extra_json[attr] = value
-
-            is_additive = (
-                metric.metric_type
-                and metric.metric_type.lower() in ADDITIVE_METRIC_TYPES
-            )
-
-            columns.append(
-                NewColumn(
-                    name=metric.metric_name,
-                    type="Unknown",  # figuring this out would require a type inferrer
-                    expression=metric.expression,
-                    warning_text=metric.warning_text,
-                    description=metric.description,
-                    is_aggregation=True,
-                    is_additive=is_additive,
-                    is_physical=False,
-                    is_spatial=False,
-                    is_partition=False,
-                    is_increase_desired=True,
-                    extra_json=json.dumps(extra_json) if extra_json else None,
-                    is_managed_externally=dataset.is_managed_externally,
-                    external_url=dataset.external_url,
-                ),
-            )
+        for item in self.columns + self.metrics:
+            item.created_by = self.created_by
+            item.changed_by = self.changed_by
+            # On the `SqlaTable.after_insert` event the table itself already
+            # has a `uuid`, but the associated columns do not yet.
+            # Here we pre-assign a uuid so they can still be matched to the new
+            # Column after creation.
+            if not item.uuid:
+                item.uuid = uuid4()
+            columns.append(item.to_sl_column())
 
         # physical dataset
-        if not dataset.sql:
-            physical_columns = [column for column in columns if column.is_physical]
-
-            # create table
-            table = NewTable(
-                name=dataset.table_name,
-                schema=dataset.schema,
-                catalog=None,  # currently not supported
-                database_id=dataset.database_id,
-                columns=physical_columns,
-                is_managed_externally=dataset.is_managed_externally,
-                external_url=dataset.external_url,
+        if not self.sql:
+            # always create separate column entities for Dataset and Table
+            # so updating a dataset would not update the columns in the table
+            physical_columns = [
+                clone_model(
+                    column,
+                    ignore=["uuid"],
+                    # `created_by` will always be left empty because it'd always
+                    # be created via some sort of automated system.
+                    # But keep `changed_by` in case someone manually changes
+                    # column attributes such as `is_dttm`.
+                    additional=["changed_by"],
+                )
+                for column in columns
+                if column.is_physical
+            ]
+            tables = NewTable.load_or_create(

Review Comment:
   Same here, `tables = [NewTable.load_or_create(...)]`.



##########
superset/connectors/sqla/models.py:
##########
@@ -2273,95 +2286,54 @@ def write_shadow_dataset(  # pylint: disable=too-many-locals
 
         For more context: https://github.com/apache/superset/issues/14909
         """
-
-        engine = database.get_sqla_engine(schema=dataset.schema)
-        conditional_quote = engine.dialect.identifier_preparer.quote
+        session = inspect(self).session
+        if self.database_id and (
+            not self.database or self.database.id != self.database_id
+        ):
+            self.database = session.query(Database).filter_by(id=self.database_id).one()
 
         # create columns
         columns = []
-        for column in dataset.columns:
-            # ``is_active`` might be ``None`` at this point, but it defaults to ``True``.
-            if column.is_active is False:
-                continue
-
-            try:
-                extra_json = json.loads(column.extra or "{}")
-            except json.decoder.JSONDecodeError:
-                extra_json = {}
-            for attr in {"groupby", "filterable", "verbose_name", "python_date_format"}:
-                value = getattr(column, attr)
-                if value:
-                    extra_json[attr] = value
-
-            columns.append(
-                NewColumn(
-                    name=column.column_name,
-                    type=column.type or "Unknown",
-                    expression=column.expression
-                    or conditional_quote(column.column_name),
-                    description=column.description,
-                    is_temporal=column.is_dttm,
-                    is_aggregation=False,
-                    is_physical=column.expression is None,
-                    is_spatial=False,
-                    is_partition=False,
-                    is_increase_desired=True,
-                    extra_json=json.dumps(extra_json) if extra_json else None,
-                    is_managed_externally=dataset.is_managed_externally,
-                    external_url=dataset.external_url,
-                ),
-            )
-
-        # create metrics
-        for metric in dataset.metrics:
-            try:
-                extra_json = json.loads(metric.extra or "{}")
-            except json.decoder.JSONDecodeError:
-                extra_json = {}
-            for attr in {"verbose_name", "metric_type", "d3format"}:
-                value = getattr(metric, attr)
-                if value:
-                    extra_json[attr] = value
-
-            is_additive = (
-                metric.metric_type
-                and metric.metric_type.lower() in ADDITIVE_METRIC_TYPES
-            )
-
-            columns.append(
-                NewColumn(
-                    name=metric.metric_name,
-                    type="Unknown",  # figuring this out would require a type inferrer
-                    expression=metric.expression,
-                    warning_text=metric.warning_text,
-                    description=metric.description,
-                    is_aggregation=True,
-                    is_additive=is_additive,
-                    is_physical=False,
-                    is_spatial=False,
-                    is_partition=False,
-                    is_increase_desired=True,
-                    extra_json=json.dumps(extra_json) if extra_json else None,
-                    is_managed_externally=dataset.is_managed_externally,
-                    external_url=dataset.external_url,
-                ),
-            )
+        for item in self.columns + self.metrics:
+            item.created_by = self.created_by
+            item.changed_by = self.changed_by
+            # On the `SqlaTable.after_insert` event the table itself already
+            # has a `uuid`, but the associated columns do not yet.
+            # Here we pre-assign a uuid so they can still be matched to the new
+            # Column after creation.
+            if not item.uuid:
+                item.uuid = uuid4()
+            columns.append(item.to_sl_column())
 
         # physical dataset
-        if not dataset.sql:
-            physical_columns = [column for column in columns if column.is_physical]
-
-            # create table
-            table = NewTable(
-                name=dataset.table_name,
-                schema=dataset.schema,
-                catalog=None,  # currently not supported
-                database_id=dataset.database_id,
-                columns=physical_columns,
-                is_managed_externally=dataset.is_managed_externally,
-                external_url=dataset.external_url,
+        if not self.sql:
+            # always create separate column entities for Dataset and Table
+            # so updating a dataset would not update the columns in the table

Review Comment:
   This was my intention originally, thanks for fixing it!
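
   Since `clone_model` is only called here and not shown in this hunk, the sketch below gives a rough idea of the shape such a helper could take. The name `clone_model_sketch` and the body are assumptions for illustration only, not necessarily Superset's actual implementation: copy the mapped column values into a fresh, unattached instance, skipping the `ignore`d attributes and force-copying the `additional` ones.

   ```python
   from typing import Any, List, Optional

   from sqlalchemy import inspect


   def clone_model_sketch(
       instance: Any,
       ignore: Optional[List[str]] = None,
       additional: Optional[List[str]] = None,
   ) -> Any:
       """Copy an ORM instance's column values into a new, unattached instance."""
       skipped = set(ignore or [])
       mapper = inspect(instance).mapper
       kwargs = {
           column.key: getattr(instance, column.key)
           for column in mapper.columns
           # skip primary keys and ignored attributes so the copy is treated
           # as a brand-new row
           if not column.primary_key and column.key not in skipped
       }
       for attr in additional or []:
           kwargs[attr] = getattr(instance, attr)
       return type(instance)(**kwargs)
   ```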



##########
superset/sql_parse.py:
##########
@@ -698,3 +707,75 @@ def insert_rls(
         )
 
     return token_list
+
+
+# mapping between sqloxide and SQLAlchemy dialects
+SQLOXITE_DIALECTS = {
+    "ansi": {"trino", "trinonative", "presto"},
+    "hive": {"hive", "databricks"},
+    "ms": {"mssql"},
+    "mysql": {"mysql"},
+    "postgres": {
+        "cockroachdb",
+        "hana",
+        "netezza",
+        "postgres",
+        "postgresql",
+        "redshift",
+        "vertica",
+    },
+    "snowflake": {"snowflake"},
+    "sqlite": {"sqlite", "gsheets", "shillelagh"},
+    "clickhouse": {"clickhouse"},
+}
+
+RE_JINJA_VAR = re.compile(r"\{\{[^\{\}]+\}\}")
+RE_JINJA_BLOCK = re.compile(r"\{[%#][^\{\}%#]+[%#]\}")

Review Comment:
   Oh, good catch, I forgot about those.
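
   For context on why these two patterns exist, the snippet below is an assumed, simplified usage (not the PR's exact code): Jinja variables and blocks are not valid SQL, so they are masked before the statement is handed to a SQL parser.

   ```python
   import re

   RE_JINJA_VAR = re.compile(r"\{\{[^\{\}]+\}\}")
   RE_JINJA_BLOCK = re.compile(r"\{[%#][^\{\}%#]+[%#]\}")

   sql = "SELECT ds, cnt FROM {{ table_name }} {% if latest %}WHERE ds = '{{ ds }}'{% endif %}"

   # Replace template variables with a harmless identifier and drop control
   # blocks, leaving a statement a SQL parser can digest.
   masked = RE_JINJA_BLOCK.sub(" ", RE_JINJA_VAR.sub("jinja_placeholder", sql))
   print(masked)
   # SELECT ds, cnt FROM jinja_placeholder  WHERE ds = 'jinja_placeholder'
   ```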



##########
superset/connectors/sqla/models.py:
##########
@@ -1895,105 +2030,88 @@ def before_update(
         ):
             raise Exception(get_dataset_exist_error_msg(target.full_name))
 
+    def get_sl_columns(self) -> List[NewColumn]:
+        """
+        Convert `SqlaTable.columns` and `SqlaTable.metrics` to the new Column model
+        """
+        session: Session = inspect(self).session
+
+        uuids = set()
+        for column_or_metric in self.columns + self.metrics:
+            # pre-assign uuid after new columns or metrics are inserted so
+            # the related `NewColumn` can have a deterministic uuid, too
+            if not column_or_metric.uuid:
+                column_or_metric.uuid = uuid4()
+            else:
+                uuids.add(column_or_metric.uuid)
+
+        # load existing columns from cached session states first
+        existing_columns = set(
+            find_cached_objects_in_session(session, NewColumn, uuids=uuids)
+        )
+        for column in existing_columns:
+            uuids.remove(column.uuid)
+
+        if uuids:
+            # load those not found from db
+            existing_columns |= set(
+                session.query(NewColumn).filter(NewColumn.uuid.in_(uuids)).all()

Review Comment:
   Nit: no need for `.all()` here, since `set()` will exhaust the generator AFAIK.
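
   A tiny illustration of the nit (generic SQLAlchemy behavior, reusing `session`, `NewColumn`, and `uuids` from the hunk above): a `Query` is itself iterable, so `set()` executes it directly and `.all()` only adds an intermediate list.

   ```python
   # Both forms yield the same set of NewColumn rows; the second just skips
   # materializing an intermediate list.
   with_all = set(session.query(NewColumn).filter(NewColumn.uuid.in_(uuids)).all())
   without_all = set(session.query(NewColumn).filter(NewColumn.uuid.in_(uuids)))
   assert with_all == without_all
   ```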



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@superset.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@superset.apache.org
For additional commands, e-mail: notifications-help@superset.apache.org