You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@superset.apache.org by gi...@git.apache.org on 2017/09/20 10:26:14 UTC

[GitHub] rhunwicks commented on a change in pull request #3492: PandasConnector

rhunwicks commented on a change in pull request #3492: PandasConnector
URL: https://github.com/apache/incubator-superset/pull/3492#discussion_r139931452
 
 

 ##########
 File path: contrib/connectors/pandas/models.py
 ##########
 @@ -0,0 +1,724 @@
+from collections import OrderedDict
+from datetime import datetime
+import logging
+from past.builtins import basestring
+try:
+    from urllib.parse import urlparse
+except ImportError:
+    from urlparse import urlparse
+
+import pandas as pd
+from pandas.api.types import (
+    is_string_dtype, is_numeric_dtype, is_datetime64_any_dtype)
+
+from sqlalchemy import (
+    Column, Integer, String, ForeignKey, Text
+)
+import sqlalchemy as sa
+from sqlalchemy.orm import backref, relationship
+from sqlalchemy_utils import ChoiceType, JSONType
+
+from flask import escape, Markup
+from flask_appbuilder import Model
+from flask_babel import lazy_gettext as _
+
+from superset import db, utils, sm
+from superset.connectors.base.models import (
+    BaseDatasource, BaseColumn, BaseMetric)
+from superset.models.helpers import QueryResult, set_perm
+from superset.utils import QueryStatus
+
+
+class PandasDatabase(object):
+    """Non-ORM object for a Pandas Source"""
+    database_name = ''
+
+    cache_timeout = None
+
+    def __init__(self, database_name, cache_timeout):
+        self.database_name = database_name
+        self.cache_timeout = cache_timeout
+
+    def __str__(self):
+        return self.database_name
+
+
+class PandasColumn(Model, BaseColumn):
+    """
+    ORM object for Pandas columns.
+
+    Each Pandas Datasource can have multiple columns"""
+
+    __tablename__ = 'pandascolumns'
+
+    id = Column(Integer, primary_key=True)
+    pandasdatasource_id = Column(Integer, ForeignKey('pandasdatasources.id'))
+    datasource = relationship(
+        'PandasDatasource',
+        backref=backref('columns', cascade='all, delete-orphan'),
+        foreign_keys=[pandasdatasource_id])
+
+    @property
+    def is_num(self):
+        return self.type and is_numeric_dtype(self.type)
+
+    @property
+    def is_time(self):
+        return self.type and is_datetime64_any_dtype(self.type)
+
+    @property
+    def is_dttm(self):
+        return self.is_time
+
+    @property
+    def is_string(self):
+        return self.type and is_string_dtype(self.type)
+
+    num_types = (
+        'DOUBLE', 'FLOAT', 'INT', 'BIGINT',
+        'LONG', 'REAL', 'NUMERIC', 'DECIMAL'
+    )
+    date_types = ('DATE', 'TIME', 'DATETIME')
+    str_types = ('VARCHAR', 'STRING', 'CHAR')
+
+    @property
+    def expression(self):
+        return ''
+
+    @property
+    def data(self):
+        attrs = (
+            'column_name', 'verbose_name', 'description', 'expression',
+            'filterable', 'groupby')
+        return {s: getattr(self, s) for s in attrs}
+
+
+class PandasMetric(Model, BaseMetric):
+    """
+    ORM object for Pandas metrics.
+
+    Each Pandas Datasource can have multiple metrics
+    """
+
+    __tablename__ = 'pandasmetrics'
+
+    id = Column(Integer, primary_key=True)
+    pandasdatasource_id = Column(Integer, ForeignKey('pandasdatasources.id'))
+    datasource = relationship(
+        'PandasDatasource',
+        backref=backref('metrics', cascade='all, delete-orphan'),
+        foreign_keys=[pandasdatasource_id])
+    source = Column(Text)
+    expression = Column(Text)
+
+    @property
+    def perm(self):
+        if self.datasource:
+            return ('{parent_name}.[{obj.metric_name}]'
+                    '(id:{obj.id})').format(
+                obj=self,
+                parent_name=self.datasource.full_name)
+        return None
+
+
+class PandasDatasource(Model, BaseDatasource):
+    """A datasource based on a Pandas DataFrame"""
+
+    # (value, label) pairs of the supported source formats; the value picks
+    # the reader ``pd.read_<value>`` in pandas_read_method.
+    FORMATS = [
+        ('csv', 'CSV'),
+        ('html', 'HTML')
+    ]
+
+    # Maps time-grain names to pandas offset aliases; the keys are offered as
+    # the ``time_grain_sqla`` choices in ``data``. Several keys are aliases
+    # for the same offset (e.g. 'day' / 'one day' / '1 day').
+    # NOTE(review): 'week_starting_sunday' and 'week_ending_saturday' both map
+    # to 'W-SUN' (weeks anchored on Sunday); a week ending Saturday would
+    # normally be 'W-SAT' -- confirm against how process_dataframe resamples.
+    # See http://pandas.pydata.org/pandas-docs/stable/timeseries.html#offset-aliases # NOQA
+    GRAINS = OrderedDict([
+        ('5 seconds', '5S'),
+        ('30 seconds', '30S'),
+        ('1 minute', 'T'),
+        ('5 minutes', '5T'),
+        ('1 hour', 'H'),
+        ('6 hour', '6H'),
+        ('day', 'D'),
+        ('one day', 'D'),
+        ('1 day', 'D'),
+        ('7 days', '7D'),
+        ('week', 'W-MON'),
+        ('week_starting_sunday', 'W-SUN'),
+        ('week_ending_saturday', 'W-SUN'),
+        ('month', 'M'),
+        ('quarter', 'Q'),
+        ('year', 'A'),
+    ])
+
+    __tablename__ = 'pandasdatasources'
+    type = 'pandas'
+    baselink = 'pandasdatasourcemodelview'  # url portion pointing to ModelView endpoint
+    column_class = PandasColumn
+    metric_class = PandasMetric
+
+    name = Column(String(100), nullable=False)
+    # URL the data is read from; its host and path double as the
+    # ``database`` name and ``schema``.
+    source_url = Column(String(1000), nullable=False)
+    # One of the FORMATS values; selects the pandas reader.
+    format = Column(String(20), nullable=False)
+    # Extra keyword arguments passed to the pandas reader
+    # (see pandas_read_parameters).
+    additional_parameters = Column(JSONType)
+
+    user_id = Column(Integer, ForeignKey('ab_user.id'))
+    owner = relationship(
+        sm.user_model,
+        backref='pandasdatasources',
+        foreign_keys=[user_id])
+
+    fetch_values_predicate = Column(String(1000))
+    # Name of the main datetime column; always included in dttm_cols.
+    main_dttm_col = Column(String(250))
+
+    # Used to do code highlighting when displaying the query in the UI
+    query_language = None
+
+    # A Pandas Dataframe containing the data retrieved from the source url.
+    # Class-level default; get_dataframe caches the loaded frame on the
+    # instance on first use.
+    df = None
+
+    def __repr__(self):
+        return self.name
+
+    @property
+    def datasource_name(self):
+        return self.name
+
+    @property
+    def full_name(self):
+        return self.name
+
+    @property
+    def database(self):
+        uri = urlparse(self.source_url)
+        return PandasDatabase(database_name=uri.netloc,
+                              cache_timeout=None)
+
+    @property
+    def connection(self):
+        return self.source_url
+
+    @property
+    def schema(self):
+        uri = urlparse(self.source_url)
+        return uri.path
+
+    @property
+    def schema_perm(self):
+        """Returns endpoint permission if present, host one otherwise."""
+        return utils.get_schema_perm(self.database, self.schema)
+
+    @property
+    def description_markeddown(self):
+        return utils.markdown(self.description)
+
+    @property
+    def link(self):
+        name = escape(self.name)
+        return Markup(
+            '<a href="{self.explore_url}">{name}</a>'.format(**locals()))
+
+    def get_perm(self):
+        return (
+            "pandas.{obj.name}"
+            "(id:{obj.id})").format(obj=self)
+
+    @property
+    def dttm_cols(self):
+        l = [c.column_name for c in self.columns if c.is_dttm]
+        if self.main_dttm_col and self.main_dttm_col not in l:
+            l.append(self.main_dttm_col)
+        return l
+
+    @property
+    def num_cols(self):
+        return [c.column_name for c in self.columns if c.is_num]
+
+    @property
+    def any_dttm_col(self):
+        cols = self.dttm_cols
+        if cols:
+            return cols[0]
+
+    @property
+    def html(self):
+        t = ((c.column_name, c.type) for c in self.columns)
+        df = pd.DataFrame(t)
+        df.columns = ['field', 'type']
+        return df.to_html(
+            index=False,
+            classes=(
+                "dataframe table table-striped table-bordered "
+                "table-condensed"))
+
+    @property
+    def data(self):
+        d = super(PandasDatasource, self).data
+        d['granularity_sqla'] = utils.choicify(self.dttm_cols)
+        d['time_grain_sqla'] = [(g, g) for g in self.GRAINS.keys()]
+        logging.info(d)
+        print(d)
+        return d
+
+    @property
+    def pandas_read_method(self):
+        return getattr(pd, 'read_{obj.format}'.format(obj=self))
+
+    @property
+    def pandas_read_parameters(self):
+        return self.additional_parameters or {}
+
+    def get_empty_dataframe(self):
+        """Create an empty dataframe with the correct columns and dtypes"""
+        columns = []
+        for col in self.columns:
+            type = ('datetime64[ns]'
+                    if is_datetime64_any_dtype(col.type)
+                    else col.type)
+            columns.append((col.column_name, type))
+        return pd.DataFrame({k: pd.Series(dtype=t) for k, t in columns})
+
+    def get_dataframe(self):
+        if self.df is None:
+            self.df = self.pandas_read_method(self.source_url,
+                                              **self.pandas_read_parameters)
+            # read_html returns a list of DataFrames
+            if (isinstance(self.df, list) and
+                    isinstance(self.df[0], pd.DataFrame)):
+                self.df = self.df[0]
+        for col in self.columns:
+            name = col.column_name
+            type = col.type
+            if type != self.df[name].dtype.name:
+                try:
+                    self.df[name] = self.df[name].values.astype(type)
+                except ValueError as e:
+                    message = ('Failed to convert column {name} '
+                               'from {old_type} to {new_type}').format(
+                        name=name,
+                        old_type=self.df[name].dtype.name,
+                        new_type=type)
+                    e.args = (message,) + e.args
+                    raise
+        return self.df
+
+    def get_filter_query(self, filter):
+        """
+        Build a query string to filter a dataframe.
+
+        Filter is a list of dicts of op, col and value.
+
+        Returns a string that can be passed to DataFrame.query() to
+        restrict the DataFrame to only the matching rows.
+        """
+        cols = {col.column_name: col for col in self.columns}
+        query = ''
+        for flt in filter:
+            if not all([flt.get(s) for s in ['col', 'op', 'val']]):
+                continue
+            col = flt['col']
+            col_obj = cols.get(col)
+            op = flt['op']
+            eq = flt['val']
+            if query:
+                query += ' and '
+            if op == 'LIKE':
+                query += "{col}.str.match('{eq}')".format(col=col, eq=eq)
+            else:
+                # Rely on Pandas partial string indexing for datetime fields,
+                # see https://pandas.pydata.org/pandas-docs/stable/timeseries.html#partial-string-indexing  # NOQA
+                try:
+                    if ((col_obj.is_string or col_obj.is_dttm)
+                            and not isinstance(eq, list)):
+                        eq = "'{}'".format(eq)
+                except AttributeError:
+                    # col_obj is None, probably because the col is a metric,
+                    # in which case it is numeric anyway
+                    pass
+                query += "({col} {op} {eq})".format(col=col, op=op, eq=eq)
+        return query
+
+    def process_dataframe(
+            self,
+            df,
+            groupby, metrics,
+            granularity,
+            from_dttm, to_dttm,
+            filter=None,  # noqa
+            is_timeseries=True,
+            timeseries_limit=15,
+            timeseries_limit_metric=None,
+            row_limit=None,
+            inner_from_dttm=None,
+            inner_to_dttm=None,
+            orderby=None,
+            extras=None,
+            columns=None,
+            form_data=None):
+        """Querying any dataframe table from this common interface"""
+        if orderby:
+            orderby, ascending = map(list, zip(*orderby))
+        else:
+            orderby = []
+            ascending = []
+        filter = filter or []
+        query_str = 'df'
+
+        # Build a dict of the metrics to include, including those that
+        # are required for post-aggregation filtering
+        filtered_metrics = [flt['col']
+                            for flt in extras.get('having_druid', [])
 
 Review comment:
   I'm adding a comment for the use of `having_druid`, and similar ones for `granularity_sqla` and `time_grain_sqla`. However, if there is a long-term plan to encourage the development of more connectors, it would be better to rename these parameters in the main code, for example to `granularity_col`, `granularity_freq` and `post_aggregation_filter`.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services