You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@superset.apache.org by ch...@apache.org on 2019/03/05 02:07:15 UTC

[incubator-superset] branch master updated: Enhancement of query context and object. (#6962)

This is an automated email from the ASF dual-hosted git repository.

christine pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-superset.git


The following commit(s) were added to refs/heads/master by this push:
     new d5b9795  Enhancement of query context and object. (#6962)
d5b9795 is described below

commit d5b9795f87f79fa2c41e144ffc00fd9586be7657
Author: Conglei <sh...@gmail.com>
AuthorDate: Mon Mar 4 18:06:59 2019 -0800

    Enhancement of query context and object. (#6962)
    
    * added more functionalities for query context and object.
    
    * fixed cache logic
    
    * added default value for groupby
    
    * updated comments and removed print
---
 superset/common/query_context.py | 223 ++++++++++++++++++++++++++++++++++++++-
 superset/common/query_object.py  |  86 ++++++++++++---
 superset/views/api.py            |  44 ++++++--
 3 files changed, 328 insertions(+), 25 deletions(-)

diff --git a/superset/common/query_context.py b/superset/common/query_context.py
index 0964292..5053372 100644
--- a/superset/common/query_context.py
+++ b/superset/common/query_context.py
@@ -14,30 +14,247 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-# pylint: disable=R
+# pylint: disable=C,R,W
+from datetime import datetime, timedelta
+import logging
+import pickle as pkl
+import traceback
 from typing import Dict, List
 
+import numpy as np
+import pandas as pd
+
+from superset import app, cache
 from superset import db
 from superset.connectors.connector_registry import ConnectorRegistry
+from superset.utils import core as utils
+from superset.utils.core import DTTM_ALIAS
 from .query_object import QueryObject
 
+config = app.config
+stats_logger = config.get('STATS_LOGGER')
+
 
 class QueryContext:
     """
     The query context contains the query object and additional fields necessary
     to retrieve the data payload for a given viz.
     """
+
+    default_fillna = 0
+    cache_type = 'df'
+    enforce_numerical_metrics = True
+
     # TODO: Type datasource and query_object dictionary with TypedDict when it becomes
     # a vanilla python type https://github.com/python/mypy/issues/5288
     def __init__(
             self,
             datasource: Dict,
             queries: List[Dict],
+            force: bool = False,
+            custom_cache_timeout: int = None,
     ):
         self.datasource = ConnectorRegistry.get_datasource(datasource.get('type'),
                                                            int(datasource.get('id')),
                                                            db.session)
         self.queries = list(map(lambda query_obj: QueryObject(**query_obj), queries))
 
-    def get_data(self):
-        raise NotImplementedError()
+        self.force = force
+
+        self.custom_cache_timeout = custom_cache_timeout
+
+        self.enforce_numerical_metrics = True
+
+    def get_query_result(self, query_object):
+        """Returns a pandas dataframe based on the query object"""
+
+        # Here, we assume that all the queries will use the same datasource, which
+        # is a valid assumption for the current setting. In the long term, we may or
+        # may not support multiple queries from different data sources.
+
+        timestamp_format = None
+        if self.datasource.type == 'table':
+            dttm_col = self.datasource.get_col(query_object.granularity)
+            if dttm_col:
+                timestamp_format = dttm_col.python_date_format
+
+        # The datasource here can be a different backend, but the interface is common
+        result = self.datasource.query(query_object.to_dict())
+
+        df = result.df
+        # Transform the timestamp we received from database to pandas supported
+        # datetime format. If no python_date_format is specified, the pattern will
+        # be considered as the default ISO date format
+        # If the datetime format is unix, the parse will use the corresponding
+        # parsing logic
+        if df is not None and not df.empty:
+            if DTTM_ALIAS in df.columns:
+                if timestamp_format in ('epoch_s', 'epoch_ms'):
+                    # Column has already been formatted as a timestamp.
+                    df[DTTM_ALIAS] = df[DTTM_ALIAS].apply(pd.Timestamp)
+                else:
+                    df[DTTM_ALIAS] = pd.to_datetime(
+                        df[DTTM_ALIAS], utc=False, format=timestamp_format)
+                if self.datasource.offset:
+                    df[DTTM_ALIAS] += timedelta(hours=self.datasource.offset)
+                df[DTTM_ALIAS] += query_object.time_shift
+
+            if self.enforce_numerical_metrics:
+                self.df_metrics_to_num(df, query_object)
+
+            df.replace([np.inf, -np.inf], np.nan)
+            df = self.handle_nulls(df)
+        return {
+            'query': result.query,
+            'status': result.status,
+            'error_message': result.error_message,
+            'df': df,
+        }
+
+    def df_metrics_to_num(self, df, query_object):
+        """Converting metrics to numeric when pandas.read_sql cannot"""
+        metrics = [metric for metric in query_object.metrics]
+        for col, dtype in df.dtypes.items():
+            if dtype.type == np.object_ and col in metrics:
+                df[col] = pd.to_numeric(df[col], errors='coerce')
+
+    def handle_nulls(self, df):
+        fillna = self.get_fillna_for_columns(df.columns)
+        return df.fillna(fillna)
+
+    def get_fillna_for_col(self, col):
+        """Returns the value to use as filler for a specific Column.type"""
+        if col and col.is_string:
+            return ' NULL'
+        return self.default_fillna
+
+    def get_fillna_for_columns(self, columns=None):
+        """Returns a dict or scalar that can be passed to DataFrame.fillna"""
+        if columns is None:
+            return self.default_fillna
+        columns_dict = {col.column_name: col for col in self.datasource.columns}
+        fillna = {
+            c: self.get_fillna_for_col(columns_dict.get(c))
+            for c in columns
+        }
+        return fillna
+
+    def get_data(self, df):
+        return df.to_dict(orient='records')
+
+    def get_single_payload(self, query_obj):
+        """Returns a payload of metadata and data"""
+        payload = self.get_df_payload(query_obj)
+        df = payload.get('df')
+        status = payload.get('status')
+        if status != utils.QueryStatus.FAILED:
+            if df is not None and df.empty:
+                payload['error'] = 'No data'
+            else:
+                payload['data'] = self.get_data(df)
+        if 'df' in payload:
+            del payload['df']
+        return payload
+
+    def get_payload(self):
+        """Get the payloads for all the queries"""
+        return [self.get_single_payload(query_ojbect) for query_ojbect in self.queries]
+
+    @property
+    def cache_timeout(self):
+        if self.custom_cache_timeout is not None:
+            return self.custom_cache_timeout
+        if self.datasource.cache_timeout is not None:
+            return self.datasource.cache_timeout
+        if (
+                hasattr(self.datasource, 'database') and
+                self.datasource.database.cache_timeout) is not None:
+            return self.datasource.database.cache_timeout
+        return config.get('CACHE_DEFAULT_TIMEOUT')
+
+    def get_df_payload(self, query_obj, **kwargs):
+        """Handles caching around the df payload retrieval"""
+        cache_key = query_obj.cache_key(
+            datasource=self.datasource.uid, **kwargs) if query_obj else None
+        logging.info('Cache key: {}'.format(cache_key))
+        is_loaded = False
+        stacktrace = None
+        df = None
+        cached_dttm = datetime.utcnow().isoformat().split('.')[0]
+        cache_value = None
+        status = None
+        query = ''
+        error_message = None
+        if cache_key and cache and not self.force:
+            cache_value = cache.get(cache_key)
+            if cache_value:
+                stats_logger.incr('loaded_from_cache')
+                try:
+                    cache_value = pkl.loads(cache_value)
+                    df = cache_value['df']
+                    query = cache_value['query']
+                    status = utils.QueryStatus.SUCCESS
+                    is_loaded = True
+                except Exception as e:
+                    logging.exception(e)
+                    logging.error('Error reading cache: ' +
+                                  utils.error_msg_from_exception(e))
+                logging.info('Serving from cache')
+
+        if query_obj and not is_loaded:
+            try:
+                query_result = self.get_query_result(query_obj)
+                status = query_result['status']
+                query = query_result['query']
+                error_message = query_result['error_message']
+                df = query_result['df']
+                if status != utils.QueryStatus.FAILED:
+                    stats_logger.incr('loaded_from_source')
+                    is_loaded = True
+            except Exception as e:
+                logging.exception(e)
+                if not error_message:
+                    error_message = '{}'.format(e)
+                status = utils.QueryStatus.FAILED
+                stacktrace = traceback.format_exc()
+
+            if (
+                    is_loaded and
+                    cache_key and
+                    cache and
+                    status != utils.QueryStatus.FAILED):
+                try:
+                    cache_value = dict(
+                        dttm=cached_dttm,
+                        df=df if df is not None else None,
+                        query=query,
+                    )
+                    cache_value = pkl.dumps(
+                        cache_value, protocol=pkl.HIGHEST_PROTOCOL)
+
+                    logging.info('Caching {} chars at key {}'.format(
+                        len(cache_value), cache_key))
+
+                    stats_logger.incr('set_cache_key')
+                    cache.set(
+                        cache_key,
+                        cache_value,
+                        timeout=self.cache_timeout)
+                except Exception as e:
+                    # cache.set call can fail if the backend is down or if
+                    # the key is too large or whatever other reasons
+                    logging.warning('Could not cache key {}'.format(cache_key))
+                    logging.exception(e)
+                    cache.delete(cache_key)
+        return {
+            'cache_key': cache_key,
+            'cached_dttm': cache_value['dttm'] if cache_value is not None else None,
+            'cache_timeout': self.cache_timeout,
+            'df': df,
+            'error': error_message,
+            'is_cached': cache_key is not None,
+            'query': query,
+            'status': status,
+            'stacktrace': stacktrace,
+            'rowcount': len(df.index) if df is not None else 0,
+        }
diff --git a/superset/common/query_object.py b/superset/common/query_object.py
index c851d47..a239404 100644
--- a/superset/common/query_object.py
+++ b/superset/common/query_object.py
@@ -15,15 +15,17 @@
 # specific language governing permissions and limitations
 # under the License.
 # pylint: disable=R
-from typing import Dict, List, Optional
+import hashlib
+from typing import Dict, List, Optional, Union
+
+import simplejson as json
 
 from superset import app
 from superset.utils import core as utils
 
+
 # TODO: Type Metrics dictionary with TypedDict when it becomes a vanilla python type
 # https://github.com/python/mypy/issues/5288
-Metric = Dict
-
 
 class QueryObject:
     """
@@ -33,31 +35,87 @@ class QueryObject:
     def __init__(
             self,
             granularity: str,
+            metrics: List[Union[Dict, str]],
             groupby: List[str] = None,
-            metrics: List[Metric] = None,
             filters: List[str] = None,
             time_range: Optional[str] = None,
             time_shift: Optional[str] = None,
             is_timeseries: bool = False,
+            timeseries_limit: int = 0,
             row_limit: int = app.config.get('ROW_LIMIT'),
-            limit: int = 0,
-            timeseries_limit_metric: Optional[Metric] = None,
+            timeseries_limit_metric: Optional[Dict] = None,
             order_desc: bool = True,
             extras: Optional[Dict] = None,
+            prequeries: Optional[Dict] = None,
+            is_prequery: bool = False,
+            columns: List[str] = None,
+            orderby: List[List] = None,
     ):
         self.granularity = granularity
         self.from_dttm, self.to_dttm = utils.get_since_until(time_range, time_shift)
         self.is_timeseries = is_timeseries
-        self.groupby = groupby or []
-        self.metrics = metrics or []
-        self.filter = filters or []
+        self.time_range = time_range
+        self.time_shift = utils.parse_human_timedelta(time_shift)
+        self.groupby = groupby if groupby is not None else []
+
+        # Temporary solution for a backward compatibility issue
+        # due to the new format of non-ad-hoc metrics.
+        self.metrics = [metric if 'expressionType' in metric else metric['label']
+                        for metric in metrics]
         self.row_limit = row_limit
-        self.timeseries_limit = int(limit)
+        self.filter = filters if filters is not None else []
+        self.timeseries_limit = timeseries_limit
         self.timeseries_limit_metric = timeseries_limit_metric
         self.order_desc = order_desc
-        self.prequeries = []
-        self.is_prequery = False
-        self.extras = extras
+        self.prequeries = prequeries
+        self.is_prequery = is_prequery
+        self.extras = extras if extras is not None else {}
+        self.columns = columns if columns is not None else []
+        self.orderby = orderby if orderby is not None else []
 
     def to_dict(self):
-        raise NotImplementedError()
+        query_object_dict = {
+            'granularity': self.granularity,
+            'from_dttm': self.from_dttm,
+            'to_dttm': self.to_dttm,
+            'is_timeseries': self.is_timeseries,
+            'groupby': self.groupby,
+            'metrics': self.metrics,
+            'row_limit': self.row_limit,
+            'filter': self.filter,
+            'timeseries_limit': self.timeseries_limit,
+            'timeseries_limit_metric': self.timeseries_limit_metric,
+            'order_desc': self.order_desc,
+            'prequeries': self.prequeries,
+            'is_prequery': self.is_prequery,
+            'extras': self.extras,
+            'columns': self.columns,
+            'orderby': self.orderby,
+        }
+        return query_object_dict
+
+    def cache_key(self, **extra):
+        """
+        The cache key is made out of the key/values in `query_obj`, plus any
+        other key/values in `extra`
+        We remove datetime bounds that are hard values, and replace them with
+        the user-provided inputs to bounds, which may be time-relative (as in
+        "5 days ago" or "now").
+        """
+        cache_dict = self.to_dict()
+        cache_dict.update(extra)
+
+        for k in ['from_dttm', 'to_dttm']:
+            del cache_dict[k]
+        if self.time_range:
+            cache_dict['time_range'] = self.time_range
+        json_data = self.json_dumps(cache_dict, sort_keys=True)
+        return hashlib.md5(json_data.encode('utf-8')).hexdigest()
+
+    def json_dumps(self, obj, sort_keys=False):
+        return json.dumps(
+            obj,
+            default=utils.json_int_dttm_ser,
+            ignore_nan=True,
+            sort_keys=sort_keys,
+        )
diff --git a/superset/views/api.py b/superset/views/api.py
index 7b84217..aadee9c 100644
--- a/superset/views/api.py
+++ b/superset/views/api.py
@@ -15,16 +15,18 @@
 # specific language governing permissions and limitations
 # under the License.
 # pylint: disable=R
-import json
-
-from flask import g, request
+from flask import request
 from flask_appbuilder import expose
 from flask_appbuilder.security.decorators import has_access_api
+import simplejson as json
 
-from superset import appbuilder, security_manager
+from superset import appbuilder, db, security_manager
 from superset.common.query_context import QueryContext
+from superset.legacy import update_time_range
+import superset.models.core as models
 from superset.models.core import Log
-from .base import api, BaseSupersetView, data_payload_response, handle_api_exception
+from superset.utils import core as utils
+from .base import api, BaseSupersetView, handle_api_exception
 
 
 class Api(BaseSupersetView):
@@ -37,11 +39,37 @@ class Api(BaseSupersetView):
         """
         Takes a query_obj constructed in the client and returns payload data response
         for the given query_obj.
+        params: query_context: json_blob
         """
         query_context = QueryContext(**json.loads(request.form.get('query_context')))
-        security_manager.assert_datasource_permission(query_context.datasource, g.user)
-        payload_json = query_context.get_data()
-        return data_payload_response(payload_json)
+        security_manager.assert_datasource_permission(query_context.datasource)
+        payload_json = query_context.get_payload()
+        return json.dumps(
+            payload_json,
+            default=utils.json_int_dttm_ser,
+            ignore_nan=True,
+        )
+
+    @Log.log_this
+    @api
+    @handle_api_exception
+    @has_access_api
+    @expose('/v1/form_data/', methods=['GET'])
+    def query_form_data(self):
+        """
+        Get the form data stored in the database for an existing slice.
+        params: slice_id: integer
+        """
+        form_data = {}
+        slice_id = request.args.get('slice_id')
+        if slice_id:
+            slc = db.session.query(models.Slice).filter_by(id=slice_id).one_or_none()
+            if slc:
+                form_data = slc.form_data.copy()
+
+        update_time_range(form_data)
+
+        return json.dumps(form_data)
 
 
 appbuilder.add_view_no_menu(Api)