Posted to commits@superset.apache.org by ch...@apache.org on 2019/03/05 02:07:15 UTC
[incubator-superset] branch master updated: Enhancement of query context and object. (#6962)
This is an automated email from the ASF dual-hosted git repository.
christine pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-superset.git
The following commit(s) were added to refs/heads/master by this push:
new d5b9795 Enhancement of query context and object. (#6962)
d5b9795 is described below
commit d5b9795f87f79fa2c41e144ffc00fd9586be7657
Author: Conglei <sh...@gmail.com>
AuthorDate: Mon Mar 4 18:06:59 2019 -0800
Enhancement of query context and object. (#6962)
* added more functionality to the query context and query object.
* fixed cache logic
* added default value for groupby
* updated comments and removed print
---
superset/common/query_context.py | 223 ++++++++++++++++++++++++++++++++++++++-
superset/common/query_object.py | 86 ++++++++++++---
superset/views/api.py | 44 ++++++--
3 files changed, 328 insertions(+), 25 deletions(-)
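
[Editor's note] Taken together, the changes below let a client build a QueryContext from JSON and pull one payload per query object. A minimal sketch of that flow, assuming a table datasource with id 1, a non-ad-hoc metric labeled 'count', and a groupby column 'gender' (all hypothetical), plus a running Superset app context:

    # Sketch only: the datasource id, metric label, and groupby column are
    # hypothetical; a Flask app context and a configured cache are assumed.
    from superset.common.query_context import QueryContext

    query_context = QueryContext(
        datasource={'type': 'table', 'id': 1},
        queries=[{
            'granularity': 'ds',
            'metrics': [{'label': 'count'}],   # non-ad-hoc metric format
            'groupby': ['gender'],
            'time_range': 'Last week',
        }],
        force=False,                           # True bypasses the cache
    )
    # One payload dict (query, status, data, cache metadata) per query object.
    payloads = query_context.get_payload()
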
diff --git a/superset/common/query_context.py b/superset/common/query_context.py
index 0964292..5053372 100644
--- a/superset/common/query_context.py
+++ b/superset/common/query_context.py
@@ -14,30 +14,247 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-# pylint: disable=R
+# pylint: disable=C,R,W
+from datetime import datetime, timedelta
+import logging
+import pickle as pkl
+import traceback
from typing import Dict, List
+import numpy as np
+import pandas as pd
+
+from superset import app, cache
from superset import db
from superset.connectors.connector_registry import ConnectorRegistry
+from superset.utils import core as utils
+from superset.utils.core import DTTM_ALIAS
from .query_object import QueryObject
+config = app.config
+stats_logger = config.get('STATS_LOGGER')
+
class QueryContext:
"""
The query context contains the query object and additional fields necessary
to retrieve the data payload for a given viz.
"""
+
+ default_fillna = 0
+ cache_type = 'df'
+ enforce_numerical_metrics = True
+
# TODO: Type datasource and query_object dictionary with TypedDict when it becomes
# a vanilla python type https://github.com/python/mypy/issues/5288
def __init__(
self,
datasource: Dict,
queries: List[Dict],
+ force: bool = False,
+ custom_cache_timeout: int = None,
):
self.datasource = ConnectorRegistry.get_datasource(datasource.get('type'),
int(datasource.get('id')),
db.session)
self.queries = list(map(lambda query_obj: QueryObject(**query_obj), queries))
- def get_data(self):
- raise NotImplementedError()
+ self.force = force
+
+ self.custom_cache_timeout = custom_cache_timeout
+
+ self.enforce_numerical_metrics = True
+
+ def get_query_result(self, query_object):
+ """Returns a pandas dataframe based on the query object"""
+
+ # Here, we assume that all the queries will use the same datasource, which
+ # is a valid assumption for the current setting. In the long term, we may or
+ # may not support multiple queries from different data sources.
+
+ timestamp_format = None
+ if self.datasource.type == 'table':
+ dttm_col = self.datasource.get_col(query_object.granularity)
+ if dttm_col:
+ timestamp_format = dttm_col.python_date_format
+
+ # The datasource here can be a different backend, but the interface is common
+ result = self.datasource.query(query_object.to_dict())
+
+ df = result.df
+ # Transform the timestamp we received from the database to a pandas
+ # supported datetime format. If no python_date_format is specified, the
+ # pattern is assumed to be the default ISO date format. If the datetime
+ # format is epoch-based (epoch_s/epoch_ms), the corresponding parsing
+ # logic is used.
+ if df is not None and not df.empty:
+ if DTTM_ALIAS in df.columns:
+ if timestamp_format in ('epoch_s', 'epoch_ms'):
+ # Column has already been formatted as a timestamp.
+ df[DTTM_ALIAS] = df[DTTM_ALIAS].apply(pd.Timestamp)
+ else:
+ df[DTTM_ALIAS] = pd.to_datetime(
+ df[DTTM_ALIAS], utc=False, format=timestamp_format)
+ if self.datasource.offset:
+ df[DTTM_ALIAS] += timedelta(hours=self.datasource.offset)
+ df[DTTM_ALIAS] += query_object.time_shift
+
+ if self.enforce_numerical_metrics:
+ self.df_metrics_to_num(df, query_object)
+
+ df = df.replace([np.inf, -np.inf], np.nan)
+ df = self.handle_nulls(df)
+ return {
+ 'query': result.query,
+ 'status': result.status,
+ 'error_message': result.error_message,
+ 'df': df,
+ }
+
+ def df_metrics_to_num(self, df, query_object):
+ """Converting metrics to numeric when pandas.read_sql cannot"""
+ metrics = [metric for metric in query_object.metrics]
+ for col, dtype in df.dtypes.items():
+ if dtype.type == np.object_ and col in metrics:
+ df[col] = pd.to_numeric(df[col], errors='coerce')
+
+ def handle_nulls(self, df):
+ fillna = self.get_fillna_for_columns(df.columns)
+ return df.fillna(fillna)
+
+ def get_fillna_for_col(self, col):
+ """Returns the value to use as filler for a specific Column.type"""
+ if col and col.is_string:
+ return ' NULL'
+ return self.default_fillna
+
+ def get_fillna_for_columns(self, columns=None):
+ """Returns a dict or scalar that can be passed to DataFrame.fillna"""
+ if columns is None:
+ return self.default_fillna
+ columns_dict = {col.column_name: col for col in self.datasource.columns}
+ fillna = {
+ c: self.get_fillna_for_col(columns_dict.get(c))
+ for c in columns
+ }
+ return fillna
+
+ def get_data(self, df):
+ return df.to_dict(orient='records')
+
+ def get_single_payload(self, query_obj):
+ """Returns a payload of metadata and data"""
+ payload = self.get_df_payload(query_obj)
+ df = payload.get('df')
+ status = payload.get('status')
+ if status != utils.QueryStatus.FAILED:
+ if df is not None and df.empty:
+ payload['error'] = 'No data'
+ else:
+ payload['data'] = self.get_data(df)
+ if 'df' in payload:
+ del payload['df']
+ return payload
+
+ def get_payload(self):
+ """Get all the paylaods from the arrays"""
+ return [self.get_single_payload(query_ojbect) for query_ojbect in self.queries]
+
+ @property
+ def cache_timeout(self):
+ if self.custom_cache_timeout is not None:
+ return self.custom_cache_timeout
+ if self.datasource.cache_timeout is not None:
+ return self.datasource.cache_timeout
+ if (
+ hasattr(self.datasource, 'database') and
+ self.datasource.database.cache_timeout is not None):
+ return self.datasource.database.cache_timeout
+ return config.get('CACHE_DEFAULT_TIMEOUT')
+
+ def get_df_payload(self, query_obj, **kwargs):
+ """Handles caching around the df paylod retrieval"""
+ cache_key = query_obj.cache_key(
+ datasource=self.datasource.uid, **kwargs) if query_obj else None
+ logging.info('Cache key: {}'.format(cache_key))
+ is_loaded = False
+ stacktrace = None
+ df = None
+ cached_dttm = datetime.utcnow().isoformat().split('.')[0]
+ cache_value = None
+ status = None
+ query = ''
+ error_message = None
+ if cache_key and cache and not self.force:
+ cache_value = cache.get(cache_key)
+ if cache_value:
+ stats_logger.incr('loaded_from_cache')
+ try:
+ cache_value = pkl.loads(cache_value)
+ df = cache_value['df']
+ query = cache_value['query']
+ status = utils.QueryStatus.SUCCESS
+ is_loaded = True
+ except Exception as e:
+ logging.exception(e)
+ logging.error('Error reading cache: ' +
+ utils.error_msg_from_exception(e))
+ logging.info('Serving from cache')
+
+ if query_obj and not is_loaded:
+ try:
+ query_result = self.get_query_result(query_obj)
+ status = query_result['status']
+ query = query_result['query']
+ error_message = query_result['error_message']
+ df = query_result['df']
+ if status != utils.QueryStatus.FAILED:
+ stats_logger.incr('loaded_from_source')
+ is_loaded = True
+ except Exception as e:
+ logging.exception(e)
+ if not error_message:
+ error_message = '{}'.format(e)
+ status = utils.QueryStatus.FAILED
+ stacktrace = traceback.format_exc()
+
+ if (
+ is_loaded and
+ cache_key and
+ cache and
+ status != utils.QueryStatus.FAILED):
+ try:
+ cache_value = dict(
+ dttm=cached_dttm,
+ df=df,
+ query=query,
+ )
+ # Keep the unpickled dict around; the return below reads
+ # cache_value['dttm'], so only the serialized copy goes to the cache.
+ cache_binary = pkl.dumps(
+ cache_value, protocol=pkl.HIGHEST_PROTOCOL)
+
+ logging.info('Caching {} chars at key {}'.format(
+ len(cache_binary), cache_key))
+
+ stats_logger.incr('set_cache_key')
+ cache.set(
+ cache_key,
+ cache_binary,
+ timeout=self.cache_timeout)
+ except Exception as e:
+ # cache.set call can fail if the backend is down or if
+ # the key is too large or whatever other reasons
+ logging.warning('Could not cache key {}'.format(cache_key))
+ logging.exception(e)
+ cache.delete(cache_key)
+ return {
+ 'cache_key': cache_key,
+ 'cached_dttm': cache_value['dttm'] if cache_value is not None else None,
+ 'cache_timeout': self.cache_timeout,
+ 'df': df,
+ 'error': error_message,
+ 'is_cached': cache_key is not None,
+ 'query': query,
+ 'status': status,
+ 'stacktrace': stacktrace,
+ 'rowcount': len(df.index) if df is not None else 0,
+ }
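
[Editor's note] get_df_payload above is a read-through cache: try the cache first, fall back to the datasource, then write the result back with a timeout. A standalone sketch of that pattern, with a plain dict standing in for the Flask-Cache backend (the names here are illustrative, not part of the commit):

    # Minimal read-through cache sketch; `fake_cache` stands in for
    # `superset.cache` and `compute_df` for `get_query_result`.
    import pickle as pkl

    fake_cache = {}

    def get_df_cached(cache_key, compute_df, force=False):
        if not force and cache_key in fake_cache:
            try:
                return pkl.loads(fake_cache[cache_key])['df']  # cache hit
            except Exception:
                pass  # unreadable entry: fall through and recompute
        df = compute_df()  # cache miss: query the datasource
        fake_cache[cache_key] = pkl.dumps(
            {'df': df}, protocol=pkl.HIGHEST_PROTOCOL)
        return df
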
diff --git a/superset/common/query_object.py b/superset/common/query_object.py
index c851d47..a239404 100644
--- a/superset/common/query_object.py
+++ b/superset/common/query_object.py
@@ -15,15 +15,17 @@
# specific language governing permissions and limitations
# under the License.
# pylint: disable=R
-from typing import Dict, List, Optional
+import hashlib
+from typing import Dict, List, Optional, Union
+
+import simplejson as json
from superset import app
from superset.utils import core as utils
+
# TODO: Type Metrics dictionary with TypedDict when it becomes a vanilla python type
# https://github.com/python/mypy/issues/5288
-Metric = Dict
-
class QueryObject:
"""
@@ -33,31 +35,87 @@ class QueryObject:
def __init__(
self,
granularity: str,
+ metrics: List[Union[Dict, str]],
groupby: List[str] = None,
- metrics: List[Metric] = None,
filters: List[str] = None,
time_range: Optional[str] = None,
time_shift: Optional[str] = None,
is_timeseries: bool = False,
+ timeseries_limit: int = 0,
row_limit: int = app.config.get('ROW_LIMIT'),
- limit: int = 0,
- timeseries_limit_metric: Optional[Metric] = None,
+ timeseries_limit_metric: Optional[Dict] = None,
order_desc: bool = True,
extras: Optional[Dict] = None,
+ prequeries: Optional[Dict] = None,
+ is_prequery: bool = False,
+ columns: List[str] = None,
+ orderby: List[List] = None,
):
self.granularity = granularity
self.from_dttm, self.to_dttm = utils.get_since_until(time_range, time_shift)
self.is_timeseries = is_timeseries
- self.groupby = groupby or []
- self.metrics = metrics or []
- self.filter = filters or []
+ self.time_range = time_range
+ self.time_shift = utils.parse_human_timedelta(time_shift)
+ self.groupby = groupby if groupby is not None else []
+
+ # Temporary solution for a backward-compatibility issue caused by the new
+ # format of non-ad-hoc metrics: reduce a non-ad-hoc metric dict to its
+ # label, and let ad-hoc metric dicts and plain string metrics pass through.
+ self.metrics = [metric if isinstance(metric, str) or 'expressionType' in metric
+ else metric['label'] for metric in metrics]
self.row_limit = row_limit
- self.timeseries_limit = int(limit)
+ self.filter = filters if filters is not None else []
+ self.timeseries_limit = timeseries_limit
self.timeseries_limit_metric = timeseries_limit_metric
self.order_desc = order_desc
- self.prequeries = []
- self.is_prequery = False
- self.extras = extras
+ self.prequeries = prequeries
+ self.is_prequery = is_prequery
+ self.extras = extras if extras is not None else {}
+ self.columns = columns if columns is not None else []
+ self.orderby = orderby if orderby is not None else []
def to_dict(self):
- raise NotImplementedError()
+ query_object_dict = {
+ 'granularity': self.granularity,
+ 'from_dttm': self.from_dttm,
+ 'to_dttm': self.to_dttm,
+ 'is_timeseries': self.is_timeseries,
+ 'groupby': self.groupby,
+ 'metrics': self.metrics,
+ 'row_limit': self.row_limit,
+ 'filter': self.filter,
+ 'timeseries_limit': self.timeseries_limit,
+ 'timeseries_limit_metric': self.timeseries_limit_metric,
+ 'order_desc': self.order_desc,
+ 'prequeries': self.prequeries,
+ 'is_prequery': self.is_prequery,
+ 'extras': self.extras,
+ 'columns': self.columns,
+ 'orderby': self.orderby,
+ }
+ return query_object_dict
+
+ def cache_key(self, **extra):
+ """
+ The cache key is made out of the key/values in `query_obj`, plus any
+ other key/values in `extra`.
+ We remove datetime bounds that are hard values, and replace them with
+ the user-provided inputs to bounds, which may be time-relative (as in
+ "5 days ago" or "now").
+ """
+ cache_dict = self.to_dict()
+ cache_dict.update(extra)
+
+ for k in ['from_dttm', 'to_dttm']:
+ del cache_dict[k]
+ if self.time_range:
+ cache_dict['time_range'] = self.time_range
+ json_data = self.json_dumps(cache_dict, sort_keys=True)
+ return hashlib.md5(json_data.encode('utf-8')).hexdigest()
+
+ def json_dumps(self, obj, sort_keys=False):
+ return json.dumps(
+ obj,
+ default=utils.json_int_dttm_ser,
+ ignore_nan=True,
+ sort_keys=sort_keys,
+ )
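
[Editor's note] The point of dropping from_dttm/to_dttm in cache_key is that a relative range like 'Last week' hashes to the same key every day, so cached entries stay reusable. A hedged, standalone restatement of the recipe using only the standard library (the commit itself uses simplejson with ignore_nan):

    # Standalone sketch of the cache_key recipe: drop resolved datetime
    # bounds, keep the user-supplied time_range, md5 the sorted JSON.
    import hashlib
    import json

    def sketch_cache_key(query_dict, time_range=None, **extra):
        cache_dict = dict(query_dict, **extra)
        for k in ('from_dttm', 'to_dttm'):
            cache_dict.pop(k, None)
        if time_range:
            cache_dict['time_range'] = time_range
        json_data = json.dumps(cache_dict, sort_keys=True, default=str)
        return hashlib.md5(json_data.encode('utf-8')).hexdigest()

    # Same key no matter when 'Last week' is resolved into hard bounds:
    key = sketch_cache_key(
        {'metrics': ['count'], 'from_dttm': '2019-02-25', 'to_dttm': '2019-03-04'},
        time_range='Last week')
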
diff --git a/superset/views/api.py b/superset/views/api.py
index 7b84217..aadee9c 100644
--- a/superset/views/api.py
+++ b/superset/views/api.py
@@ -15,16 +15,18 @@
# specific language governing permissions and limitations
# under the License.
# pylint: disable=R
-import json
-
-from flask import g, request
+from flask import request
from flask_appbuilder import expose
from flask_appbuilder.security.decorators import has_access_api
+import simplejson as json
-from superset import appbuilder, security_manager
+from superset import appbuilder, db, security_manager
from superset.common.query_context import QueryContext
+from superset.legacy import update_time_range
+import superset.models.core as models
from superset.models.core import Log
-from .base import api, BaseSupersetView, data_payload_response, handle_api_exception
+from superset.utils import core as utils
+from .base import api, BaseSupersetView, handle_api_exception
class Api(BaseSupersetView):
@@ -37,11 +39,37 @@ class Api(BaseSupersetView):
"""
Takes a query_obj constructed in the client and returns payload data response
for the given query_obj.
+ params: query_context: json_blob
"""
query_context = QueryContext(**json.loads(request.form.get('query_context')))
- security_manager.assert_datasource_permission(query_context.datasource, g.user)
- payload_json = query_context.get_data()
- return data_payload_response(payload_json)
+ security_manager.assert_datasource_permission(query_context.datasource)
+ payload_json = query_context.get_payload()
+ return json.dumps(
+ payload_json,
+ default=utils.json_int_dttm_ser,
+ ignore_nan=True,
+ )
+
+ @Log.log_this
+ @api
+ @handle_api_exception
+ @has_access_api
+ @expose('/v1/form_data/', methods=['GET'])
+ def query_form_data(self):
+ """
+ Get the form_data stored in the database for an existing slice.
+ params: slice_id: integer
+ """
+ form_data = {}
+ slice_id = request.args.get('slice_id')
+ if slice_id:
+ slc = db.session.query(models.Slice).filter_by(id=slice_id).one_or_none()
+ if slc:
+ form_data = slc.form_data.copy()
+
+ update_time_range(form_data)
+
+ return json.dumps(form_data)
appbuilder.add_view_no_menu(Api)
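
[Editor's note] For reference, a hedged example of exercising the two endpoints from a client. The base URL, the /api prefix (Flask-AppBuilder's default route base for a view class named Api), and the authenticated session are assumptions, not part of this commit:

    # Sketch only: ids and URLs are hypothetical; `session` is assumed to
    # carry an authenticated Superset cookie.
    import json
    import requests

    session = requests.Session()
    query_context = {
        'datasource': {'type': 'table', 'id': 1},
        'queries': [{'granularity': 'ds', 'metrics': [{'label': 'count'}]}],
    }
    # POST /api/v1/query/ with the JSON blob in the form field `query_context`.
    resp = session.post(
        'http://localhost:8088/api/v1/query/',
        data={'query_context': json.dumps(query_context)})
    payloads = resp.json()  # one payload per query object

    # GET /api/v1/form_data/ returns the stored form_data for a slice.
    form_data = session.get(
        'http://localhost:8088/api/v1/form_data/',
        params={'slice_id': 42}).json()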