You are viewing a plain text version of this content. The canonical link was included in the original HTML message but is not available in this plain-text rendering.
Posted to commits@superset.apache.org by hu...@apache.org on 2023/01/24 16:51:47 UTC
[superset] 02/02: mk1: working explore with sqlatable model with exploremixin
This is an automated email from the ASF dual-hosted git repository.
hugh pushed a commit to branch fix-explore-mixin
in repository https://gitbox.apache.org/repos/asf/superset.git
commit 4528addf1172ce34bb2510123a32fb034bf3088e
Author: hughhhh <hu...@gmail.com>
AuthorDate: Tue Jan 24 18:51:16 2023 +0200
mk1: working explore with sqlatable model with exploremixin
---
superset/connectors/sqla/models.py | 646 -------------------------------------
1 file changed, 646 deletions(-)
diff --git a/superset/connectors/sqla/models.py b/superset/connectors/sqla/models.py
index bfdec8b996..350c03a793 100644
--- a/superset/connectors/sqla/models.py
+++ b/superset/connectors/sqla/models.py
@@ -138,8 +138,6 @@ from superset.utils.core import (
remove_duplicates,
)
-from superset.models.helpers import ExploreMixin
-
config = app.config
metadata = Model.metadata # pylint: disable=no-member
logger = logging.getLogger(__name__)
@@ -1143,650 +1141,6 @@ class SqlaTable(Model, BaseDatasource, ExploreMixin): # pylint: disable=too-man
def text(self, clause: str) -> TextClause:
return self.db_engine_spec.get_text_clause(clause)
- # def get_sqla_query( # pylint: disable=too-many-arguments,too-many-locals,too-many-branches,too-many-statements
- # self,
- # apply_fetch_values_predicate: bool = False,
- # columns: Optional[List[ColumnTyping]] = None,
- # extras: Optional[Dict[str, Any]] = None,
- # filter: Optional[ # pylint: disable=redefined-builtin
- # List[QueryObjectFilterClause]
- # ] = None,
- # from_dttm: Optional[datetime] = None,
- # granularity: Optional[str] = None,
- # groupby: Optional[List[Column]] = None,
- # inner_from_dttm: Optional[datetime] = None,
- # inner_to_dttm: Optional[datetime] = None,
- # is_rowcount: bool = False,
- # is_timeseries: bool = True,
- # metrics: Optional[List[Metric]] = None,
- # orderby: Optional[List[OrderBy]] = None,
- # order_desc: bool = True,
- # to_dttm: Optional[datetime] = None,
- # series_columns: Optional[List[Column]] = None,
- # series_limit: Optional[int] = None,
- # series_limit_metric: Optional[Metric] = None,
- # row_limit: Optional[int] = None,
- # row_offset: Optional[int] = None,
- # timeseries_limit: Optional[int] = None,
- # timeseries_limit_metric: Optional[Metric] = None,
- # time_shift: Optional[str] = None,
- # ) -> SqlaQuery:
- # """Querying any sqla table from this common interface"""
- # if granularity not in self.dttm_cols and granularity is not None:
- # granularity = self.main_dttm_col
-
- # extras = extras or {}
- # time_grain = extras.get("time_grain_sqla")
-
- # template_kwargs = {
- # "columns": columns,
- # "from_dttm": from_dttm.isoformat() if from_dttm else None,
- # "groupby": groupby,
- # "metrics": metrics,
- # "row_limit": row_limit,
- # "row_offset": row_offset,
- # "time_column": granularity,
- # "time_grain": time_grain,
- # "to_dttm": to_dttm.isoformat() if to_dttm else None,
- # "table_columns": [col.column_name for col in self.columns],
- # "filter": filter,
- # }
- # columns = columns or []
- # groupby = groupby or []
- # series_column_names = utils.get_column_names(series_columns or [])
- # # deprecated, to be removed in 2.0
- # if is_timeseries and timeseries_limit:
- # series_limit = timeseries_limit
- # series_limit_metric = series_limit_metric or timeseries_limit_metric
- # template_kwargs.update(self.template_params_dict)
- # extra_cache_keys: List[Any] = []
- # template_kwargs["extra_cache_keys"] = extra_cache_keys
- # removed_filters: List[str] = []
- # applied_template_filters: List[str] = []
- # template_kwargs["removed_filters"] = removed_filters
- # template_kwargs["applied_filters"] = applied_template_filters
- # template_processor = self.get_template_processor(**template_kwargs)
- # db_engine_spec = self.db_engine_spec
- # prequeries: List[str] = []
- # orderby = orderby or []
- # need_groupby = bool(metrics is not None or groupby)
- # metrics = metrics or []
-
- # # For backward compatibility
- # if granularity not in self.dttm_cols and granularity is not None:
- # granularity = self.main_dttm_col
-
- # columns_by_name: Dict[str, TableColumn] = {
- # col.column_name: col for col in self.columns
- # }
-
- # metrics_by_name: Dict[str, SqlMetric] = {m.metric_name: m for m in self.metrics}
-
- # if not granularity and is_timeseries:
- # raise QueryObjectValidationError(
- # _(
- # "Datetime column not provided as part table configuration "
- # "and is required by this type of chart"
- # )
- # )
- # if not metrics and not columns and not groupby:
- # raise QueryObjectValidationError(_("Empty query?"))
-
- # metrics_exprs: List[ColumnElement] = []
- # for metric in metrics:
- # if utils.is_adhoc_metric(metric):
- # assert isinstance(metric, dict)
- # metrics_exprs.append(
- # self.adhoc_metric_to_sqla(
- # metric=metric,
- # columns_by_name=columns_by_name,
- # template_processor=template_processor,
- # )
- # )
- # elif isinstance(metric, str) and metric in metrics_by_name:
- # metrics_exprs.append(
- # metrics_by_name[metric].get_sqla_col(
- # template_processor=template_processor
- # )
- # )
- # else:
- # raise QueryObjectValidationError(
- # _("Metric '%(metric)s' does not exist", metric=metric)
- # )
-
- # if metrics_exprs:
- # main_metric_expr = metrics_exprs[0]
- # else:
- # main_metric_expr, label = literal_column("COUNT(*)"), "ccount"
- # main_metric_expr = self.make_sqla_column_compatible(main_metric_expr, label)
-
- # # To ensure correct handling of the ORDER BY labeling we need to reference the
- # # metric instance if defined in the SELECT clause.
- # # use the key of the ColumnClause for the expected label
- # metrics_exprs_by_label = {m.key: m for m in metrics_exprs}
- # metrics_exprs_by_expr = {str(m): m for m in metrics_exprs}
-
- # # Since orderby may use adhoc metrics, too; we need to process them first
- # orderby_exprs: List[ColumnElement] = []
- # for orig_col, ascending in orderby:
- # col: Union[AdhocMetric, ColumnElement] = orig_col
- # if isinstance(col, dict):
- # col = cast(AdhocMetric, col)
- # if col.get("sqlExpression"):
- # col["sqlExpression"] = _process_sql_expression(
- # expression=col["sqlExpression"],
- # database_id=self.database_id,
- # schema=self.schema,
- # template_processor=template_processor,
- # )
- # if utils.is_adhoc_metric(col):
- # # add adhoc sort by column to columns_by_name if not exists
- # col = self.adhoc_metric_to_sqla(col, columns_by_name)
- # # if the adhoc metric has been defined before
- # # use the existing instance.
- # col = metrics_exprs_by_expr.get(str(col), col)
- # need_groupby = True
- # elif col in columns_by_name:
- # col = columns_by_name[col].get_sqla_col(
- # template_processor=template_processor
- # )
- # elif col in metrics_exprs_by_label:
- # col = metrics_exprs_by_label[col]
- # need_groupby = True
- # elif col in metrics_by_name:
- # col = metrics_by_name[col].get_sqla_col(
- # template_processor=template_processor
- # )
- # need_groupby = True
-
- # if isinstance(col, ColumnElement):
- # orderby_exprs.append(col)
- # else:
- # # Could not convert a column reference to valid ColumnElement
- # raise QueryObjectValidationError(
- # _("Unknown column used in orderby: %(col)s", col=orig_col)
- # )
-
- # select_exprs: List[Union[Column, Label]] = []
- # groupby_all_columns = {}
- # groupby_series_columns = {}
-
- # # filter out the pseudo column __timestamp from columns
- # columns = [col for col in columns if col != utils.DTTM_ALIAS]
- # dttm_col = columns_by_name.get(granularity) if granularity else None
-
- # if need_groupby:
- # # dedup columns while preserving order
- # columns = groupby or columns
- # for selected in columns:
- # if isinstance(selected, str):
- # # if groupby field/expr equals granularity field/expr
- # if selected == granularity:
- # table_col = columns_by_name[selected]
- # outer = table_col.get_timestamp_expression(
- # time_grain=time_grain,
- # label=selected,
- # template_processor=template_processor,
- # )
- # # if groupby field equals a selected column
- # elif selected in columns_by_name:
- # outer = columns_by_name[selected].get_sqla_col(
- # template_processor=template_processor
- # )
- # else:
- # selected = validate_adhoc_subquery(
- # selected,
- # self.database_id,
- # self.schema,
- # )
- # outer = literal_column(f"({selected})")
- # outer = self.make_sqla_column_compatible(outer, selected)
- # else:
- # outer = self.adhoc_column_to_sqla(
- # col=selected, template_processor=template_processor
- # )
- # groupby_all_columns[outer.name] = outer
- # if (
- # is_timeseries and not series_column_names
- # ) or outer.name in series_column_names:
- # groupby_series_columns[outer.name] = outer
- # select_exprs.append(outer)
- # elif columns:
- # for selected in columns:
- # if is_adhoc_column(selected):
- # _sql = selected["sqlExpression"]
- # _column_label = selected["label"]
- # elif isinstance(selected, str):
- # _sql = selected
- # _column_label = selected
-
- # selected = validate_adhoc_subquery(
- # _sql,
- # self.database_id,
- # self.schema,
- # )
- # select_exprs.append(
- # columns_by_name[selected].get_sqla_col(
- # template_processor=template_processor
- # )
- # if isinstance(selected, str) and selected in columns_by_name
- # else self.make_sqla_column_compatible(
- # literal_column(selected), _column_label
- # )
- # )
- # metrics_exprs = []
-
- # if granularity:
- # if granularity not in columns_by_name or not dttm_col:
- # raise QueryObjectValidationError(
- # _(
- # 'Time column "%(col)s" does not exist in dataset',
- # col=granularity,
- # )
- # )
- # time_filters = []
-
- # if is_timeseries:
- # timestamp = dttm_col.get_timestamp_expression(
- # time_grain=time_grain, template_processor=template_processor
- # )
- # # always put timestamp as the first column
- # select_exprs.insert(0, timestamp)
- # groupby_all_columns[timestamp.name] = timestamp
-
- # # Use main dttm column to support index with secondary dttm columns.
- # if (
- # db_engine_spec.time_secondary_columns
- # and self.main_dttm_col in self.dttm_cols
- # and self.main_dttm_col != dttm_col.column_name
- # ):
- # time_filters.append(
- # columns_by_name[self.main_dttm_col].get_time_filter(
- # start_dttm=from_dttm,
- # end_dttm=to_dttm,
- # template_processor=template_processor,
- # )
- # )
- # time_filters.append(
- # dttm_col.get_time_filter(
- # start_dttm=from_dttm,
- # end_dttm=to_dttm,
- # template_processor=template_processor,
- # )
- # )
-
- # # Always remove duplicates by column name, as sometimes `metrics_exprs`
- # # can have the same name as a groupby column (e.g. when users use
- # # raw columns as custom SQL adhoc metric).
- # select_exprs = remove_duplicates(
- # select_exprs + metrics_exprs, key=lambda x: x.name
- # )
-
- # # Expected output columns
- # labels_expected = [c.key for c in select_exprs]
-
- # # Order by columns are "hidden" columns, some databases require them
- # # always be present in SELECT if an aggregation function is used
- # if not db_engine_spec.allows_hidden_ordeby_agg:
- # select_exprs = remove_duplicates(select_exprs + orderby_exprs)
-
- # qry = sa.select(select_exprs)
-
- # tbl, cte = self.get_from_clause(template_processor)
-
- # if groupby_all_columns:
- # qry = qry.group_by(*groupby_all_columns.values())
-
- # where_clause_and = []
- # having_clause_and = []
-
- # for flt in filter: # type: ignore
- # if not all(flt.get(s) for s in ["col", "op"]):
- # continue
- # flt_col = flt["col"]
- # val = flt.get("val")
- # op = flt["op"].upper()
- # col_obj: Optional[TableColumn] = None
- # sqla_col: Optional[Column] = None
- # if flt_col == utils.DTTM_ALIAS and is_timeseries and dttm_col:
- # col_obj = dttm_col
- # elif is_adhoc_column(flt_col):
- # sqla_col = self.adhoc_column_to_sqla(flt_col)
- # else:
- # col_obj = columns_by_name.get(flt_col)
- # filter_grain = flt.get("grain")
-
- # if is_feature_enabled("ENABLE_TEMPLATE_REMOVE_FILTERS"):
- # if get_column_name(flt_col) in removed_filters:
- # # Skip generating SQLA filter when the jinja template handles it.
- # continue
-
- # if col_obj or sqla_col is not None:
- # if sqla_col is not None:
- # pass
- # elif col_obj and filter_grain:
- # sqla_col = col_obj.get_timestamp_expression(
- # time_grain=filter_grain, template_processor=template_processor
- # )
- # elif col_obj:
- # sqla_col = col_obj.get_sqla_col(
- # template_processor=template_processor
- # )
- # col_type = col_obj.type if col_obj else None
- # col_spec = db_engine_spec.get_column_spec(
- # native_type=col_type,
- # db_extra=self.database.get_extra(),
- # )
- # is_list_target = op in (
- # utils.FilterOperator.IN.value,
- # utils.FilterOperator.NOT_IN.value,
- # )
-
- # col_advanced_data_type = col_obj.advanced_data_type if col_obj else ""
-
- # if col_spec and not col_advanced_data_type:
- # target_generic_type = col_spec.generic_type
- # else:
- # target_generic_type = GenericDataType.STRING
- # eq = self.filter_values_handler(
- # values=val,
- # operator=op,
- # target_generic_type=target_generic_type,
- # target_native_type=col_type,
- # is_list_target=is_list_target,
- # db_engine_spec=db_engine_spec,
- # db_extra=self.database.get_extra(),
- # )
- # if (
- # col_advanced_data_type != ""
- # and feature_flag_manager.is_feature_enabled(
- # "ENABLE_ADVANCED_DATA_TYPES"
- # )
- # and col_advanced_data_type in ADVANCED_DATA_TYPES
- # ):
- # values = eq if is_list_target else [eq] # type: ignore
- # bus_resp: AdvancedDataTypeResponse = ADVANCED_DATA_TYPES[
- # col_advanced_data_type
- # ].translate_type(
- # {
- # "type": col_advanced_data_type,
- # "values": values,
- # }
- # )
- # if bus_resp["error_message"]:
- # raise AdvancedDataTypeResponseError(
- # _(bus_resp["error_message"])
- # )
-
- # where_clause_and.append(
- # ADVANCED_DATA_TYPES[col_advanced_data_type].translate_filter(
- # sqla_col, op, bus_resp["values"]
- # )
- # )
- # elif is_list_target:
- # assert isinstance(eq, (tuple, list))
- # if len(eq) == 0:
- # raise QueryObjectValidationError(
- # _("Filter value list cannot be empty")
- # )
- # if len(eq) > len(
- # eq_without_none := [x for x in eq if x is not None]
- # ):
- # is_null_cond = sqla_col.is_(None)
- # if eq:
- # cond = or_(is_null_cond, sqla_col.in_(eq_without_none))
- # else:
- # cond = is_null_cond
- # else:
- # cond = sqla_col.in_(eq)
- # if op == utils.FilterOperator.NOT_IN.value:
- # cond = ~cond
- # where_clause_and.append(cond)
- # elif op == utils.FilterOperator.IS_NULL.value:
- # where_clause_and.append(sqla_col.is_(None))
- # elif op == utils.FilterOperator.IS_NOT_NULL.value:
- # where_clause_and.append(sqla_col.isnot(None))
- # elif op == utils.FilterOperator.IS_TRUE.value:
- # where_clause_and.append(sqla_col.is_(True))
- # elif op == utils.FilterOperator.IS_FALSE.value:
- # where_clause_and.append(sqla_col.is_(False))
- # else:
- # if (
- # op
- # not in {
- # utils.FilterOperator.EQUALS.value,
- # utils.FilterOperator.NOT_EQUALS.value,
- # }
- # and eq is None
- # ):
- # raise QueryObjectValidationError(
- # _(
- # "Must specify a value for filters "
- # "with comparison operators"
- # )
- # )
- # if op == utils.FilterOperator.EQUALS.value:
- # where_clause_and.append(sqla_col == eq)
- # elif op == utils.FilterOperator.NOT_EQUALS.value:
- # where_clause_and.append(sqla_col != eq)
- # elif op == utils.FilterOperator.GREATER_THAN.value:
- # where_clause_and.append(sqla_col > eq)
- # elif op == utils.FilterOperator.LESS_THAN.value:
- # where_clause_and.append(sqla_col < eq)
- # elif op == utils.FilterOperator.GREATER_THAN_OR_EQUALS.value:
- # where_clause_and.append(sqla_col >= eq)
- # elif op == utils.FilterOperator.LESS_THAN_OR_EQUALS.value:
- # where_clause_and.append(sqla_col <= eq)
- # elif op == utils.FilterOperator.LIKE.value:
- # where_clause_and.append(sqla_col.like(eq))
- # elif op == utils.FilterOperator.ILIKE.value:
- # where_clause_and.append(sqla_col.ilike(eq))
- # elif (
- # op == utils.FilterOperator.TEMPORAL_RANGE.value
- # and isinstance(eq, str)
- # and col_obj is not None
- # ):
- # _since, _until = get_since_until_from_time_range(
- # time_range=eq,
- # time_shift=time_shift,
- # extras=extras,
- # )
- # where_clause_and.append(
- # col_obj.get_time_filter(
- # start_dttm=_since,
- # end_dttm=_until,
- # label=sqla_col.key,
- # template_processor=template_processor,
- # )
- # )
- # else:
- # raise QueryObjectValidationError(
- # _("Invalid filter operation type: %(op)s", op=op)
- # )
- # where_clause_and += self.get_sqla_row_level_filters(template_processor)
- # if extras:
- # where = extras.get("where")
- # if where:
- # try:
- # where = template_processor.process_template(f"({where})")
- # except TemplateError as ex:
- # raise QueryObjectValidationError(
- # _(
- # "Error in jinja expression in WHERE clause: %(msg)s",
- # msg=ex.message,
- # )
- # ) from ex
- # where = _process_sql_expression(
- # expression=where,
- # database_id=self.database_id,
- # schema=self.schema,
- # )
- # where_clause_and += [self.text(where)]
- # having = extras.get("having")
- # if having:
- # try:
- # having = template_processor.process_template(f"({having})")
- # except TemplateError as ex:
- # raise QueryObjectValidationError(
- # _(
- # "Error in jinja expression in HAVING clause: %(msg)s",
- # msg=ex.message,
- # )
- # ) from ex
- # having = _process_sql_expression(
- # expression=having,
- # database_id=self.database_id,
- # schema=self.schema,
- # )
- # having_clause_and += [self.text(having)]
-
- # if apply_fetch_values_predicate and self.fetch_values_predicate:
- # qry = qry.where(
- # self.get_fetch_values_predicate(template_processor=template_processor)
- # )
- # if granularity:
- # qry = qry.where(and_(*(time_filters + where_clause_and)))
- # else:
- # qry = qry.where(and_(*where_clause_and))
- # qry = qry.having(and_(*having_clause_and))
-
- # self.make_orderby_compatible(select_exprs, orderby_exprs)
-
- # for col, (orig_col, ascending) in zip(orderby_exprs, orderby):
- # if not db_engine_spec.allows_alias_in_orderby and isinstance(col, Label):
- # # if engine does not allow using SELECT alias in ORDER BY
- # # revert to the underlying column
- # col = col.element
-
- # if (
- # db_engine_spec.allows_alias_in_select
- # and db_engine_spec.allows_hidden_cc_in_orderby
- # and col.name in [select_col.name for select_col in select_exprs]
- # ):
- # col = literal_column(col.name)
- # direction = asc if ascending else desc
- # qry = qry.order_by(direction(col))
-
- # if row_limit:
- # qry = qry.limit(row_limit)
- # if row_offset:
- # qry = qry.offset(row_offset)
-
- # if series_limit and groupby_series_columns:
- # if db_engine_spec.allows_joins and db_engine_spec.allows_subqueries:
- # # some sql dialects require for order by expressions
- # # to also be in the select clause -- others, e.g. vertica,
- # # require a unique inner alias
- # inner_main_metric_expr = self.make_sqla_column_compatible(
- # main_metric_expr, "mme_inner__"
- # )
- # inner_groupby_exprs = []
- # inner_select_exprs = []
- # for gby_name, gby_obj in groupby_series_columns.items():
- # label = get_column_name(gby_name)
- # inner = self.make_sqla_column_compatible(gby_obj, gby_name + "__")
- # inner_groupby_exprs.append(inner)
- # inner_select_exprs.append(inner)
-
- # inner_select_exprs += [inner_main_metric_expr]
- # subq = select(inner_select_exprs).select_from(tbl)
- # inner_time_filter = []
-
- # if dttm_col and not db_engine_spec.time_groupby_inline:
- # inner_time_filter = [
- # dttm_col.get_time_filter(
- # start_dttm=inner_from_dttm or from_dttm,
- # end_dttm=inner_to_dttm or to_dttm,
- # template_processor=template_processor,
- # )
- # ]
- # subq = subq.where(and_(*(where_clause_and + inner_time_filter)))
- # subq = subq.group_by(*inner_groupby_exprs)
-
- # ob = inner_main_metric_expr
- # if series_limit_metric:
- # ob = self._get_series_orderby(
- # series_limit_metric=series_limit_metric,
- # metrics_by_name=metrics_by_name,
- # columns_by_name=columns_by_name,
- # template_processor=template_processor,
- # )
- # direction = desc if order_desc else asc
- # subq = subq.order_by(direction(ob))
- # subq = subq.limit(series_limit)
-
- # on_clause = []
- # for gby_name, gby_obj in groupby_series_columns.items():
- # # in this case the column name, not the alias, needs to be
- # # conditionally mutated, as it refers to the column alias in
- # # the inner query
- # col_name = db_engine_spec.make_label_compatible(gby_name + "__")
- # on_clause.append(gby_obj == column(col_name))
-
- # tbl = tbl.join(subq.alias(), and_(*on_clause))
- # else:
- # if series_limit_metric:
- # orderby = [
- # (
- # self._get_series_orderby(
- # series_limit_metric=series_limit_metric,
- # metrics_by_name=metrics_by_name,
- # columns_by_name=columns_by_name,
- # template_processor=template_processor,
- # ),
- # not order_desc,
- # )
- # ]
-
- # # run prequery to get top groups
- # prequery_obj = {
- # "is_timeseries": False,
- # "row_limit": series_limit,
- # "metrics": metrics,
- # "granularity": granularity,
- # "groupby": groupby,
- # "from_dttm": inner_from_dttm or from_dttm,
- # "to_dttm": inner_to_dttm or to_dttm,
- # "filter": filter,
- # "orderby": orderby,
- # "extras": extras,
- # "columns": columns,
- # "order_desc": True,
- # }
-
- # result = self.query(prequery_obj)
- # prequeries.append(result.query)
- # dimensions = [
- # c
- # for c in result.df.columns
- # if c not in metrics and c in groupby_series_columns
- # ]
- # top_groups = self._get_top_groups(
- # result.df, dimensions, groupby_series_columns, columns_by_name
- # )
- # qry = qry.where(top_groups)
-
- # qry = qry.select_from(tbl)
-
- # if is_rowcount:
- # if not db_engine_spec.allows_subqueries:
- # raise QueryObjectValidationError(
- # _("Database does not support subqueries")
- # )
- # label = "rowcount"
- # col = self.make_sqla_column_compatible(literal_column("COUNT(*)"), label)
- # qry = select([col]).select_from(qry.alias("rowcount_qry"))
- # labels_expected = [label]
-
- # return SqlaQuery(
- # applied_template_filters=applied_template_filters,
- # cte=cte,
- # extra_cache_keys=extra_cache_keys,
- # labels_expected=labels_expected,
- # sqla_query=qry,
- # prequeries=prequeries,
- # )
-
def _get_series_orderby(
self,
series_limit_metric: Metric,