Posted to commits@superset.apache.org by yo...@apache.org on 2022/02/14 08:45:47 UTC
[superset] branch master updated: refactor: decouple pandas postprocessing operator (#18710)
This is an automated email from the ASF dual-hosted git repository.
yongjiezhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/superset.git
The following commit(s) were added to refs/heads/master by this push:
new 8d6aff3 refactor: decouple pandas postprocessing operator (#18710)
8d6aff3 is described below
commit 8d6aff3e5d9993d1d3d770d64eece44e3cf863c8
Author: Yongjie Zhao <yo...@gmail.com>
AuthorDate: Mon Feb 14 16:44:26 2022 +0800
refactor: decouple pandas postprocessing operator (#18710)
---
superset/utils/pandas_postprocessing.py | 1002 --------------------
superset/utils/pandas_postprocessing/__init__.py | 53 ++
superset/utils/pandas_postprocessing/aggregate.py | 46 +
superset/utils/pandas_postprocessing/boxplot.py | 125 +++
superset/utils/pandas_postprocessing/compare.py | 79 ++
.../utils/pandas_postprocessing/contribution.py | 75 ++
superset/utils/pandas_postprocessing/cum.py | 73 ++
superset/utils/pandas_postprocessing/diff.py | 51 +
superset/utils/pandas_postprocessing/geography.py | 114 +++
superset/utils/pandas_postprocessing/pivot.py | 125 +++
superset/utils/pandas_postprocessing/prophet.py | 157 +++
superset/utils/pandas_postprocessing/resample.py | 61 ++
superset/utils/pandas_postprocessing/rolling.py | 115 +++
superset/utils/pandas_postprocessing/select.py | 54 ++
superset/utils/pandas_postprocessing/sort.py | 35 +
superset/utils/pandas_postprocessing/utils.py | 201 ++++
16 files changed, 1364 insertions(+), 1002 deletions(-)
diff --git a/superset/utils/pandas_postprocessing.py b/superset/utils/pandas_postprocessing.py
deleted file mode 100644
index beef42a..0000000
--- a/superset/utils/pandas_postprocessing.py
+++ /dev/null
@@ -1,1002 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-# pylint: disable=too-many-lines
-import logging
-from decimal import Decimal
-from functools import partial
-from typing import Any, Callable, Dict, List, Optional, Tuple, Union
-
-import geohash as geohash_lib
-import numpy as np
-import pandas as pd
-from flask_babel import gettext as _
-from geopy.point import Point
-from pandas import DataFrame, NamedAgg, Series, Timestamp
-
-from superset.constants import NULL_STRING, PandasAxis, PandasPostprocessingCompare
-from superset.exceptions import QueryObjectValidationError
-from superset.utils.core import (
- DTTM_ALIAS,
- PostProcessingBoxplotWhiskerType,
- PostProcessingContributionOrientation,
- TIME_COMPARISION,
-)
-
-NUMPY_FUNCTIONS = {
- "average": np.average,
- "argmin": np.argmin,
- "argmax": np.argmax,
- "count": np.ma.count,
- "count_nonzero": np.count_nonzero,
- "cumsum": np.cumsum,
- "cumprod": np.cumprod,
- "max": np.max,
- "mean": np.mean,
- "median": np.median,
- "nansum": np.nansum,
- "nanmin": np.nanmin,
- "nanmax": np.nanmax,
- "nanmean": np.nanmean,
- "nanmedian": np.nanmedian,
- "nanpercentile": np.nanpercentile,
- "min": np.min,
- "percentile": np.percentile,
- "prod": np.prod,
- "product": np.product,
- "std": np.std,
- "sum": np.sum,
- "var": np.var,
-}
-
-DENYLIST_ROLLING_FUNCTIONS = (
- "count",
- "corr",
- "cov",
- "kurt",
- "max",
- "mean",
- "median",
- "min",
- "std",
- "skew",
- "sum",
- "var",
- "quantile",
-)
-
-ALLOWLIST_CUMULATIVE_FUNCTIONS = (
- "cummax",
- "cummin",
- "cumprod",
- "cumsum",
-)
-
-PROPHET_TIME_GRAIN_MAP = {
- "PT1S": "S",
- "PT1M": "min",
- "PT5M": "5min",
- "PT10M": "10min",
- "PT15M": "15min",
- "PT30M": "30min",
- "PT1H": "H",
- "P1D": "D",
- "P1W": "W",
- "P1M": "M",
- "P3M": "Q",
- "P1Y": "A",
- "1969-12-28T00:00:00Z/P1W": "W",
- "1969-12-29T00:00:00Z/P1W": "W",
- "P1W/1970-01-03T00:00:00Z": "W",
- "P1W/1970-01-04T00:00:00Z": "W",
-}
-
-
-def _flatten_column_after_pivot(
- column: Union[float, Timestamp, str, Tuple[str, ...]],
- aggregates: Dict[str, Dict[str, Any]],
-) -> str:
- """
- Function for flattening column names into a single string. This step is necessary
- to be able to properly serialize a DataFrame. If the column is a string, return
- element unchanged. For multi-element columns, join column elements with a comma,
- with the exception of pivots made with a single aggregate, in which case the
- aggregate column name is omitted.
-
- :param column: single element from `DataFrame.columns`
- :param aggregates: aggregates
- :return:
- """
- if not isinstance(column, tuple):
- column = (column,)
- if len(aggregates) == 1 and len(column) > 1:
- # drop aggregate for single aggregate pivots with multiple groupings
- # from column name (aggregates always come first in column name)
- column = column[1:]
- return ", ".join([str(col) for col in column])
-
-
-def validate_column_args(*argnames: str) -> Callable[..., Any]:
- def wrapper(func: Callable[..., Any]) -> Callable[..., Any]:
- def wrapped(df: DataFrame, **options: Any) -> Any:
- if options.get("is_pivot_df"):
- # skip validation when the DataFrame is pivoted
- return func(df, **options)
- columns = df.columns.tolist()
- for name in argnames:
- if name in options and not all(
- elem in columns for elem in options.get(name) or []
- ):
- raise QueryObjectValidationError(
- _("Referenced columns not available in DataFrame.")
- )
- return func(df, **options)
-
- return wrapped
-
- return wrapper
-
-
-def _get_aggregate_funcs(
- df: DataFrame, aggregates: Dict[str, Dict[str, Any]],
-) -> Dict[str, NamedAgg]:
- """
- Converts a set of aggregate config objects into functions that pandas can use as
- aggregators. Currently only numpy aggregators are supported.
-
- :param df: DataFrame on which to perform aggregate operation.
- :param aggregates: Mapping from column name to aggregate config.
- :return: Mapping from metric name to function that takes a single input argument.
- """
- agg_funcs: Dict[str, NamedAgg] = {}
- for name, agg_obj in aggregates.items():
- column = agg_obj.get("column", name)
- if column not in df:
- raise QueryObjectValidationError(
- _(
- "Column referenced by aggregate is undefined: %(column)s",
- column=column,
- )
- )
- if "operator" not in agg_obj:
- raise QueryObjectValidationError(
- _("Operator undefined for aggregator: %(name)s", name=name,)
- )
- operator = agg_obj["operator"]
- if callable(operator):
- aggfunc = operator
- else:
- func = NUMPY_FUNCTIONS.get(operator)
- if not func:
- raise QueryObjectValidationError(
- _("Invalid numpy function: %(operator)s", operator=operator,)
- )
- options = agg_obj.get("options", {})
- aggfunc = partial(func, **options)
- agg_funcs[name] = NamedAgg(column=column, aggfunc=aggfunc)
-
- return agg_funcs
-
-
-def _append_columns(
- base_df: DataFrame, append_df: DataFrame, columns: Dict[str, str]
-) -> DataFrame:
- """
- Function for adding columns from one DataFrame to another DataFrame. Calls the
- assign method, which overwrites the original column in `base_df` if the column
- already exists, and appends the column if the name is not defined.
-
- :param base_df: DataFrame which to use as the base
- :param append_df: DataFrame from which to select data.
- :param columns: columns on which to append, mapping source column to
- target column. For instance, `{'y': 'y'}` will replace the values in
- column `y` in `base_df` with the values in `y` in `append_df`,
- while `{'y': 'y2'}` will add a column `y2` to `base_df` based
- on values in column `y` in `append_df`, leaving the original column `y`
- in `base_df` unchanged.
- :return: new DataFrame with combined data from `base_df` and `append_df`
- """
- return base_df.assign(
- **{target: append_df[source] for source, target in columns.items()}
- )
-
-
-@validate_column_args("index", "columns")
-def pivot( # pylint: disable=too-many-arguments,too-many-locals
- df: DataFrame,
- index: List[str],
- aggregates: Dict[str, Dict[str, Any]],
- columns: Optional[List[str]] = None,
- metric_fill_value: Optional[Any] = None,
- column_fill_value: Optional[str] = NULL_STRING,
- drop_missing_columns: Optional[bool] = True,
- combine_value_with_metric: bool = False,
- marginal_distributions: Optional[bool] = None,
- marginal_distribution_name: Optional[str] = None,
- flatten_columns: bool = True,
- reset_index: bool = True,
-) -> DataFrame:
- """
- Perform a pivot operation on a DataFrame.
-
- :param df: Object on which pivot operation will be performed
- :param index: Columns to group by on the table index (=rows)
- :param columns: Columns to group by on the table columns
- :param metric_fill_value: Value to replace missing values with
- :param column_fill_value: Value to replace missing pivot columns with. By default
- replaces missing values with "<NULL>". Set to `None` to remove columns
- with missing values.
- :param drop_missing_columns: Do not include columns whose entries are all missing
- :param combine_value_with_metric: Display metrics side by side within each column,
- as opposed to each column being displayed side by side for each metric.
- :param aggregates: A mapping from aggregate column name to the aggregate
- config.
- :param marginal_distributions: Add totals for row/column. Defaults to False.
- :param marginal_distribution_name: Name of row/column with marginal distribution.
- Defaults to 'All'.
- :param flatten_columns: Convert column names to strings
- :param reset_index: Convert index to column
- :return: A pivot table
- :raises QueryObjectValidationError: If the request is incorrect
- """
- if not index:
- raise QueryObjectValidationError(
- _("Pivot operation requires at least one index")
- )
- if not aggregates:
- raise QueryObjectValidationError(
- _("Pivot operation must include at least one aggregate")
- )
-
- if columns and column_fill_value:
- df[columns] = df[columns].fillna(value=column_fill_value)
-
- aggregate_funcs = _get_aggregate_funcs(df, aggregates)
-
- # TODO (villebro): Pandas 1.0.3 doesn't yet support NamedAgg in pivot_table.
- # Remove once/if support is added.
- aggfunc = {na.column: na.aggfunc for na in aggregate_funcs.values()}
-
- # When dropna = False, the pivot_table function will calculate cartesian-product
- # for MultiIndex.
- # https://github.com/apache/superset/issues/15956
- # https://github.com/pandas-dev/pandas/issues/18030
- series_set = set()
- if not drop_missing_columns and columns:
- for row in df[columns].itertuples():
- for metric in aggfunc.keys():
- series_set.add(str(tuple([metric]) + tuple(row[1:])))
-
- df = df.pivot_table(
- values=aggfunc.keys(),
- index=index,
- columns=columns,
- aggfunc=aggfunc,
- fill_value=metric_fill_value,
- dropna=drop_missing_columns,
- margins=marginal_distributions,
- margins_name=marginal_distribution_name,
- )
-
- if not drop_missing_columns and len(series_set) > 0 and not df.empty:
- for col in df.columns:
- series = str(col)
- if series not in series_set:
- df = df.drop(col, axis=PandasAxis.COLUMN)
-
- if combine_value_with_metric:
- df = df.stack(0).unstack()
-
- # Make index regular column
- if flatten_columns:
- df.columns = [
- _flatten_column_after_pivot(col, aggregates) for col in df.columns
- ]
- # return index as regular column
- if reset_index:
- df.reset_index(level=0, inplace=True)
- return df
-
-
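To make the pivot contract concrete, here is a minimal usage sketch (hypothetical data and column names, not part of this commit; with a single aggregate, the aggregate name is dropped from the flattened column names):

    import pandas as pd
    from superset.utils.pandas_postprocessing import pivot

    df = pd.DataFrame({
        "country": ["UK", "UK", "US", "US"],
        "gender": ["boy", "girl", "boy", "girl"],
        "births": [5, 6, 7, 8],
    })
    # One row per country, one flattened column per gender.
    pivoted = pivot(
        df,
        index=["country"],
        columns=["gender"],
        aggregates={"births": {"operator": "sum"}},
    )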
-@validate_column_args("groupby")
-def aggregate(
- df: DataFrame, groupby: List[str], aggregates: Dict[str, Dict[str, Any]]
-) -> DataFrame:
- """
- Apply aggregations to a DataFrame.
-
- :param df: Object to aggregate.
- :param groupby: columns to group by
- :param aggregates: A mapping from metric column to the function used to
- aggregate values.
- :raises QueryObjectValidationError: If the request is incorrect
- """
- aggregates = aggregates or {}
- aggregate_funcs = _get_aggregate_funcs(df, aggregates)
- if groupby:
- df_groupby = df.groupby(by=groupby)
- else:
- df_groupby = df.groupby(lambda _: True)
- return df_groupby.agg(**aggregate_funcs).reset_index(drop=not groupby)
-
-
-@validate_column_args("columns")
-def sort(df: DataFrame, columns: Dict[str, bool]) -> DataFrame:
- """
- Sort a DataFrame.
-
- :param df: DataFrame to sort.
- :param columns: columns by which to sort. The key specifies the column name,
- the value specifies whether to sort in ascending order.
- :return: Sorted DataFrame
- :raises QueryObjectValidationError: If the request is incorrect
- """
- return df.sort_values(by=list(columns.keys()), ascending=list(columns.values()))
-
-
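A one-line sketch of the sort contract (hypothetical columns, not part of this commit):

    import pandas as pd
    from superset.utils.pandas_postprocessing import sort

    df = pd.DataFrame({"label": ["a", "b", "c"], "y": [3.0, 1.0, 2.0]})
    # The key selects the column, the value toggles ascending order.
    df_sorted = sort(df, columns={"y": False})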
-@validate_column_args("columns")
-def rolling( # pylint: disable=too-many-arguments
- df: DataFrame,
- rolling_type: str,
- columns: Optional[Dict[str, str]] = None,
- window: Optional[int] = None,
- rolling_type_options: Optional[Dict[str, Any]] = None,
- center: bool = False,
- win_type: Optional[str] = None,
- min_periods: Optional[int] = None,
- is_pivot_df: bool = False,
-) -> DataFrame:
- """
- Apply a rolling window on the dataset. See the Pandas docs for further details:
- https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.rolling.html
-
- :param df: DataFrame on which the rolling period will be based.
- :param columns: columns on which to perform rolling, mapping source column to
- target column. For instance, `{'y': 'y'}` will replace the column `y` with
- the rolling value in `y`, while `{'y': 'y2'}` will add a column `y2` based
- on rolling values calculated from `y`, leaving the original column `y`
- unchanged.
- :param rolling_type: Type of rolling window. Any numpy function will work.
- :param window: Size of the window.
- :param rolling_type_options: Optional options to pass to rolling method. Needed
- for e.g. quantile operation.
- :param center: Should the label be at the center of the window.
- :param win_type: Type of window function.
- :param min_periods: The minimum amount of periods required for a row to be included
- in the result set.
- :param is_pivot_df: Whether the DataFrame is pivoted
- :return: DataFrame with the rolling columns
- :raises QueryObjectValidationError: If the request is incorrect
- """
- rolling_type_options = rolling_type_options or {}
- columns = columns or {}
- if is_pivot_df:
- df_rolling = df
- else:
- df_rolling = df[columns.keys()]
- kwargs: Dict[str, Union[str, int]] = {}
- if window is None:
- raise QueryObjectValidationError(_("Undefined window for rolling operation"))
- if window == 0:
- raise QueryObjectValidationError(_("Window must be > 0"))
-
- kwargs["window"] = window
- if min_periods is not None:
- kwargs["min_periods"] = min_periods
- if center is not None:
- kwargs["center"] = center
- if win_type is not None:
- kwargs["win_type"] = win_type
-
- df_rolling = df_rolling.rolling(**kwargs)
- if rolling_type not in DENYLIST_ROLLING_FUNCTIONS or not hasattr(
- df_rolling, rolling_type
- ):
- raise QueryObjectValidationError(
- _("Invalid rolling_type: %(type)s", type=rolling_type)
- )
- try:
- df_rolling = getattr(df_rolling, rolling_type)(**rolling_type_options)
- except TypeError as ex:
- raise QueryObjectValidationError(
- _(
- "Invalid options for %(rolling_type)s: %(options)s",
- rolling_type=rolling_type,
- options=rolling_type_options,
- )
- ) from ex
-
- if is_pivot_df:
- agg_in_pivot_df = df.columns.get_level_values(0).drop_duplicates().to_list()
- agg: Dict[str, Dict[str, Any]] = {col: {} for col in agg_in_pivot_df}
- df_rolling.columns = [
- _flatten_column_after_pivot(col, agg) for col in df_rolling.columns
- ]
- df_rolling.reset_index(level=0, inplace=True)
- else:
- df_rolling = _append_columns(df, df_rolling, columns)
-
- if min_periods:
- df_rolling = df_rolling[min_periods:]
- return df_rolling
-
-
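As a sketch of the rolling contract (hypothetical data, not part of this commit; `y2` is a new target column):

    import pandas as pd
    from superset.utils.pandas_postprocessing import rolling

    df = pd.DataFrame({"y": [1.0, 2.0, 3.0, 4.0]})
    # Two-period rolling mean written to `y2`; the source column `y` is kept.
    df_rolling = rolling(
        df,
        rolling_type="mean",
        columns={"y": "y2"},
        window=2,
        min_periods=0,
    )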
-@validate_column_args("columns", "drop", "rename")
-def select(
- df: DataFrame,
- columns: Optional[List[str]] = None,
- exclude: Optional[List[str]] = None,
- rename: Optional[Dict[str, str]] = None,
-) -> DataFrame:
- """
- Only select a subset of columns in the original dataset. Can be useful for
- removing unnecessary intermediate results, renaming and reordering columns.
-
- :param df: DataFrame from which to select columns.
- :param columns: Columns which to select from the DataFrame, in the desired order.
- If left undefined, all columns will be selected. If columns are
- renamed, the original column name should be referenced here.
- :param exclude: columns to exclude from selection. If columns are renamed, the new
- column name should be referenced here.
- :param rename: columns which to rename, mapping source column to target column.
- For instance, `{'y': 'y2'}` will rename the column `y` to
- `y2`.
- :return: Subset of columns in original DataFrame
- :raises QueryObjectValidationError: If the request is incorrect
- """
- df_select = df.copy(deep=False)
- if columns:
- df_select = df_select[columns]
- if exclude:
- df_select = df_select.drop(exclude, axis=1)
- if rename is not None:
- df_select = df_select.rename(columns=rename)
- return df_select
-
-
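A short sketch of select (hypothetical columns, not part of this commit):

    import pandas as pd
    from superset.utils.pandas_postprocessing import select

    df = pd.DataFrame({"a": [1], "b": [2], "c": [3]})
    # Keep `a` and `b` in that order, then rename `b` to `total`.
    df_sel = select(df, columns=["a", "b"], rename={"b": "total"})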
-@validate_column_args("columns")
-def diff(
- df: DataFrame,
- columns: Dict[str, str],
- periods: int = 1,
- axis: PandasAxis = PandasAxis.ROW,
-) -> DataFrame:
- """
- Calculate row-by-row or column-by-column difference for selected columns.
-
- :param df: DataFrame on which the diff will be based.
- :param columns: columns on which to perform diff, mapping source column to
- target column. For instance, `{'y': 'y'}` will replace the column `y` with
- the diff value in `y`, while `{'y': 'y2'}` will add a column `y2` based
- on diff values calculated from `y`, leaving the original column `y`
- unchanged.
- :param periods: periods to shift for calculating difference.
- :param axis: 0 for row, 1 for column. default 0.
- :return: DataFrame with diffed columns
- :raises QueryObjectValidationError: If the request is incorrect
- """
- df_diff = df[columns.keys()]
- df_diff = df_diff.diff(periods=periods, axis=axis)
- return _append_columns(df, df_diff, columns)
-
-
-@validate_column_args("source_columns", "compare_columns")
-def compare( # pylint: disable=too-many-arguments
- df: DataFrame,
- source_columns: List[str],
- compare_columns: List[str],
- compare_type: Optional[PandasPostprocessingCompare],
- drop_original_columns: Optional[bool] = False,
- precision: Optional[int] = 4,
-) -> DataFrame:
- """
- Calculate column-by-column changes for selected columns.
-
- :param df: DataFrame on which the compare will be based.
- :param source_columns: Main query columns
- :param compare_columns: Columns being compared
- :param compare_type: Type of compare. Choice of `absolute`, `percentage` or `ratio`
- :param drop_original_columns: Whether to remove the source columns and
- compare columns.
- :param precision: Round a change rate to a variable number of decimal places.
- :return: DataFrame with compared columns.
- :raises QueryObjectValidationError: If the request is incorrect.
- """
- if len(source_columns) != len(compare_columns):
- raise QueryObjectValidationError(
- _("`compare_columns` must have the same length as `source_columns`.")
- )
- if compare_type not in tuple(PandasPostprocessingCompare):
- raise QueryObjectValidationError(
- _("`compare_type` must be `difference`, `percentage` or `ratio`")
- )
- if len(source_columns) == 0:
- return df
-
- for s_col, c_col in zip(source_columns, compare_columns):
- if compare_type == PandasPostprocessingCompare.DIFF:
- diff_series = df[s_col] - df[c_col]
- elif compare_type == PandasPostprocessingCompare.PCT:
- diff_series = (
- ((df[s_col] - df[c_col]) / df[c_col]).astype(float).round(precision)
- )
- else:
- # compare_type == "ratio"
- diff_series = (df[s_col] / df[c_col]).astype(float).round(precision)
- diff_df = diff_series.to_frame(
- name=TIME_COMPARISION.join([compare_type, s_col, c_col])
- )
- df = pd.concat([df, diff_df], axis=1)
-
- if drop_original_columns:
- df = df.drop(source_columns + compare_columns, axis=1)
- return df
-
-
-@validate_column_args("columns")
-def cum(
- df: DataFrame,
- operator: str,
- columns: Optional[Dict[str, str]] = None,
- is_pivot_df: bool = False,
-) -> DataFrame:
- """
- Calculate cumulative sum/product/min/max for selected columns.
-
- :param df: DataFrame on which the cumulative operation will be based.
- :param columns: columns on which to perform a cumulative operation, mapping source
- column to target column. For instance, `{'y': 'y'}` will replace the column
- `y` with the cumulative value in `y`, while `{'y': 'y2'}` will add a column
- `y2` based on cumulative values calculated from `y`, leaving the original
- column `y` unchanged.
- :param operator: cumulative operator, e.g. `sum`, `prod`, `min`, `max`
- :param is_pivot_df: Whether the DataFrame is pivoted
- :return: DataFrame with cumulated columns
- """
- columns = columns or {}
- if is_pivot_df:
- df_cum = df
- else:
- df_cum = df[columns.keys()]
- operation = "cum" + operator
- if operation not in ALLOWLIST_CUMULATIVE_FUNCTIONS or not hasattr(
- df_cum, operation
- ):
- raise QueryObjectValidationError(
- _("Invalid cumulative operator: %(operator)s", operator=operator)
- )
- if is_pivot_df:
- df_cum = getattr(df_cum, operation)()
- agg_in_pivot_df = df.columns.get_level_values(0).drop_duplicates().to_list()
- agg: Dict[str, Dict[str, Any]] = {col: {} for col in agg_in_pivot_df}
- df_cum.columns = [
- _flatten_column_after_pivot(col, agg) for col in df_cum.columns
- ]
- df_cum.reset_index(level=0, inplace=True)
- else:
- df_cum = _append_columns(df, getattr(df_cum, operation)(), columns)
- return df_cum
-
-
-def geohash_decode(
- df: DataFrame, geohash: str, longitude: str, latitude: str
-) -> DataFrame:
- """
- Decode a geohash column into longitude and latitude
-
- :param df: DataFrame containing geohash data
- :param geohash: Name of source column containing geohash location.
- :param longitude: Name of new column to be created containing longitude.
- :param latitude: Name of new column to be created containing latitude.
- :return: DataFrame with decoded longitudes and latitudes
- """
- try:
- lonlat_df = DataFrame()
- lonlat_df["latitude"], lonlat_df["longitude"] = zip(
- *df[geohash].apply(geohash_lib.decode)
- )
- return _append_columns(
- df, lonlat_df, {"latitude": latitude, "longitude": longitude}
- )
- except ValueError as ex:
- raise QueryObjectValidationError(_("Invalid geohash string")) from ex
-
-
-def geohash_encode(
- df: DataFrame, geohash: str, longitude: str, latitude: str,
-) -> DataFrame:
- """
- Encode longitude and latitude into geohash
-
- :param df: DataFrame containing longitude and latitude data
- :param geohash: Name of new column to be created containing geohash location.
- :param longitude: Name of source column containing longitude.
- :param latitude: Name of source column containing latitude.
- :return: DataFrame with geohash-encoded locations
- """
- try:
- encode_df = df[[latitude, longitude]]
- encode_df.columns = ["latitude", "longitude"]
- encode_df["geohash"] = encode_df.apply(
- lambda row: geohash_lib.encode(row["latitude"], row["longitude"]), axis=1,
- )
- return _append_columns(df, encode_df, {"geohash": geohash})
- except ValueError as ex:
- raise QueryObjectValidationError(_("Invalid longitude/latitude")) from ex
-
-
-def geodetic_parse(
- df: DataFrame,
- geodetic: str,
- longitude: str,
- latitude: str,
- altitude: Optional[str] = None,
-) -> DataFrame:
- """
- Parse a column containing a geodetic point string
- [Geopy](https://geopy.readthedocs.io/en/stable/#geopy.point.Point).
-
- :param df: DataFrame containing geodetic point data
- :param geodetic: Name of source column containing geodetic point string.
- :param longitude: Name of new column to be created containing longitude.
- :param latitude: Name of new column to be created containing latitude.
- :param altitude: Name of new column to be created containing altitude.
- :return: DataFrame with parsed longitude, latitude and altitude columns
- """
-
- def _parse_location(location: str) -> Tuple[float, float, float]:
- """
- Parse a string containing a geodetic point and return latitude, longitude
- and altitude
- """
- point = Point(location)
- return point[0], point[1], point[2]
-
- try:
- geodetic_df = DataFrame()
- (
- geodetic_df["latitude"],
- geodetic_df["longitude"],
- geodetic_df["altitude"],
- ) = zip(*df[geodetic].apply(_parse_location))
- columns = {"latitude": latitude, "longitude": longitude}
- if altitude:
- columns["altitude"] = altitude
- return _append_columns(df, geodetic_df, columns)
- except ValueError as ex:
- raise QueryObjectValidationError(_("Invalid geodetic string")) from ex
-
-
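A round-trip sketch of the geohash helpers (hypothetical coordinates, not part of this commit; assumes the `geohash` package imported above is installed):

    import pandas as pd
    from superset.utils.pandas_postprocessing import geohash_decode, geohash_encode

    df = pd.DataFrame({"latitude": [57.64911], "longitude": [10.40744]})
    encoded = geohash_encode(
        df, geohash="geohash", longitude="longitude", latitude="latitude"
    )
    # Decode back into new `lon`/`lat` columns alongside the original data.
    decoded = geohash_decode(
        encoded, geohash="geohash", longitude="lon", latitude="lat"
    )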
-@validate_column_args("columns")
-def contribution(
- df: DataFrame,
- orientation: Optional[
- PostProcessingContributionOrientation
- ] = PostProcessingContributionOrientation.COLUMN,
- columns: Optional[List[str]] = None,
- rename_columns: Optional[List[str]] = None,
-) -> DataFrame:
- """
- Calculate cell contribution to row/column total for numeric columns.
- Non-numeric columns will be kept untouched.
-
- If `columns` are specified, only calculate contributions on selected columns.
-
- :param df: DataFrame containing all-numeric data (temporal column ignored)
- :param columns: Columns to calculate values from.
- :param rename_columns: The new labels for the calculated contribution columns.
- The original columns will not be removed.
- :param orientation: calculate by dividing each cell by its row or column total
- :return: DataFrame with contributions.
- """
- contribution_df = df.copy()
- numeric_df = contribution_df.select_dtypes(include=["number", Decimal])
- # verify column selections
- if columns:
- numeric_columns = numeric_df.columns.tolist()
- for col in columns:
- if col not in numeric_columns:
- raise QueryObjectValidationError(
- _(
- 'Column "%(column)s" is not numeric or does not '
- "exists in the query results.",
- column=col,
- )
- )
- columns = columns or numeric_df.columns
- rename_columns = rename_columns or columns
- if len(rename_columns) != len(columns):
- raise QueryObjectValidationError(
- _("`rename_columns` must have the same length as `columns`.")
- )
- # limit to selected columns
- numeric_df = numeric_df[columns]
- axis = 0 if orientation == PostProcessingContributionOrientation.COLUMN else 1
- numeric_df = numeric_df / numeric_df.values.sum(axis=axis, keepdims=True)
- contribution_df[rename_columns] = numeric_df
- return contribution_df
-
-
-def _prophet_parse_seasonality(
- input_value: Optional[Union[bool, int]]
-) -> Union[bool, str, int]:
- if input_value is None:
- return "auto"
- if isinstance(input_value, bool):
- return input_value
- try:
- return int(input_value)
- except ValueError:
- return input_value
-
-
-def _prophet_fit_and_predict( # pylint: disable=too-many-arguments
- df: DataFrame,
- confidence_interval: float,
- yearly_seasonality: Union[bool, str, int],
- weekly_seasonality: Union[bool, str, int],
- daily_seasonality: Union[bool, str, int],
- periods: int,
- freq: str,
-) -> DataFrame:
- """
- Fit a prophet model and return a DataFrame with predicted results.
- """
- try:
- # pylint: disable=import-error,import-outside-toplevel
- from prophet import Prophet
-
- prophet_logger = logging.getLogger("prophet.plot")
- prophet_logger.setLevel(logging.CRITICAL)
- prophet_logger.setLevel(logging.NOTSET)
- except ModuleNotFoundError as ex:
- raise QueryObjectValidationError(_("`prophet` package not installed")) from ex
- model = Prophet(
- interval_width=confidence_interval,
- yearly_seasonality=yearly_seasonality,
- weekly_seasonality=weekly_seasonality,
- daily_seasonality=daily_seasonality,
- )
- if df["ds"].dt.tz:
- df["ds"] = df["ds"].dt.tz_convert(None)
- model.fit(df)
- future = model.make_future_dataframe(periods=periods, freq=freq)
- forecast = model.predict(future)[["ds", "yhat", "yhat_lower", "yhat_upper"]]
- return forecast.join(df.set_index("ds"), on="ds").set_index(["ds"])
-
-
-def prophet( # pylint: disable=too-many-arguments
- df: DataFrame,
- time_grain: str,
- periods: int,
- confidence_interval: float,
- yearly_seasonality: Optional[Union[bool, int]] = None,
- weekly_seasonality: Optional[Union[bool, int]] = None,
- daily_seasonality: Optional[Union[bool, int]] = None,
- index: Optional[str] = None,
-) -> DataFrame:
- """
- Add forecasts to each series in a timeseries dataframe, along with confidence
- intervals for the prediction. For each series, the operation creates three
- new columns with the column name suffixed with the following values:
-
- - `__yhat`: the forecast for the given date
- - `__yhat_lower`: the lower bound of the forecast for the given date
- - `__yhat_upper`: the upper bound of the forecast for the given date
-
-
- :param df: DataFrame containing the temporal column and one or more numeric series
- :param time_grain: Time grain used to specify time period increments in prediction
- :param periods: Time periods (in units of `time_grain`) to predict into the future
- :param confidence_interval: Width of predicted confidence interval
- :param yearly_seasonality: Should yearly seasonality be applied.
- An integer value will specify Fourier order of seasonality.
- :param weekly_seasonality: Should weekly seasonality be applied.
- An integer value will specify Fourier order of seasonality, `None` will
- automatically detect seasonality.
- :param daily_seasonality: Should daily seasonality be applied.
- An integer value will specify Fourier order of seasonality, `None` will
- automatically detect seasonality.
- :param index: the name of the column containing the x-axis data
- :return: DataFrame with forecasts, with temporal column at beginning if present
- """
- index = index or DTTM_ALIAS
- # validate inputs
- if not time_grain:
- raise QueryObjectValidationError(_("Time grain missing"))
- if time_grain not in PROPHET_TIME_GRAIN_MAP:
- raise QueryObjectValidationError(
- _("Unsupported time grain: %(time_grain)s", time_grain=time_grain,)
- )
- freq = PROPHET_TIME_GRAIN_MAP[time_grain]
- # check type at runtime because the marshmallow schema is unable to handle
- # union types
- if not isinstance(periods, int) or periods < 0:
- raise QueryObjectValidationError(_("Periods must be a whole number"))
- if not confidence_interval or confidence_interval <= 0 or confidence_interval >= 1:
- raise QueryObjectValidationError(
- _("Confidence interval must be between 0 and 1 (exclusive)")
- )
- if index not in df.columns:
- raise QueryObjectValidationError(_("DataFrame must include temporal column"))
- if len(df.columns) < 2:
- raise QueryObjectValidationError(_("DataFrame include at least one series"))
-
- target_df = DataFrame()
- for column in [column for column in df.columns if column != index]:
- fit_df = _prophet_fit_and_predict(
- df=df[[index, column]].rename(columns={index: "ds", column: "y"}),
- confidence_interval=confidence_interval,
- yearly_seasonality=_prophet_parse_seasonality(yearly_seasonality),
- weekly_seasonality=_prophet_parse_seasonality(weekly_seasonality),
- daily_seasonality=_prophet_parse_seasonality(daily_seasonality),
- periods=periods,
- freq=freq,
- )
- new_columns = [
- f"{column}__yhat",
- f"{column}__yhat_lower",
- f"{column}__yhat_upper",
- f"{column}",
- ]
- fit_df.columns = new_columns
- if target_df.empty:
- target_df = fit_df
- else:
- for new_column in new_columns:
- target_df = target_df.assign(**{new_column: fit_df[new_column]})
- target_df.reset_index(level=0, inplace=True)
- return target_df.rename(columns={"ds": index})
-
-
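A minimal forecasting sketch (hypothetical series, not part of this commit; requires the optional `prophet` package and uses Superset's default `__timestamp` temporal alias):

    import pandas as pd
    from superset.utils.pandas_postprocessing import prophet

    df = pd.DataFrame({
        "__timestamp": pd.date_range("2021-01-01", periods=30, freq="D"),
        "sales": range(30),
    })
    # Adds sales__yhat, sales__yhat_lower and sales__yhat_upper columns
    # covering the observed range plus 7 forecast days.
    forecast = prophet(df, time_grain="P1D", periods=7, confidence_interval=0.8)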
-def boxplot(
- df: DataFrame,
- groupby: List[str],
- metrics: List[str],
- whisker_type: PostProcessingBoxplotWhiskerType,
- percentiles: Optional[
- Union[List[Union[int, float]], Tuple[Union[int, float], Union[int, float]]]
- ] = None,
-) -> DataFrame:
- """
- Calculate boxplot statistics. For each metric, the operation creates eight
- new columns with the column name suffixed with the following values:
-
- - `__mean`: the mean
- - `__median`: the median
- - `__max`: the maximum value excluding outliers (see whisker type)
- - `__min`: the minimum value excluding outliers (see whisker type)
- `__q1`: the first quartile (25th percentile)
- - `__q3`: the third quartile (75th percentile)
- - `__count`: count of observations
- - `__outliers`: the values that fall outside the minimum/maximum value
- (see whisker type)
-
- :param df: DataFrame containing all-numeric data (temporal column ignored)
- :param groupby: The categories to group by (x-axis)
- :param metrics: The metrics for which to calculate the distribution
- :param whisker_type: The confidence level type
- :return: DataFrame with boxplot statistics per groupby
- """
-
- def quartile1(series: Series) -> float:
- return np.nanpercentile(series, 25, interpolation="midpoint")
-
- def quartile3(series: Series) -> float:
- return np.nanpercentile(series, 75, interpolation="midpoint")
-
- if whisker_type == PostProcessingBoxplotWhiskerType.TUKEY:
-
- def whisker_high(series: Series) -> float:
- upper_outer_lim = quartile3(series) + 1.5 * (
- quartile3(series) - quartile1(series)
- )
- return series[series <= upper_outer_lim].max()
-
- def whisker_low(series: Series) -> float:
- lower_outer_lim = quartile1(series) - 1.5 * (
- quartile3(series) - quartile1(series)
- )
- return series[series >= lower_outer_lim].min()
-
- elif whisker_type == PostProcessingBoxplotWhiskerType.PERCENTILE:
- if (
- not isinstance(percentiles, (list, tuple))
- or len(percentiles) != 2
- or not isinstance(percentiles[0], (int, float))
- or not isinstance(percentiles[1], (int, float))
- or percentiles[0] >= percentiles[1]
- ):
- raise QueryObjectValidationError(
- _(
- "percentiles must be a list or tuple with two numeric values, "
- "of which the first is lower than the second value"
- )
- )
- low, high = percentiles[0], percentiles[1]
-
- def whisker_high(series: Series) -> float:
- return np.nanpercentile(series, high)
-
- def whisker_low(series: Series) -> float:
- return np.nanpercentile(series, low)
-
- else:
- whisker_high = np.max
- whisker_low = np.min
-
- def outliers(series: Series) -> List[float]:
- above = series[series > whisker_high(series)]
- below = series[series < whisker_low(series)]
- return above.tolist() + below.tolist()
-
- operators: Dict[str, Callable[[Any], Any]] = {
- "mean": np.mean,
- "median": np.median,
- "max": whisker_high,
- "min": whisker_low,
- "q1": quartile1,
- "q3": quartile3,
- "count": np.ma.count,
- "outliers": outliers,
- }
- aggregates: Dict[str, Dict[str, Union[str, Callable[..., Any]]]] = {
- f"{metric}__{operator_name}": {"column": metric, "operator": operator}
- for operator_name, operator in operators.items()
- for metric in metrics
- }
- return aggregate(df, groupby=groupby, aggregates=aggregates)
-
-
-@validate_column_args("groupby_columns")
-def resample( # pylint: disable=too-many-arguments
- df: DataFrame,
- rule: str,
- method: str,
- time_column: str,
- groupby_columns: Optional[Tuple[Optional[str], ...]] = None,
- fill_value: Optional[Union[float, int]] = None,
-) -> DataFrame:
- """
- Resample a DataFrame on a time column, with support for upsampling.
-
- :param df: DataFrame to resample.
- :param rule: The offset string representing target conversion.
- :param method: Method used to fill missing values after resampling.
- :param time_column: Name of the temporal column to resample on.
- :param groupby_columns: Columns, other than `time_column`, to group by.
- :param fill_value: Value used to fill missing values when `method` is `asfreq`.
- :return: DataFrame after resample
- :raises QueryObjectValidationError: If the request is incorrect
- """
-
- def _upsampling(_df: DataFrame) -> DataFrame:
- _df = _df.set_index(time_column)
- if method == "asfreq" and fill_value is not None:
- return _df.resample(rule).asfreq(fill_value=fill_value)
- return getattr(_df.resample(rule), method)()
-
- if groupby_columns:
- df = (
- df.set_index(keys=list(groupby_columns))
- .groupby(by=list(groupby_columns))
- .apply(_upsampling)
- )
- df = df.reset_index().set_index(time_column).sort_index()
- else:
- df = _upsampling(df)
- return df.reset_index()
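An upsampling sketch for resample (hypothetical data, not part of this commit):

    import pandas as pd
    from superset.utils.pandas_postprocessing import resample

    df = pd.DataFrame({
        "__timestamp": pd.to_datetime(["2022-01-01", "2022-01-03"]),
        "y": [1.0, 3.0],
    })
    # Upsample to daily frequency, filling the missing day with 0.
    df_daily = resample(
        df, rule="1D", method="asfreq", time_column="__timestamp", fill_value=0
    )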
diff --git a/superset/utils/pandas_postprocessing/__init__.py b/superset/utils/pandas_postprocessing/__init__.py
new file mode 100644
index 0000000..976e629
--- /dev/null
+++ b/superset/utils/pandas_postprocessing/__init__.py
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from superset.utils.pandas_postprocessing.aggregate import aggregate
+from superset.utils.pandas_postprocessing.boxplot import boxplot
+from superset.utils.pandas_postprocessing.compare import compare
+from superset.utils.pandas_postprocessing.contribution import contribution
+from superset.utils.pandas_postprocessing.cum import cum
+from superset.utils.pandas_postprocessing.diff import diff
+from superset.utils.pandas_postprocessing.geography import (
+ geodetic_parse,
+ geohash_decode,
+ geohash_encode,
+)
+from superset.utils.pandas_postprocessing.pivot import pivot
+from superset.utils.pandas_postprocessing.prophet import prophet
+from superset.utils.pandas_postprocessing.resample import resample
+from superset.utils.pandas_postprocessing.rolling import rolling
+from superset.utils.pandas_postprocessing.select import select
+from superset.utils.pandas_postprocessing.sort import sort
+from superset.utils.pandas_postprocessing.utils import _flatten_column_after_pivot
+
+__all__ = [
+ "aggregate",
+ "boxplot",
+ "compare",
+ "contribution",
+ "cum",
+ "diff",
+ "geohash_encode",
+ "geohash_decode",
+ "geodetic_parse",
+ "pivot",
+ "prophet",
+ "resample",
+ "rolling",
+ "select",
+ "sort",
+ "_flatten_column_after_pivot",
+]
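Because the new package re-exports the same public names the old module defined, downstream call sites should be import-compatible; a hypothetical sketch (not part of this commit):

    # Same import path before and after the refactor:
    from superset.utils.pandas_postprocessing import aggregate, pivot, rolling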
diff --git a/superset/utils/pandas_postprocessing/aggregate.py b/superset/utils/pandas_postprocessing/aggregate.py
new file mode 100644
index 0000000..2d6d396
--- /dev/null
+++ b/superset/utils/pandas_postprocessing/aggregate.py
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Any, Dict, List
+
+from pandas import DataFrame
+
+from superset.utils.pandas_postprocessing.utils import (
+ _get_aggregate_funcs,
+ validate_column_args,
+)
+
+
+@validate_column_args("groupby")
+def aggregate(
+ df: DataFrame, groupby: List[str], aggregates: Dict[str, Dict[str, Any]]
+) -> DataFrame:
+ """
+ Apply aggregations to a DataFrame.
+
+ :param df: Object to aggregate.
+ :param groupby: columns to group by
+ :param aggregates: A mapping from metric column to the function used to
+ aggregate values.
+ :raises QueryObjectValidationError: If the request is incorrect
+ """
+ aggregates = aggregates or {}
+ aggregate_funcs = _get_aggregate_funcs(df, aggregates)
+ if groupby:
+ df_groupby = df.groupby(by=groupby)
+ else:
+ df_groupby = df.groupby(lambda _: True)
+ return df_groupby.agg(**aggregate_funcs).reset_index(drop=not groupby)
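A usage sketch for aggregate (hypothetical columns, not part of this commit):

    import pandas as pd
    from superset.utils.pandas_postprocessing import aggregate

    df = pd.DataFrame({"category": ["a", "a", "b"], "y": [1.0, 2.0, 3.0]})
    # One row per category with a new `total` metric column.
    agg_df = aggregate(
        df,
        groupby=["category"],
        aggregates={"total": {"column": "y", "operator": "sum"}},
    )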
diff --git a/superset/utils/pandas_postprocessing/boxplot.py b/superset/utils/pandas_postprocessing/boxplot.py
new file mode 100644
index 0000000..9887507
--- /dev/null
+++ b/superset/utils/pandas_postprocessing/boxplot.py
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Any, Callable, Dict, List, Optional, Tuple, Union
+
+import numpy as np
+from flask_babel import gettext as _
+from pandas import DataFrame, Series
+
+from superset.exceptions import QueryObjectValidationError
+from superset.utils.core import PostProcessingBoxplotWhiskerType
+from superset.utils.pandas_postprocessing.aggregate import aggregate
+
+
+def boxplot(
+ df: DataFrame,
+ groupby: List[str],
+ metrics: List[str],
+ whisker_type: PostProcessingBoxplotWhiskerType,
+ percentiles: Optional[
+ Union[List[Union[int, float]], Tuple[Union[int, float], Union[int, float]]]
+ ] = None,
+) -> DataFrame:
+ """
+ Calculate boxplot statistics. For each metric, the operation creates eight
+ new columns with the column name suffixed with the following values:
+
+ - `__mean`: the mean
+ - `__median`: the median
+ - `__max`: the maximum value excluding outliers (see whisker type)
+ - `__min`: the minimum value excluding outliers (see whisker type)
+ - `__q1`: the first quartile (25th percentile)
+ - `__q3`: the third quartile (75th percentile)
+ - `__count`: count of observations
+ - `__outliers`: the values that fall outside the minimum/maximum value
+ (see whisker type)
+
+ :param df: DataFrame containing all-numeric data (temporal column ignored)
+ :param groupby: The categories to group by (x-axis)
+ :param metrics: The metrics for which to calculate the distribution
+ :param whisker_type: The confidence level type
+ :return: DataFrame with boxplot statistics per groupby
+ """
+
+ def quartile1(series: Series) -> float:
+ return np.nanpercentile(series, 25, interpolation="midpoint")
+
+ def quartile3(series: Series) -> float:
+ return np.nanpercentile(series, 75, interpolation="midpoint")
+
+ if whisker_type == PostProcessingBoxplotWhiskerType.TUKEY:
+
+ def whisker_high(series: Series) -> float:
+ upper_outer_lim = quartile3(series) + 1.5 * (
+ quartile3(series) - quartile1(series)
+ )
+ return series[series <= upper_outer_lim].max()
+
+ def whisker_low(series: Series) -> float:
+ lower_outer_lim = quartile1(series) - 1.5 * (
+ quartile3(series) - quartile1(series)
+ )
+ return series[series >= lower_outer_lim].min()
+
+ elif whisker_type == PostProcessingBoxplotWhiskerType.PERCENTILE:
+ if (
+ not isinstance(percentiles, (list, tuple))
+ or len(percentiles) != 2
+ or not isinstance(percentiles[0], (int, float))
+ or not isinstance(percentiles[1], (int, float))
+ or percentiles[0] >= percentiles[1]
+ ):
+ raise QueryObjectValidationError(
+ _(
+ "percentiles must be a list or tuple with two numeric values, "
+ "of which the first is lower than the second value"
+ )
+ )
+ low, high = percentiles[0], percentiles[1]
+
+ def whisker_high(series: Series) -> float:
+ return np.nanpercentile(series, high)
+
+ def whisker_low(series: Series) -> float:
+ return np.nanpercentile(series, low)
+
+ else:
+ whisker_high = np.max
+ whisker_low = np.min
+
+ def outliers(series: Series) -> List[float]:
+ above = series[series > whisker_high(series)]
+ below = series[series < whisker_low(series)]
+ return above.tolist() + below.tolist()
+
+ operators: Dict[str, Callable[[Any], Any]] = {
+ "mean": np.mean,
+ "median": np.median,
+ "max": whisker_high,
+ "min": whisker_low,
+ "q1": quartile1,
+ "q3": quartile3,
+ "count": np.ma.count,
+ "outliers": outliers,
+ }
+ aggregates: Dict[str, Dict[str, Union[str, Callable[..., Any]]]] = {
+ f"{metric}__{operator_name}": {"column": metric, "operator": operator}
+ for operator_name, operator in operators.items()
+ for metric in metrics
+ }
+ return aggregate(df, groupby=groupby, aggregates=aggregates)
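A usage sketch for boxplot with Tukey whiskers (hypothetical data, not part of this commit):

    import pandas as pd
    from superset.utils.core import PostProcessingBoxplotWhiskerType
    from superset.utils.pandas_postprocessing import boxplot

    df = pd.DataFrame({
        "region": ["x"] * 5 + ["y"] * 5,
        "cars": [1, 2, 3, 4, 5, 2, 4, 6, 8, 10],
    })
    # One row per region with cars__mean, cars__median, cars__q1, cars__q3,
    # cars__min, cars__max, cars__count and cars__outliers columns.
    stats = boxplot(
        df,
        groupby=["region"],
        metrics=["cars"],
        whisker_type=PostProcessingBoxplotWhiskerType.TUKEY,
    )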
diff --git a/superset/utils/pandas_postprocessing/compare.py b/superset/utils/pandas_postprocessing/compare.py
new file mode 100644
index 0000000..67f275e
--- /dev/null
+++ b/superset/utils/pandas_postprocessing/compare.py
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import List, Optional
+
+import pandas as pd
+from flask_babel import gettext as _
+from pandas import DataFrame
+
+from superset.constants import PandasPostprocessingCompare
+from superset.exceptions import QueryObjectValidationError
+from superset.utils.core import TIME_COMPARISION
+from superset.utils.pandas_postprocessing.utils import validate_column_args
+
+
+@validate_column_args("source_columns", "compare_columns")
+def compare( # pylint: disable=too-many-arguments
+ df: DataFrame,
+ source_columns: List[str],
+ compare_columns: List[str],
+ compare_type: Optional[PandasPostprocessingCompare],
+ drop_original_columns: Optional[bool] = False,
+ precision: Optional[int] = 4,
+) -> DataFrame:
+ """
+ Calculate column-by-column changes for selected columns.
+
+ :param df: DataFrame on which the compare will be based.
+ :param source_columns: Main query columns
+ :param compare_columns: Columns being compared
+ :param compare_type: Type of compare. Choice of `absolute`, `percentage` or `ratio`
+ :param drop_original_columns: Whether to remove the source columns and
+ compare columns.
+ :param precision: Round a change rate to a variable number of decimal places.
+ :return: DataFrame with compared columns.
+ :raises QueryObjectValidationError: If the request is incorrect.
+ """
+ if len(source_columns) != len(compare_columns):
+ raise QueryObjectValidationError(
+ _("`compare_columns` must have the same length as `source_columns`.")
+ )
+ if compare_type not in tuple(PandasPostprocessingCompare):
+ raise QueryObjectValidationError(
+ _("`compare_type` must be `difference`, `percentage` or `ratio`")
+ )
+ if len(source_columns) == 0:
+ return df
+
+ for s_col, c_col in zip(source_columns, compare_columns):
+ if compare_type == PandasPostprocessingCompare.DIFF:
+ diff_series = df[s_col] - df[c_col]
+ elif compare_type == PandasPostprocessingCompare.PCT:
+ diff_series = (
+ ((df[s_col] - df[c_col]) / df[c_col]).astype(float).round(precision)
+ )
+ else:
+ # compare_type == "ratio"
+ diff_series = (df[s_col] / df[c_col]).astype(float).round(precision)
+ diff_df = diff_series.to_frame(
+ name=TIME_COMPARISION.join([compare_type, s_col, c_col])
+ )
+ df = pd.concat([df, diff_df], axis=1)
+
+ if drop_original_columns:
+ df = df.drop(source_columns + compare_columns, axis=1)
+ return df
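A usage sketch for compare (hypothetical columns, not part of this commit):

    import pandas as pd
    from superset.constants import PandasPostprocessingCompare
    from superset.utils.pandas_postprocessing import compare

    df = pd.DataFrame({"this_week": [10.0, 20.0], "last_week": [8.0, 25.0]})
    # Appends a difference column computed as this_week - last_week.
    compared = compare(
        df,
        source_columns=["this_week"],
        compare_columns=["last_week"],
        compare_type=PandasPostprocessingCompare.DIFF,
    )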
diff --git a/superset/utils/pandas_postprocessing/contribution.py b/superset/utils/pandas_postprocessing/contribution.py
new file mode 100644
index 0000000..d813961
--- /dev/null
+++ b/superset/utils/pandas_postprocessing/contribution.py
@@ -0,0 +1,75 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from decimal import Decimal
+from typing import List, Optional
+
+from flask_babel import gettext as _
+from pandas import DataFrame
+
+from superset.exceptions import QueryObjectValidationError
+from superset.utils.core import PostProcessingContributionOrientation
+from superset.utils.pandas_postprocessing.utils import validate_column_args
+
+
+@validate_column_args("columns")
+def contribution(
+ df: DataFrame,
+ orientation: Optional[
+ PostProcessingContributionOrientation
+ ] = PostProcessingContributionOrientation.COLUMN,
+ columns: Optional[List[str]] = None,
+ rename_columns: Optional[List[str]] = None,
+) -> DataFrame:
+ """
+ Calculate cell contribution to row/column total for numeric columns.
+ Non-numeric columns will be kept untouched.
+
+ If `columns` are specified, only calculate contributions on selected columns.
+
+ :param df: DataFrame containing all-numeric data (temporal column ignored)
+ :param columns: Columns to calculate values from.
+ :param rename_columns: The new labels for the calculated contribution columns.
+ The original columns will not be removed.
+ :param orientation: calculate by dividing each cell by its row or column total
+ :return: DataFrame with contributions.
+ """
+ contribution_df = df.copy()
+ numeric_df = contribution_df.select_dtypes(include=["number", Decimal])
+ # verify column selections
+ if columns:
+ numeric_columns = numeric_df.columns.tolist()
+ for col in columns:
+ if col not in numeric_columns:
+ raise QueryObjectValidationError(
+ _(
+ 'Column "%(column)s" is not numeric or does not '
+ "exists in the query results.",
+ column=col,
+ )
+ )
+ columns = columns or numeric_df.columns
+ rename_columns = rename_columns or columns
+ if len(rename_columns) != len(columns):
+ raise QueryObjectValidationError(
+ _("`rename_columns` must have the same length as `columns`.")
+ )
+ # limit to selected columns
+ numeric_df = numeric_df[columns]
+ axis = 0 if orientation == PostProcessingContributionOrientation.COLUMN else 1
+ numeric_df = numeric_df / numeric_df.values.sum(axis=axis, keepdims=True)
+ contribution_df[rename_columns] = numeric_df
+ return contribution_df
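A usage sketch for contribution (hypothetical data, not part of this commit):

    import pandas as pd
    from superset.utils.pandas_postprocessing import contribution

    df = pd.DataFrame({"a": [1.0, 3.0], "b": [1.0, 9.0]})
    # Default column orientation: a -> [0.25, 0.75], b -> [0.1, 0.9].
    pct_df = contribution(df)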
diff --git a/superset/utils/pandas_postprocessing/cum.py b/superset/utils/pandas_postprocessing/cum.py
new file mode 100644
index 0000000..c142b36
--- /dev/null
+++ b/superset/utils/pandas_postprocessing/cum.py
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Any, Dict, Optional
+
+from flask_babel import gettext as _
+from pandas import DataFrame
+
+from superset.exceptions import QueryObjectValidationError
+from superset.utils.pandas_postprocessing.utils import (
+ _append_columns,
+ _flatten_column_after_pivot,
+ ALLOWLIST_CUMULATIVE_FUNCTIONS,
+ validate_column_args,
+)
+
+
+@validate_column_args("columns")
+def cum(
+ df: DataFrame,
+ operator: str,
+ columns: Optional[Dict[str, str]] = None,
+ is_pivot_df: bool = False,
+) -> DataFrame:
+ """
+ Calculate cumulative sum/product/min/max for selected columns.
+
+ :param df: DataFrame on which the cumulative operation will be based.
+ :param columns: columns on which to perform a cumulative operation, mapping source
+ column to target column. For instance, `{'y': 'y'}` will replace the column
+ `y` with the cumulative value in `y`, while `{'y': 'y2'}` will add a column
+ `y2` based on cumulative values calculated from `y`, leaving the original
+ column `y` unchanged.
+ :param operator: cumulative operator, e.g. `sum`, `prod`, `min`, `max`
+ :param is_pivot_df: Whether the DataFrame is pivoted
+ :return: DataFrame with cumulated columns
+ """
+ columns = columns or {}
+ if is_pivot_df:
+ df_cum = df
+ else:
+ df_cum = df[columns.keys()]
+ operation = "cum" + operator
+ if operation not in ALLOWLIST_CUMULATIVE_FUNCTIONS or not hasattr(
+ df_cum, operation
+ ):
+ raise QueryObjectValidationError(
+ _("Invalid cumulative operator: %(operator)s", operator=operator)
+ )
+ if is_pivot_df:
+ df_cum = getattr(df_cum, operation)()
+ agg_in_pivot_df = df.columns.get_level_values(0).drop_duplicates().to_list()
+ agg: Dict[str, Dict[str, Any]] = {col: {} for col in agg_in_pivot_df}
+ df_cum.columns = [
+ _flatten_column_after_pivot(col, agg) for col in df_cum.columns
+ ]
+ df_cum.reset_index(level=0, inplace=True)
+ else:
+ df_cum = _append_columns(df, getattr(df_cum, operation)(), columns)
+ return df_cum
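A usage sketch for cum (hypothetical columns, not part of this commit):

    import pandas as pd
    from superset.utils.pandas_postprocessing import cum

    df = pd.DataFrame({"y": [1.0, 2.0, 3.0]})
    # Running total written to a new `y_cum` column; `y` stays untouched.
    df_cum = cum(df, operator="sum", columns={"y": "y_cum"})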
diff --git a/superset/utils/pandas_postprocessing/diff.py b/superset/utils/pandas_postprocessing/diff.py
new file mode 100644
index 0000000..fd7c83b
--- /dev/null
+++ b/superset/utils/pandas_postprocessing/diff.py
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Dict
+
+from pandas import DataFrame
+
+from superset.constants import PandasAxis
+from superset.utils.pandas_postprocessing.utils import (
+ _append_columns,
+ validate_column_args,
+)
+
+
+@validate_column_args("columns")
+def diff(
+ df: DataFrame,
+ columns: Dict[str, str],
+ periods: int = 1,
+ axis: PandasAxis = PandasAxis.ROW,
+) -> DataFrame:
+ """
+ Calculate row-by-row or column-by-column difference for selected columns.
+
+ :param df: DataFrame on which the diff will be based.
+ :param columns: columns on which to perform diff, mapping source column to
+ target column. For instance, `{'y': 'y'}` will replace the column `y` with
+ the diff value in `y`, while `{'y': 'y2'}` will add a column `y2` based
+ on diff values calculated from `y`, leaving the original column `y`
+ unchanged.
+ :param periods: periods to shift for calculating difference.
+ :param axis: 0 for row, 1 for column. default 0.
+ :return: DataFrame with diffed columns
+ :raises QueryObjectValidationError: If the request is incorrect
+ """
+ df_diff = df[columns.keys()]
+ df_diff = df_diff.diff(periods=periods, axis=axis)
+ return _append_columns(df, df_diff, columns)
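A similar sketch for `diff`, under the same re-export assumption; the default
axis computes row-wise differences:

    import pandas as pd
    from superset.utils.pandas_postprocessing import diff

    df = pd.DataFrame({"y": [1, 4, 9]})
    # replace `y` in place with its first-order difference
    print(diff(df, columns={"y": "y"}, periods=1))
    #      y
    # 0  NaN
    # 1  3.0
    # 2  5.0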
diff --git a/superset/utils/pandas_postprocessing/geography.py b/superset/utils/pandas_postprocessing/geography.py
new file mode 100644
index 0000000..a1aae59
--- /dev/null
+++ b/superset/utils/pandas_postprocessing/geography.py
@@ -0,0 +1,114 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional, Tuple
+
+import geohash as geohash_lib
+from flask_babel import gettext as _
+from geopy.point import Point
+from pandas import DataFrame
+
+from superset.exceptions import QueryObjectValidationError
+from superset.utils.pandas_postprocessing.utils import _append_columns
+
+
+def geohash_decode(
+ df: DataFrame, geohash: str, longitude: str, latitude: str
+) -> DataFrame:
+ """
+ Decode a geohash column into longitude and latitude
+
+ :param df: DataFrame containing geohash data
+ :param geohash: Name of source column containing geohash location.
+ :param longitude: Name of new column to be created containing longitude.
+ :param latitude: Name of new column to be created containing latitude.
+ :return: DataFrame with decoded longitudes and latitudes
+ """
+ try:
+ lonlat_df = DataFrame()
+ lonlat_df["latitude"], lonlat_df["longitude"] = zip(
+ *df[geohash].apply(geohash_lib.decode)
+ )
+ return _append_columns(
+ df, lonlat_df, {"latitude": latitude, "longitude": longitude}
+ )
+ except ValueError as ex:
+ raise QueryObjectValidationError(_("Invalid geohash string")) from ex
+
+
+def geohash_encode(
+ df: DataFrame, geohash: str, longitude: str, latitude: str,
+) -> DataFrame:
+ """
+ Encode longitude and latitude into geohash
+
+ :param df: DataFrame containing longitude and latitude data
+ :param geohash: Name of new column to be created containing geohash location.
+ :param longitude: Name of source column containing longitude.
+ :param latitude: Name of source column containing latitude.
+ :return: DataFrame with encoded geohashes
+ """
+ try:
+ encode_df = df[[latitude, longitude]]
+ encode_df.columns = ["latitude", "longitude"]
+ encode_df["geohash"] = encode_df.apply(
+ lambda row: geohash_lib.encode(row["latitude"], row["longitude"]), axis=1,
+ )
+ return _append_columns(df, encode_df, {"geohash": geohash})
+ except ValueError as ex:
+ raise QueryObjectValidationError(_("Invalid longitude/latitude")) from ex
+
+
+def geodetic_parse(
+ df: DataFrame,
+ geodetic: str,
+ longitude: str,
+ latitude: str,
+ altitude: Optional[str] = None,
+) -> DataFrame:
+ """
+ Parse a column containing a geodetic point string
+ [Geopy](https://geopy.readthedocs.io/en/stable/#geopy.point.Point).
+
+ :param df: DataFrame containing geodetic point data
+ :param geodetic: Name of source column containing geodetic point string.
+ :param longitude: Name of new column to be created containing longitude.
+ :param latitude: Name of new column to be created containing latitude.
+ :param altitude: Name of new column to be created containing altitude.
+ :return: DataFrame with decoded longitudes, latitudes and altitudes
+ """
+
+ def _parse_location(location: str) -> Tuple[float, float, float]:
+ """
+ Parse a string containing a geodetic point and return latitude, longitude
+ and altitude
+ """
+ point = Point(location)
+ return point[0], point[1], point[2]
+
+ try:
+ geodetic_df = DataFrame()
+ (
+ geodetic_df["latitude"],
+ geodetic_df["longitude"],
+ geodetic_df["altitude"],
+ ) = zip(*df[geodetic].apply(_parse_location))
+ columns = {"latitude": latitude, "longitude": longitude}
+ if altitude:
+ columns["altitude"] = altitude
+ return _append_columns(df, geodetic_df, columns)
+ except ValueError as ex:
+ raise QueryObjectValidationError(_("Invalid geodetic string")) from ex
diff --git a/superset/utils/pandas_postprocessing/pivot.py b/superset/utils/pandas_postprocessing/pivot.py
new file mode 100644
index 0000000..b9d70e9
--- /dev/null
+++ b/superset/utils/pandas_postprocessing/pivot.py
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Any, Dict, List, Optional
+
+from flask_babel import gettext as _
+from pandas import DataFrame
+
+from superset.constants import NULL_STRING, PandasAxis
+from superset.exceptions import QueryObjectValidationError
+from superset.utils.pandas_postprocessing.utils import (
+ _flatten_column_after_pivot,
+ _get_aggregate_funcs,
+ validate_column_args,
+)
+
+
+@validate_column_args("index", "columns")
+def pivot( # pylint: disable=too-many-arguments,too-many-locals
+ df: DataFrame,
+ index: List[str],
+ aggregates: Dict[str, Dict[str, Any]],
+ columns: Optional[List[str]] = None,
+ metric_fill_value: Optional[Any] = None,
+ column_fill_value: Optional[str] = NULL_STRING,
+ drop_missing_columns: Optional[bool] = True,
+ combine_value_with_metric: bool = False,
+ marginal_distributions: Optional[bool] = None,
+ marginal_distribution_name: Optional[str] = None,
+ flatten_columns: bool = True,
+ reset_index: bool = True,
+) -> DataFrame:
+ """
+ Perform a pivot operation on a DataFrame.
+
+ :param df: Object on which pivot operation will be performed
+ :param index: Columns to group by on the table index (=rows)
+ :param columns: Columns to group by on the table columns
+ :param metric_fill_value: Value to replace missing values with
+ :param column_fill_value: Value to replace missing pivot columns with. By default
+ replaces missing values with "<NULL>". Set to `None` to remove columns
+ with missing values.
+ :param drop_missing_columns: Do not include columns whose entries are all missing
+ :param combine_value_with_metric: Display metrics side by side within each column,
+ as opposed to each column being displayed side by side for each metric.
+ :param aggregates: A mapping from aggregate column name to the aggregate
+ config.
+ :param marginal_distributions: Add totals for row/column. Defaults to False.
+ :param marginal_distribution_name: Name of row/column with marginal distribution.
+ Defaults to 'All'.
+ :param flatten_columns: Convert column names to strings
+ :param reset_index: Convert index to column
+ :return: A pivot table
+ :raises QueryObjectValidationError: If the request is incorrect
+ """
+ if not index:
+ raise QueryObjectValidationError(
+ _("Pivot operation requires at least one index")
+ )
+ if not aggregates:
+ raise QueryObjectValidationError(
+ _("Pivot operation must include at least one aggregate")
+ )
+
+ if columns and column_fill_value:
+ df[columns] = df[columns].fillna(value=column_fill_value)
+
+ aggregate_funcs = _get_aggregate_funcs(df, aggregates)
+
+ # TODO (villebro): Pandas 1.0.3 doesn't yet support NamedAgg in pivot_table.
+ # Remove once/if support is added.
+ aggfunc = {na.column: na.aggfunc for na in aggregate_funcs.values()}
+
+ # When dropna = False, the pivot_table function will calculate cartesian-product
+ # for MultiIndex.
+ # https://github.com/apache/superset/issues/15956
+ # https://github.com/pandas-dev/pandas/issues/18030
+ series_set = set()
+ if not drop_missing_columns and columns:
+ for row in df[columns].itertuples():
+ for metric in aggfunc.keys():
+ series_set.add(str(tuple([metric]) + tuple(row[1:])))
+
+ df = df.pivot_table(
+ values=aggfunc.keys(),
+ index=index,
+ columns=columns,
+ aggfunc=aggfunc,
+ fill_value=metric_fill_value,
+ dropna=drop_missing_columns,
+ margins=marginal_distributions,
+ margins_name=marginal_distribution_name,
+ )
+
+ if not drop_missing_columns and len(series_set) > 0 and not df.empty:
+ for col in df.columns:
+ series = str(col)
+ if series not in series_set:
+ df = df.drop(col, axis=PandasAxis.COLUMN)
+
+ if combine_value_with_metric:
+ df = df.stack(0).unstack()
+
+ # flatten column names into single strings
+ if flatten_columns:
+ df.columns = [
+ _flatten_column_after_pivot(col, aggregates) for col in df.columns
+ ]
+ # return index as regular column
+ if reset_index:
+ df.reset_index(level=0, inplace=True)
+ return df
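A sketch of a simple pivot with one index column, one grouping column and a
single `sum` aggregate, showing how `_flatten_column_after_pivot` drops the
aggregate name from the flattened columns (data is hypothetical):

    import pandas as pd
    from superset.utils.pandas_postprocessing import pivot

    df = pd.DataFrame(
        {
            "country": ["US", "US", "DE"],
            "gender": ["m", "f", "f"],
            "sales": [10, 20, 30],
        }
    )
    pivoted = pivot(
        df,
        index=["country"],
        columns=["gender"],
        aggregates={"sales": {"operator": "sum"}},
    )
    # -> columns country, f, m; the single aggregate name "sales" is dropped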
diff --git a/superset/utils/pandas_postprocessing/prophet.py b/superset/utils/pandas_postprocessing/prophet.py
new file mode 100644
index 0000000..3ade7f6
--- /dev/null
+++ b/superset/utils/pandas_postprocessing/prophet.py
@@ -0,0 +1,157 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+from typing import Optional, Union
+
+from flask_babel import gettext as _
+from pandas import DataFrame
+
+from superset.exceptions import QueryObjectValidationError
+from superset.utils.core import DTTM_ALIAS
+from superset.utils.pandas_postprocessing.utils import PROPHET_TIME_GRAIN_MAP
+
+
+def _prophet_parse_seasonality(
+ input_value: Optional[Union[bool, int]]
+) -> Union[bool, str, int]:
+ if input_value is None:
+ return "auto"
+ if isinstance(input_value, bool):
+ return input_value
+ try:
+ return int(input_value)
+ except ValueError:
+ return input_value
+
+
+def _prophet_fit_and_predict( # pylint: disable=too-many-arguments
+ df: DataFrame,
+ confidence_interval: float,
+ yearly_seasonality: Union[bool, str, int],
+ weekly_seasonality: Union[bool, str, int],
+ daily_seasonality: Union[bool, str, int],
+ periods: int,
+ freq: str,
+) -> DataFrame:
+ """
+ Fit a prophet model and return a DataFrame with predicted results.
+ """
+ try:
+ # silence the prophet plot logger while importing, then restore it
+ prophet_logger = logging.getLogger("prophet.plot")
+ prophet_logger.setLevel(logging.CRITICAL)
+ # pylint: disable=import-error,import-outside-toplevel
+ from prophet import Prophet
+
+ prophet_logger.setLevel(logging.NOTSET)
+ except ModuleNotFoundError as ex:
+ raise QueryObjectValidationError(_("`prophet` package not installed")) from ex
+ model = Prophet(
+ interval_width=confidence_interval,
+ yearly_seasonality=yearly_seasonality,
+ weekly_seasonality=weekly_seasonality,
+ daily_seasonality=daily_seasonality,
+ )
+ if df["ds"].dt.tz:
+ df["ds"] = df["ds"].dt.tz_convert(None)
+ model.fit(df)
+ future = model.make_future_dataframe(periods=periods, freq=freq)
+ forecast = model.predict(future)[["ds", "yhat", "yhat_lower", "yhat_upper"]]
+ return forecast.join(df.set_index("ds"), on="ds").set_index(["ds"])
+
+
+def prophet( # pylint: disable=too-many-arguments
+ df: DataFrame,
+ time_grain: str,
+ periods: int,
+ confidence_interval: float,
+ yearly_seasonality: Optional[Union[bool, int]] = None,
+ weekly_seasonality: Optional[Union[bool, int]] = None,
+ daily_seasonality: Optional[Union[bool, int]] = None,
+ index: Optional[str] = None,
+) -> DataFrame:
+ """
+ Add forecasts to each series in a timeseries dataframe, along with confidence
+ intervals for the prediction. For each series, the operation creates three
+ new columns with the column name suffixed with the following values:
+
+ - `__yhat`: the forecast for the given date
+ - `__yhat_lower`: the lower bound of the forecast for the given date
+ - `__yhat_upper`: the upper bound of the forecast for the given date
+
+
+ :param df: DataFrame containing all-numeric data (temporal column ignored)
+ :param time_grain: Time grain used to specify time period increments in prediction
+ :param periods: Time periods (in units of `time_grain`) to predict into the future
+ :param confidence_interval: Width of predicted confidence interval
+ :param yearly_seasonality: Should yearly seasonality be applied.
+ An integer value will specify Fourier order of seasonality, `None` will
+ automatically detect seasonality.
+ :param weekly_seasonality: Should weekly seasonality be applied.
+ An integer value will specify Fourier order of seasonality, `None` will
+ automatically detect seasonality.
+ :param daily_seasonality: Should daily seasonality be applied.
+ An integer value will specify Fourier order of seasonality, `None` will
+ automatically detect seasonality.
+ :param index: the name of the column containing the x-axis data
+ :return: DataFrame with forecasts, with temporal column at beginning if present
+ """
+ index = index or DTTM_ALIAS
+ # validate inputs
+ if not time_grain:
+ raise QueryObjectValidationError(_("Time grain missing"))
+ if time_grain not in PROPHET_TIME_GRAIN_MAP:
+ raise QueryObjectValidationError(
+ _("Unsupported time grain: %(time_grain)s", time_grain=time_grain,)
+ )
+ freq = PROPHET_TIME_GRAIN_MAP[time_grain]
+ # check type at runtime because the marshmallow schema is unable to handle
+ # union types
+ if not isinstance(periods, int) or periods < 0:
+ raise QueryObjectValidationError(_("Periods must be a whole number"))
+ if not confidence_interval or confidence_interval <= 0 or confidence_interval >= 1:
+ raise QueryObjectValidationError(
+ _("Confidence interval must be between 0 and 1 (exclusive)")
+ )
+ if index not in df.columns:
+ raise QueryObjectValidationError(_("DataFrame must include temporal column"))
+ if len(df.columns) < 2:
+ raise QueryObjectValidationError(_("DataFrame include at least one series"))
+
+ target_df = DataFrame()
+ for column in [column for column in df.columns if column != index]:
+ fit_df = _prophet_fit_and_predict(
+ df=df[[index, column]].rename(columns={index: "ds", column: "y"}),
+ confidence_interval=confidence_interval,
+ yearly_seasonality=_prophet_parse_seasonality(yearly_seasonality),
+ weekly_seasonality=_prophet_parse_seasonality(weekly_seasonality),
+ daily_seasonality=_prophet_parse_seasonality(daily_seasonality),
+ periods=periods,
+ freq=freq,
+ )
+ new_columns = [
+ f"{column}__yhat",
+ f"{column}__yhat_lower",
+ f"{column}__yhat_upper",
+ f"{column}",
+ ]
+ fit_df.columns = new_columns
+ if target_df.empty:
+ target_df = fit_df
+ else:
+ for new_column in new_columns:
+ target_df = target_df.assign(**{new_column: fit_df[new_column]})
+ target_df.reset_index(level=0, inplace=True)
+ return target_df.rename(columns={"ds": index})
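A sketch of the forecast operator (requires the optional `prophet` package;
`__timestamp` is the value of Superset's `DTTM_ALIAS`, used as the default
temporal index; the series is hypothetical):

    import pandas as pd
    from superset.utils.pandas_postprocessing import prophet

    df = pd.DataFrame(
        {
            "__timestamp": pd.date_range("2021-01-01", periods=30, freq="D"),
            "sales": range(30),
        }
    )
    # forecast 7 days ahead; adds sales__yhat, sales__yhat_lower, sales__yhat_upper
    forecast = prophet(df=df, time_grain="P1D", periods=7, confidence_interval=0.8)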
diff --git a/superset/utils/pandas_postprocessing/resample.py b/superset/utils/pandas_postprocessing/resample.py
new file mode 100644
index 0000000..54e67ac
--- /dev/null
+++ b/superset/utils/pandas_postprocessing/resample.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional, Tuple, Union
+
+from pandas import DataFrame
+
+from superset.utils.pandas_postprocessing.utils import validate_column_args
+
+
+@validate_column_args("groupby_columns")
+def resample( # pylint: disable=too-many-arguments
+ df: DataFrame,
+ rule: str,
+ method: str,
+ time_column: str,
+ groupby_columns: Optional[Tuple[Optional[str], ...]] = None,
+ fill_value: Optional[Union[float, int]] = None,
+) -> DataFrame:
+ """
+ Resample a DataFrame, with support for upsampling.
+
+ :param df: DataFrame to resample.
+ :param rule: The offset string representing target conversion.
+ :param method: method used to fill NaN values after resampling (e.g. `ffill`,
+ `bfill` or `asfreq`).
+ :param time_column: name of the temporal column in the DataFrame.
+ :param groupby_columns: columns to group by, excluding the time column.
+ :param fill_value: value used to fill missing values after resampling.
+ :return: DataFrame after resample
+ :raises QueryObjectValidationError: If the request is incorrect
+ """
+
+ def _upsampling(_df: DataFrame) -> DataFrame:
+ _df = _df.set_index(time_column)
+ if method == "asfreq" and fill_value is not None:
+ return _df.resample(rule).asfreq(fill_value=fill_value)
+ return getattr(_df.resample(rule), method)()
+
+ if groupby_columns:
+ df = (
+ df.set_index(keys=list(groupby_columns))
+ .groupby(by=list(groupby_columns))
+ .apply(_upsampling)
+ )
+ df = df.reset_index().set_index(time_column).sort_index()
+ else:
+ df = _upsampling(df)
+ return df.reset_index()
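An upsampling sketch (same import assumption as the earlier examples): filling
a one-day gap via `asfreq` with an explicit fill value:

    import pandas as pd
    from superset.utils.pandas_postprocessing import resample

    df = pd.DataFrame(
        {"__timestamp": pd.to_datetime(["2022-01-01", "2022-01-03"]), "y": [1, 3]}
    )
    # inserts the missing 2022-01-02 row with y == 0
    print(resample(df, rule="1D", method="asfreq",
                   time_column="__timestamp", fill_value=0))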
diff --git a/superset/utils/pandas_postprocessing/rolling.py b/superset/utils/pandas_postprocessing/rolling.py
new file mode 100644
index 0000000..f93b3da
--- /dev/null
+++ b/superset/utils/pandas_postprocessing/rolling.py
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Any, Dict, Optional, Union
+
+from flask_babel import gettext as _
+from pandas import DataFrame
+
+from superset.exceptions import QueryObjectValidationError
+from superset.utils.pandas_postprocessing.utils import (
+ _append_columns,
+ _flatten_column_after_pivot,
+ DENYLIST_ROLLING_FUNCTIONS,
+ validate_column_args,
+)
+
+
+@validate_column_args("columns")
+def rolling( # pylint: disable=too-many-arguments
+ df: DataFrame,
+ rolling_type: str,
+ columns: Optional[Dict[str, str]] = None,
+ window: Optional[int] = None,
+ rolling_type_options: Optional[Dict[str, Any]] = None,
+ center: bool = False,
+ win_type: Optional[str] = None,
+ min_periods: Optional[int] = None,
+ is_pivot_df: bool = False,
+) -> DataFrame:
+ """
+ Apply a rolling window on the dataset. See the Pandas docs for further details:
+ https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.rolling.html
+
+ :param df: DataFrame on which the rolling period will be based.
+ :param columns: columns on which to perform rolling, mapping source column to
+ target column. For instance, `{'y': 'y'}` will replace the column `y` with
+ the rolling value in `y`, while `{'y': 'y2'}` will add a column `y2` based
+ on rolling values calculated from `y`, leaving the original column `y`
+ unchanged.
+ :param rolling_type: Type of rolling window aggregation, e.g. `mean`, `sum`
+ or `std`; must be one of the supported rolling functions.
+ :param window: Size of the window.
+ :param rolling_type_options: Optional options to pass to rolling method. Needed
+ for e.g. quantile operation.
+ :param center: Should the label be at the center of the window.
+ :param win_type: Type of window function.
+ :param min_periods: The minimum amount of periods required for a row to be included
+ in the result set.
+ :param is_pivot_df: whether the DataFrame is already pivoted
+ :return: DataFrame with the rolling columns
+ :raises QueryObjectValidationError: If the request is incorrect
+ """
+ rolling_type_options = rolling_type_options or {}
+ columns = columns or {}
+ if is_pivot_df:
+ df_rolling = df
+ else:
+ df_rolling = df[columns.keys()]
+ kwargs: Dict[str, Union[str, int]] = {}
+ if window is None:
+ raise QueryObjectValidationError(_("Undefined window for rolling operation"))
+ if window == 0:
+ raise QueryObjectValidationError(_("Window must be > 0"))
+
+ kwargs["window"] = window
+ if min_periods is not None:
+ kwargs["min_periods"] = min_periods
+ if center is not None:
+ kwargs["center"] = center
+ if win_type is not None:
+ kwargs["win_type"] = win_type
+
+ df_rolling = df_rolling.rolling(**kwargs)
+ if rolling_type not in DENYLIST_ROLLING_FUNCTIONS or not hasattr(
+ df_rolling, rolling_type
+ ):
+ raise QueryObjectValidationError(
+ _("Invalid rolling_type: %(type)s", type=rolling_type)
+ )
+ try:
+ df_rolling = getattr(df_rolling, rolling_type)(**rolling_type_options)
+ except TypeError as ex:
+ raise QueryObjectValidationError(
+ _(
+ "Invalid options for %(rolling_type)s: %(options)s",
+ rolling_type=rolling_type,
+ options=rolling_type_options,
+ )
+ ) from ex
+
+ if is_pivot_df:
+ agg_in_pivot_df = df.columns.get_level_values(0).drop_duplicates().to_list()
+ agg: Dict[str, Dict[str, Any]] = {col: {} for col in agg_in_pivot_df}
+ df_rolling.columns = [
+ _flatten_column_after_pivot(col, agg) for col in df_rolling.columns
+ ]
+ df_rolling.reset_index(level=0, inplace=True)
+ else:
+ df_rolling = _append_columns(df, df_rolling, columns)
+
+ if min_periods:
+ df_rolling = df_rolling[min_periods:]
+ return df_rolling
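A sketch of a two-period rolling mean; passing `min_periods=0` keeps the first
row rather than trimming it from the result:

    import pandas as pd
    from superset.utils.pandas_postprocessing import rolling

    df = pd.DataFrame({"y": [1, 2, 3, 4]})
    print(rolling(df, rolling_type="mean", columns={"y": "y_mean"},
                  window=2, min_periods=0))
    #    y  y_mean
    # 0  1     1.0
    # 1  2     1.5
    # 2  3     2.5
    # 3  4     3.5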
diff --git a/superset/utils/pandas_postprocessing/select.py b/superset/utils/pandas_postprocessing/select.py
new file mode 100644
index 0000000..209d502
--- /dev/null
+++ b/superset/utils/pandas_postprocessing/select.py
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Dict, List, Optional
+
+from pandas import DataFrame
+
+from superset.utils.pandas_postprocessing.utils import validate_column_args
+
+
+@validate_column_args("columns", "drop", "rename")
+def select(
+ df: DataFrame,
+ columns: Optional[List[str]] = None,
+ exclude: Optional[List[str]] = None,
+ rename: Optional[Dict[str, str]] = None,
+) -> DataFrame:
+ """
+ Only select a subset of columns in the original dataset. Can be useful for
+ removing unnecessary intermediate results, renaming and reordering columns.
+
+ :param df: DataFrame from which to select columns.
+ :param columns: Columns which to select from the DataFrame, in the desired order.
+ If left undefined, all columns will be selected. If columns are
+ renamed, the original column name should be referenced here.
+ :param exclude: columns to exclude from selection. As exclusion happens before
+ renaming, the original column name should be referenced here.
+ :param rename: columns which to rename, mapping source column to target column.
+ For instance, `{'y': 'y2'}` will rename the column `y` to
+ `y2`.
+ :return: Subset of columns in original DataFrame
+ :raises QueryObjectValidationError: If the request is incorrect
+ """
+ df_select = df.copy(deep=False)
+ if columns:
+ df_select = df_select[columns]
+ if exclude:
+ df_select = df_select.drop(exclude, axis=1)
+ if rename is not None:
+ df_select = df_select.rename(columns=rename)
+ return df_select
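A sketch combining projection and renaming with `select`:

    import pandas as pd
    from superset.utils.pandas_postprocessing import select

    df = pd.DataFrame({"a": [1], "b": [2], "c": [3]})
    # keep `a` and `b` (dropping `c`), then rename `b` to `b2`
    print(select(df, columns=["a", "b"], rename={"b": "b2"}))
    #    a  b2
    # 0  1   2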
diff --git a/superset/utils/pandas_postprocessing/sort.py b/superset/utils/pandas_postprocessing/sort.py
new file mode 100644
index 0000000..fdf8f94
--- /dev/null
+++ b/superset/utils/pandas_postprocessing/sort.py
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Dict
+
+from pandas import DataFrame
+
+from superset.utils.pandas_postprocessing.utils import validate_column_args
+
+
+@validate_column_args("columns")
+def sort(df: DataFrame, columns: Dict[str, bool]) -> DataFrame:
+ """
+ Sort a DataFrame.
+
+ :param df: DataFrame to sort.
+ :param columns: columns by which to sort. The key specifies the column name,
+ the value specifies whether to sort in ascending order.
+ :return: Sorted DataFrame
+ :raises QueryObjectValidationError: If the request is incorrect
+ """
+ return df.sort_values(by=list(columns.keys()), ascending=list(columns.values()))
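A sketch of `sort` with mixed sort directions:

    import pandas as pd
    from superset.utils.pandas_postprocessing import sort

    df = pd.DataFrame({"a": [1, 2, 1], "b": [1, 2, 3]})
    # ascending by `a`, then descending by `b` within ties
    print(sort(df, columns={"a": True, "b": False}))
    #    a  b
    # 2  1  3
    # 0  1  1
    # 1  2  2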
diff --git a/superset/utils/pandas_postprocessing/utils.py b/superset/utils/pandas_postprocessing/utils.py
new file mode 100644
index 0000000..7d26994
--- /dev/null
+++ b/superset/utils/pandas_postprocessing/utils.py
@@ -0,0 +1,201 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import partial
+from typing import Any, Callable, Dict, Tuple, Union
+
+import numpy as np
+from flask_babel import gettext as _
+from pandas import DataFrame, NamedAgg, Timestamp
+
+from superset.exceptions import QueryObjectValidationError
+
+NUMPY_FUNCTIONS = {
+ "average": np.average,
+ "argmin": np.argmin,
+ "argmax": np.argmax,
+ "count": np.ma.count,
+ "count_nonzero": np.count_nonzero,
+ "cumsum": np.cumsum,
+ "cumprod": np.cumprod,
+ "max": np.max,
+ "mean": np.mean,
+ "median": np.median,
+ "nansum": np.nansum,
+ "nanmin": np.nanmin,
+ "nanmax": np.nanmax,
+ "nanmean": np.nanmean,
+ "nanmedian": np.nanmedian,
+ "nanpercentile": np.nanpercentile,
+ "min": np.min,
+ "percentile": np.percentile,
+ "prod": np.prod,
+ "product": np.product,
+ "std": np.std,
+ "sum": np.sum,
+ "var": np.var,
+}
+
+ # rolling aggregations accepted by the `rolling` operator; despite the name,
+ # membership in this tuple is required
+ DENYLIST_ROLLING_FUNCTIONS = (
+ "count",
+ "corr",
+ "cov",
+ "kurt",
+ "max",
+ "mean",
+ "median",
+ "min",
+ "std",
+ "skew",
+ "sum",
+ "var",
+ "quantile",
+)
+
+ALLOWLIST_CUMULATIVE_FUNCTIONS = (
+ "cummax",
+ "cummin",
+ "cumprod",
+ "cumsum",
+)
+
+PROPHET_TIME_GRAIN_MAP = {
+ "PT1S": "S",
+ "PT1M": "min",
+ "PT5M": "5min",
+ "PT10M": "10min",
+ "PT15M": "15min",
+ "PT30M": "30min",
+ "PT1H": "H",
+ "P1D": "D",
+ "P1W": "W",
+ "P1M": "M",
+ "P3M": "Q",
+ "P1Y": "A",
+ "1969-12-28T00:00:00Z/P1W": "W",
+ "1969-12-29T00:00:00Z/P1W": "W",
+ "P1W/1970-01-03T00:00:00Z": "W",
+ "P1W/1970-01-04T00:00:00Z": "W",
+}
+
+
+def _flatten_column_after_pivot(
+ column: Union[float, Timestamp, str, Tuple[str, ...]],
+ aggregates: Dict[str, Dict[str, Any]],
+) -> str:
+ """
+ Function for flattening column names into a single string. This step is necessary
+ to be able to properly serialize a DataFrame. If the column is a string, return
+ element unchanged. For multi-element columns, join column elements with a comma,
+ with the exception of pivots made with a single aggregate, in which case the
+ aggregate column name is omitted.
+
+ :param column: single element from `DataFrame.columns`
+ :param aggregates: aggregates
+ :return: flattened column name as a single string
+ """
+ if not isinstance(column, tuple):
+ column = (column,)
+ if len(aggregates) == 1 and len(column) > 1:
+ # drop aggregate for single aggregate pivots with multiple groupings
+ # from column name (aggregates always come first in column name)
+ column = column[1:]
+ return ", ".join([str(col) for col in column])
+
+
+def validate_column_args(*argnames: str) -> Callable[..., Any]:
+ def wrapper(func: Callable[..., Any]) -> Callable[..., Any]:
+ def wrapped(df: DataFrame, **options: Any) -> Any:
+ if options.get("is_pivot_df"):
+ # skip validation when pivot Dataframe
+ return func(df, **options)
+ columns = df.columns.tolist()
+ for name in argnames:
+ if name in options and not all(
+ elem in columns for elem in options.get(name) or []
+ ):
+ raise QueryObjectValidationError(
+ _("Referenced columns not available in DataFrame.")
+ )
+ return func(df, **options)
+
+ return wrapped
+
+ return wrapper
+
+
+def _get_aggregate_funcs(
+ df: DataFrame, aggregates: Dict[str, Dict[str, Any]],
+) -> Dict[str, NamedAgg]:
+ """
+ Converts a set of aggregate config objects into functions that pandas can use as
+ aggregators. Currently only numpy aggregators are supported.
+
+ :param df: DataFrame on which to perform aggregate operation.
+ :param aggregates: Mapping from column name to aggregate config.
+ :return: Mapping from metric name to function that takes a single input argument.
+ """
+ agg_funcs: Dict[str, NamedAgg] = {}
+ for name, agg_obj in aggregates.items():
+ column = agg_obj.get("column", name)
+ if column not in df:
+ raise QueryObjectValidationError(
+ _(
+ "Column referenced by aggregate is undefined: %(column)s",
+ column=column,
+ )
+ )
+ if "operator" not in agg_obj:
+ raise QueryObjectValidationError(
+ _("Operator undefined for aggregator: %(name)s", name=name,)
+ )
+ operator = agg_obj["operator"]
+ if callable(operator):
+ aggfunc = operator
+ else:
+ func = NUMPY_FUNCTIONS.get(operator)
+ if not func:
+ raise QueryObjectValidationError(
+ _("Invalid numpy function: %(operator)s", operator=operator,)
+ )
+ options = agg_obj.get("options", {})
+ aggfunc = partial(func, **options)
+ agg_funcs[name] = NamedAgg(column=column, aggfunc=aggfunc)
+
+ return agg_funcs
+
+
+def _append_columns(
+ base_df: DataFrame, append_df: DataFrame, columns: Dict[str, str]
+) -> DataFrame:
+ """
+ Function for adding columns from one DataFrame to another DataFrame. Calls the
+ assign method, which overwrites the original column in `base_df` if the column
+ already exists, and appends the column if the name is not defined.
+
+ :param base_df: DataFrame which to use as the base
+ :param append_df: DataFrame from which to select data.
+ :param columns: columns on which to append, mapping source column to
+ target column. For instance, `{'y': 'y'}` will replace the values in
+ column `y` in `base_df` with the values in `y` in `append_df`,
+ while `{'y': 'y2'}` will add a column `y2` to `base_df` based
+ on values in column `y` in `append_df`, leaving the original column `y`
+ in `base_df` unchanged.
+ :return: new DataFrame with combined data from `base_df` and `append_df`
+ """
+ return base_df.assign(
+ **{target: append_df[source] for source, target in columns.items()}
+ )
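To make the replace-versus-append semantics of `_append_columns` concrete, a
small sketch (same hypothetical-data caveat as the earlier examples):

    import pandas as pd
    from superset.utils.pandas_postprocessing.utils import _append_columns

    base = pd.DataFrame({"y": [1, 2]})
    extra = pd.DataFrame({"y": [10, 20]})
    # {'y': 'y'} overwrites the existing column ...
    print(_append_columns(base, extra, {"y": "y"}))    # y == [10, 20]
    # ... while {'y': 'y2'} leaves `y` intact and adds `y2`
    print(_append_columns(base, extra, {"y": "y2"}))   # columns y, y2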