You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@superset.apache.org by GitBox <gi...@apache.org> on 2022/04/01 16:15:31 UTC

[GitHub] [superset] ktmud commented on a change in pull request #19421: perf: migrate new dataset models with INSERT FROM

ktmud commented on a change in pull request #19421:
URL: https://github.com/apache/superset/pull/19421#discussion_r840089807



##########
File path: superset/migrations/versions/b8d3a24d9131_new_dataset_models.py
##########
@@ -207,427 +244,557 @@ class NewTable(Base):
     columns: List[NewColumn] = relationship(
         "NewColumn", secondary=table_column_association_table, cascade="all, delete"
     )
-    is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
-    external_url = sa.Column(sa.Text, nullable=True)
 
 
-class NewDataset(Base):
+class NewDataset(Base, AuxiliaryColumnsMixin):
 
     __tablename__ = "sl_datasets"
 
     id = sa.Column(sa.Integer, primary_key=True)
     sqlatable_id = sa.Column(sa.Integer, nullable=True, unique=True)
     name = sa.Column(sa.Text)
-    expression = sa.Column(sa.Text)
+    expression = sa.Column(MediumText())
+    is_physical = sa.Column(sa.Boolean, default=False)
+    is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
+    external_url = sa.Column(sa.Text, nullable=True)
+    extra_json = sa.Column(sa.Text, default="{}")
     tables: List[NewTable] = relationship(
         "NewTable", secondary=dataset_table_association_table
     )
     columns: List[NewColumn] = relationship(
         "NewColumn", secondary=dataset_column_association_table, cascade="all, delete"
     )
-    is_physical = sa.Column(sa.Boolean, default=False)
-    is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
-    external_url = sa.Column(sa.Text, nullable=True)
 
 
 TEMPORAL_TYPES = {"DATETIME", "DATE", "TIME", "TIMEDELTA"}
 
 
-def load_or_create_tables(
+def find_tables(
     session: Session,
     database_id: int,
     default_schema: Optional[str],
     tables: Set[Table],
-    conditional_quote: Callable[[str], str],
-) -> List[NewTable]:
+) -> List[int]:
     """
-    Load or create new table model instances.
+    Look for NewTable's of from a specific database
     """
     if not tables:
         return []
 
-    # set the default schema in tables that don't have it
-    if default_schema:
-        tables = list(tables)
-        for i, table in enumerate(tables):
-            if table.schema is None:
-                tables[i] = Table(table.table, default_schema, table.catalog)
-
-    # load existing tables
     predicate = or_(
         *[
             and_(
                 NewTable.database_id == database_id,
-                NewTable.schema == table.schema,
+                NewTable.schema == (table.schema or default_schema),
                 NewTable.name == table.table,
             )
             for table in tables
         ]
     )
-    new_tables = session.query(NewTable).filter(predicate).all()
-
-    # use original database model to get the engine
-    engine = (
-        session.query(OriginalDatabase)
-        .filter_by(id=database_id)
-        .one()
-        .get_sqla_engine(default_schema)
-    )
-    inspector = inspect(engine)
-
-    # add missing tables
-    existing = {(table.schema, table.name) for table in new_tables}
-    for table in tables:
-        if (table.schema, table.table) not in existing:
-            column_metadata = inspector.get_columns(table.table, schema=table.schema)
-            columns = [
-                NewColumn(
-                    name=column["name"],
-                    type=str(column["type"]),
-                    expression=conditional_quote(column["name"]),
-                    is_temporal=column["type"].python_type.__name__.upper()
-                    in TEMPORAL_TYPES,
-                    is_aggregation=False,
-                    is_physical=True,
-                    is_spatial=False,
-                    is_partition=False,
-                    is_increase_desired=True,
-                )
-                for column in column_metadata
-            ]
-            new_tables.append(
-                NewTable(
-                    name=table.table,
-                    schema=table.schema,
-                    catalog=None,
-                    database_id=database_id,
-                    columns=columns,
-                )
-            )
-            existing.add((table.schema, table.table))
+    return session.query(NewTable.id).filter(predicate).all()
 
-    return new_tables
 
+# helper SQLA elements for easier querying
+is_physical_table = or_(SqlaTable.sql.is_(None), SqlaTable.sql == "")
 
-def after_insert(target: SqlaTable) -> None:  # pylint: disable=too-many-locals
-    """
-    Copy old datasets to the new models.
-    """
-    session = inspect(target).session
+# filtering out columns and metrics with valid associated SqlTable
+active_table_columns = sa.join(
+    TableColumn,
+    SqlaTable,
+    and_(
+        TableColumn.table_id == SqlaTable.id,
+        TableColumn.is_active,
+    ),
+)
+active_metrics = sa.join(SqlMetric, SqlaTable, SqlMetric.table_id == SqlaTable.id)
 
-    # get DB-specific conditional quoter for expressions that point to columns or
-    # table names
-    database = (
-        target.database
-        or session.query(Database).filter_by(id=target.database_id).first()
-    )
-    if not database:
-        return
-    url = make_url(database.sqlalchemy_uri)
-    dialect_class = url.get_dialect()
-    conditional_quote = dialect_class().identifier_preparer.quote
-
-    # create columns
-    columns = []
-    for column in target.columns:
-        # ``is_active`` might be ``None`` at this point, but it defaults to ``True``.
-        if column.is_active is False:
-            continue
-
-        try:
-            extra_json = json.loads(column.extra or "{}")
-        except json.decoder.JSONDecodeError:
-            extra_json = {}
-        for attr in {"groupby", "filterable", "verbose_name", "python_date_format"}:
-            value = getattr(column, attr)
-            if value:
-                extra_json[attr] = value
-
-        columns.append(
-            NewColumn(
-                name=column.column_name,
-                type=column.type or "Unknown",
-                expression=column.expression or conditional_quote(column.column_name),
-                description=column.description,
-                is_temporal=column.is_dttm,
-                is_aggregation=False,
-                is_physical=column.expression is None or column.expression == "",
-                is_spatial=False,
-                is_partition=False,
-                is_increase_desired=True,
-                extra_json=json.dumps(extra_json) if extra_json else None,
-                is_managed_externally=target.is_managed_externally,
-                external_url=target.external_url,
-            ),
-        )
 
-    # create metrics
-    for metric in target.metrics:
-        try:
-            extra_json = json.loads(metric.extra or "{}")
-        except json.decoder.JSONDecodeError:
-            extra_json = {}
-        for attr in {"verbose_name", "metric_type", "d3format"}:
-            value = getattr(metric, attr)
-            if value:
-                extra_json[attr] = value
-
-        is_additive = (
-            metric.metric_type and metric.metric_type.lower() in ADDITIVE_METRIC_TYPES
+def copy_tables(session: Session) -> None:
+    """Copy Physical tables"""
+    count = session.query(SqlaTable).filter(is_physical_table).count()
+    print(f">> Copy {count:,} physical tables to `sl_tables`...")
+    insert_from_select(
+        "sl_tables",
+        select(
+            [
+                SqlaTable.id,

Review comment:
       I'm porting over the same `id` and `uuid` from the original tables so relationship mapping can be easier.
   
   Info from AuditMixin are also copied over. As the new tables are intended to fully replace the original tables, retaining these info would also be useful for end user experience (especially `changed_on` and `created_on`).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@superset.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@superset.apache.org
For additional commands, e-mail: notifications-help@superset.apache.org