You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@superset.apache.org by GitBox <gi...@apache.org> on 2020/12/08 19:47:12 UTC

[GitHub] [incubator-superset] ktmud commented on a change in pull request #11499: feat(SIP-39): Async query support for charts

ktmud commented on a change in pull request #11499:
URL: https://github.com/apache/incubator-superset/pull/11499#discussion_r537983447



##########
File path: superset-frontend/spec/javascripts/middleware/asyncEvent_spec.js
##########
@@ -0,0 +1,255 @@
+/**

Review comment:
       Can this file be TypeScript, too?

##########
File path: superset-frontend/src/dashboard/index.jsx
##########
@@ -33,10 +36,22 @@ const bootstrapData = JSON.parse(appContainer.getAttribute('data-bootstrap'));
 initFeatureFlags(bootstrapData.common.feature_flags);
 const initState = getInitialState(bootstrapData);
 
+const asyncEventMiddleware = initAsyncEvents({
+  getPendingComponents: state =>
+    filter(state.charts, { chartStatus: 'loading' }),

Review comment:
       Suggestion:
   
   ```ts
     getPendingComponents: ({ charts }) => charts.filter(x => x.chartStatus === 'loading')
   ```
   
   One less dependency and more straightforward code

##########
File path: superset/app.py
##########
@@ -647,6 +651,20 @@ def configure_wtf(self) -> None:
             for ex in csrf_exempt_list:
                 csrf.exempt(ex)
 
+    def configure_async_queries(self) -> None:
+        if feature_flag_manager.is_feature_enabled("GLOBAL_ASYNC_QUERIES"):
+            if (
+                self.config["CACHE_CONFIG"]["CACHE_TYPE"] == "null"
+                or self.config["DATA_CACHE_CONFIG"]["CACHE_TYPE"] == "null"

Review comment:
       Should we check `GLOBAL_ASYNC_QUERIES_REDIS_CONFIG` here as well? Or maybe move all config validation to `async_query_manager.init_app` and use `app.config`.

##########
File path: superset/async_events/api.py
##########
@@ -0,0 +1,103 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+
+from flask import request, Response
+from flask_appbuilder import expose
+from flask_appbuilder.api import BaseApi, safe
+from flask_appbuilder.security.decorators import permission_name, protect
+
+from superset.extensions import async_query_manager, event_logger
+from superset.utils.async_query_manager import AsyncQueryTokenException
+
+logger = logging.getLogger(__name__)
+
+
+class AsyncEventsRestApi(BaseApi):
+    resource_name = "async_event"
+    allow_browser_login = True
+    include_route_methods = {
+        "events",
+    }
+
+    @expose("/", methods=["GET"])
+    @event_logger.log_this
+    @protect()
+    @safe
+    @permission_name("list")
+    def events(self) -> Response:
+        """
+        Reads off of the Redis async events stream, using the user's JWT token and
+        optional query params for last event received.
+        ---
+        get:
+          description: >-
+            Reads off of the Redis events stream, using the user's JWT token and
+            optional query params for last event received.
+          parameters:
+          - in: query
+            name: last_id
+            description: Last ID received by the client
+            schema:
+                type: string
+          responses:
+            200:
+              description: Async event results
+              content:
+                application/json:
+                  schema:
+                    type: object
+                    properties:
+                        result:
+                            type: array
+                            items:
+                              type: object
+                              properties:
+                                id:
+                                  type: string
+                                channel_id:
+                                  type: string
+                                job_id:
+                                  type: string
+                                user_id:
+                                  type: string

Review comment:
       Are these IDs strings or integers?

##########
File path: superset/charts/api.py
##########
@@ -448,6 +457,43 @@ def bulk_delete(self, **kwargs: Any) -> Response:
         except ChartBulkDeleteFailedError as ex:
             return self.response_422(message=str(ex))
 
+    def get_data_response(
+        self, command: ChartDataCommand, force_cached: bool = False
+    ) -> Response:
+        try:
+            result = command.run(force_cached=force_cached)
+        except ChartDataCacheLoadError as exc:
+            return self.response_422(message=exc.message)
+        except ChartDataQueryFailedError as exc:
+            return self.response_400(message=exc.message)
+
+        result_format = result["query_context"].result_format
+        response = self.response_400(
+            message=f"Unsupported result_format: {result_format}"
+        )
+
+        if result_format == ChartDataResultFormat.CSV:
+            # return the first result
+            data = result["queries"][0]["data"]
+            response = CsvResponse(
+                data,
+                status=200,
+                headers=generate_download_headers("csv"),
+                mimetype="application/csv",
+            )

Review comment:
       Nit: could use an early return here and move `response_400` to the end.

##########
File path: superset/common/query_context.py
##########
@@ -186,9 +199,28 @@ def get_single_payload(self, query_obj: QueryObject) -> Dict[str, Any]:
             return {"data": payload["data"]}
         return payload
 
-    def get_payload(self) -> List[Dict[str, Any]]:
-        """Get all the payloads from the QueryObjects"""
-        return [self.get_single_payload(query_object) for query_object in self.queries]
+    def get_payload(self, **kwargs: Any) -> Dict[str, Any]:
+        cache_query_context = kwargs.get("cache_query_context", False)
+        force_cached = kwargs.get("force_cached", False)
+
+        # Get all the payloads from the QueryObjects
+        query_results = [
+            self.get_single_payload(query_object, force_cached=force_cached)
+            for query_object in self.queries
+        ]
+        return_value = {"queries": query_results}
+
+        if cache_query_context:
+            cache_key = self.cache_key()
+            set_and_log_cache(
+                cache_manager.cache,

Review comment:
       Should probably use `cache_manager.data_cache` here. What we cache is the actual data queried from datasources, right?

##########
File path: superset/app.py
##########
@@ -497,6 +500,7 @@ def init_app_in_ctx(self) -> None:
         self.configure_url_map_converters()
         self.configure_data_sources()
         self.configure_auth_provider()
+        self.configure_async_queries()

Review comment:
       Not sure if it matters, but SQL Lab queries run via Celery are also called async queries: https://superset.apache.org/docs/installation/async-queries-celery
   
   This could be confusing for someone new to Superset. Should we update the documentation to add some clarification?

##########
File path: superset/tasks/async_queries.py
##########
@@ -0,0 +1,111 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+from typing import Any, cast, Dict, Optional
+
+from flask import current_app
+
+from superset import app
+from superset.exceptions import SupersetVizException
+from superset.extensions import async_query_manager, cache_manager, celery_app
+from superset.utils.cache import generate_cache_key, set_and_log_cache
+from superset.views.utils import get_datasource_info, get_viz
+
+logger = logging.getLogger(__name__)
+query_timeout = current_app.config[
+    "SQLLAB_ASYNC_TIME_LIMIT_SEC"
+]  # TODO: new config key
+
+
+@celery_app.task(name="load_chart_data_into_cache", soft_time_limit=query_timeout)
+def load_chart_data_into_cache(
+    job_metadata: Dict[str, Any], form_data: Dict[str, Any],
+) -> None:
+    from superset.charts.commands.data import (
+        ChartDataCommand,
+    )  # load here due to circular imports
+
+    with app.app_context():  # type: ignore
+        try:
+            command = ChartDataCommand()
+            command.set_query_context(form_data)
+            result = command.run(cache=True)
+            cache_key = result["cache_key"]
+            result_url = f"/api/v1/chart/data/{cache_key}"
+            async_query_manager.update_job(
+                job_metadata, async_query_manager.STATUS_DONE, result_url=result_url,
+            )
+        except Exception as exc:
+            # TODO: QueryContext should support SIP-40 style errors
+            error = exc.message if hasattr(exc, "message") else str(exc)  # type: ignore # pylint: disable=no-member
+            errors = [{"message": error}]
+            async_query_manager.update_job(
+                job_metadata, async_query_manager.STATUS_ERROR, errors=errors
+            )
+            raise exc
+
+        return None
+
+
+@celery_app.task(name="load_explore_json_into_cache", soft_time_limit=query_timeout)
+def load_explore_json_into_cache(
+    job_metadata: Dict[str, Any],
+    form_data: Dict[str, Any],
+    response_type: Optional[str] = None,
+    force: bool = False,
+) -> None:
+    with app.app_context():  # type: ignore
+        try:
+            datasource_id, datasource_type = get_datasource_info(None, None, form_data)
+
+            viz_obj = get_viz(
+                datasource_type=cast(str, datasource_type),
+                datasource_id=datasource_id,
+                form_data=form_data,
+                force=force,
+            )
+            # run query & cache results
+            payload = viz_obj.get_payload()
+            if viz_obj.has_error(payload):
+                raise SupersetVizException(errors=payload["errors"])
+
+            # cache form_data for async retrieval
+            cache_value = {"form_data": form_data, "response_type": response_type}
+            cache_key = generate_cache_key(
+                cache_value, "ejr-"
+            )  # ejr: explore_json request
+            set_and_log_cache(cache_manager.cache, cache_key, cache_value)

Review comment:
       Ditto `cache_manager.data_cache`.

##########
File path: superset-frontend/src/middleware/asyncEvent.ts
##########
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Middleware, MiddlewareAPI, Dispatch } from 'redux';
+import { SupersetClient } from '@superset-ui/core';
+import { SupersetError } from 'src/components/ErrorMessage/types';
+import { isFeatureEnabled, FeatureFlag } from '../featureFlags';
+import {
+  getClientErrorObject,
+  parseErrorJson,
+} from '../utils/getClientErrorObject';
+
+export type AsyncEvent = {
+  id: string;
+  channel_id: string;
+  job_id: string;
+  user_id: string;
+  status: string;
+  errors: SupersetError[];
+  result_url: string;
+};
+
+type AsyncEventOptions = {
+  getPendingComponents: (state: any) => any[];
+  successAction: (componentId: number, componentData: any) => { type: string };
+  errorAction: (componentId: number, response: any) => { type: string };
+  processEventsCallback?: (events: AsyncEvent[]) => void; // this is currently used only for tests
+};
+
+type CachedDataResponse = {
+  componentId: number;
+  status: string;
+  data: any;
+};
+
+const initAsyncEvents = (options: AsyncEventOptions) => {
+  const POLLING_DELAY = 250;
+  const {
+    getPendingComponents,
+    successAction,
+    errorAction,
+    processEventsCallback,
+  } = options;
+
+  const middleware: Middleware = <S>(store: MiddlewareAPI<S>) => (
+    next: Dispatch<S>,
+  ) => {
+    const JOB_STATUS = {
+      PENDING: 'pending',
+      RUNNING: 'running',
+      ERROR: 'error',
+      DONE: 'done',
+    };
+    const LOCALSTORAGE_KEY = 'last_async_event_id';
+    const POLLING_URL = '/api/v1/async_event/';
+    let lastReceivedEventId: string | null;
+
+    try {
+      lastReceivedEventId = localStorage.getItem(LOCALSTORAGE_KEY);
+    } catch (err) {
+      console.warn('failed to fetch last event Id from localStorage');
+    }
+
+    const fetchEvents = async (
+      lastEventId: string | null,
+    ): Promise<AsyncEvent[]> => {
+      const url = lastEventId
+        ? `${POLLING_URL}?last_id=${lastEventId}`
+        : POLLING_URL;
+      const { json } = await SupersetClient.get({
+        endpoint: url,
+      });
+
+      return json.result;
+    };
+
+    const fetchCachedData = async (
+      asyncEvent: AsyncEvent,
+      componentId: number,
+    ): Promise<CachedDataResponse> => {
+      let status = 'success';
+      let data;
+      try {
+        const { json } = await SupersetClient.get({
+          endpoint: asyncEvent.result_url,
+        });
+        data = 'result' in json ? json.result[0] : json;
+      } catch (response) {
+        status = 'error';
+        data = await getClientErrorObject(response);
+      }
+
+      return { componentId, status, data };
+    };
+
+    const setLastId = (asyncEvent: AsyncEvent) => {
+      lastReceivedEventId = asyncEvent.id;
+      try {
+        localStorage.setItem(LOCALSTORAGE_KEY, lastReceivedEventId as string);
+      } catch (err) {
+        console.warn('Error saving event ID to localStorage', err);
+      }
+    };
+
+    const processEvents = async () => {
+      const state = store.getState();
+      const queuedComponents = getPendingComponents(state);
+      let events: AsyncEvent[] = [];
+      if (queuedComponents && queuedComponents.length) {
+        try {
+          events = await fetchEvents(lastReceivedEventId);
+          if (events && events.length) {

Review comment:
       Nit: could use an early return here to reduce indentation.

##########
File path: superset/tasks/async_queries.py
##########
@@ -0,0 +1,111 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+from typing import Any, cast, Dict, Optional
+
+from flask import current_app
+
+from superset import app
+from superset.exceptions import SupersetVizException
+from superset.extensions import async_query_manager, cache_manager, celery_app
+from superset.utils.cache import generate_cache_key, set_and_log_cache
+from superset.views.utils import get_datasource_info, get_viz
+
+logger = logging.getLogger(__name__)
+query_timeout = current_app.config[
+    "SQLLAB_ASYNC_TIME_LIMIT_SEC"
+]  # TODO: new config key
+
+
+@celery_app.task(name="load_chart_data_into_cache", soft_time_limit=query_timeout)
+def load_chart_data_into_cache(
+    job_metadata: Dict[str, Any], form_data: Dict[str, Any],
+) -> None:
+    from superset.charts.commands.data import (
+        ChartDataCommand,
+    )  # load here due to circular imports
+
+    with app.app_context():  # type: ignore
+        try:
+            command = ChartDataCommand()
+            command.set_query_context(form_data)
+            result = command.run(cache=True)
+            cache_key = result["cache_key"]
+            result_url = f"/api/v1/chart/data/{cache_key}"
+            async_query_manager.update_job(
+                job_metadata, async_query_manager.STATUS_DONE, result_url=result_url,
+            )
+        except Exception as exc:
+            # TODO: QueryContext should support SIP-40 style errors
+            error = exc.message if hasattr(exc, "message") else str(exc)  # type: ignore # pylint: disable=no-member
+            errors = [{"message": error}]
+            async_query_manager.update_job(
+                job_metadata, async_query_manager.STATUS_ERROR, errors=errors
+            )
+            raise exc
+
+        return None
+
+
+@celery_app.task(name="load_explore_json_into_cache", soft_time_limit=query_timeout)
+def load_explore_json_into_cache(
+    job_metadata: Dict[str, Any],
+    form_data: Dict[str, Any],
+    response_type: Optional[str] = None,
+    force: bool = False,
+) -> None:
+    with app.app_context():  # type: ignore
+        try:
+            datasource_id, datasource_type = get_datasource_info(None, None, form_data)
+
+            viz_obj = get_viz(
+                datasource_type=cast(str, datasource_type),
+                datasource_id=datasource_id,
+                form_data=form_data,
+                force=force,
+            )
+            # run query & cache results
+            payload = viz_obj.get_payload()
+            if viz_obj.has_error(payload):
+                raise SupersetVizException(errors=payload["errors"])
+
+            # cache form_data for async retrieval
+            cache_value = {"form_data": form_data, "response_type": response_type}
+            cache_key = generate_cache_key(
+                cache_value, "ejr-"

Review comment:
       Make `"ejr-"` a constant?

##########
File path: superset-frontend/src/middleware/asyncEvent.ts
##########
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Middleware, MiddlewareAPI, Dispatch } from 'redux';
+import { SupersetClient } from '@superset-ui/core';
+import { SupersetError } from 'src/components/ErrorMessage/types';
+import { isFeatureEnabled, FeatureFlag } from '../featureFlags';
+import {
+  getClientErrorObject,
+  parseErrorJson,
+} from '../utils/getClientErrorObject';
+
+export type AsyncEvent = {
+  id: string;
+  channel_id: string;
+  job_id: string;
+  user_id: string;
+  status: string;
+  errors: SupersetError[];
+  result_url: string;
+};
+
+type AsyncEventOptions = {
+  getPendingComponents: (state: any) => any[];
+  successAction: (componentId: number, componentData: any) => { type: string };
+  errorAction: (componentId: number, response: any) => { type: string };
+  processEventsCallback?: (events: AsyncEvent[]) => void; // this is currently used only for tests
+};
+
+type CachedDataResponse = {
+  componentId: number;
+  status: string;
+  data: any;
+};
+
+const initAsyncEvents = (options: AsyncEventOptions) => {
+  const POLLING_DELAY = 250;
+  const {
+    getPendingComponents,
+    successAction,
+    errorAction,
+    processEventsCallback,
+  } = options;
+
+  const middleware: Middleware = <S>(store: MiddlewareAPI<S>) => (
+    next: Dispatch<S>,
+  ) => {
+    const JOB_STATUS = {
+      PENDING: 'pending',
+      RUNNING: 'running',
+      ERROR: 'error',
+      DONE: 'done',
+    };
+    const LOCALSTORAGE_KEY = 'last_async_event_id';
+    const POLLING_URL = '/api/v1/async_event/';
+    let lastReceivedEventId: string | null;
+
+    try {
+      lastReceivedEventId = localStorage.getItem(LOCALSTORAGE_KEY);
+    } catch (err) {
+      console.warn('failed to fetch last event Id from localStorage');
+    }
+
+    const fetchEvents = async (
+      lastEventId: string | null,
+    ): Promise<AsyncEvent[]> => {
+      const url = lastEventId
+        ? `${POLLING_URL}?last_id=${lastEventId}`
+        : POLLING_URL;
+      const { json } = await SupersetClient.get({
+        endpoint: url,
+      });
+
+      return json.result;
+    };

Review comment:
       You could do:
   
   ```ts
   import { makeApi } from '@superset-ui/core';
   
   const fetchEvents = makeApi<{ lastEventId: string }, { result: AsyncEvent[] }>({
     method: 'GET',
     endpoint: POLLING_URL,
   })
   
   const events = fetchEvents({ lastEventId });
   ```
   
   So you get typing for free.

##########
File path: superset-frontend/src/middleware/asyncEvent.ts
##########
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Middleware, MiddlewareAPI, Dispatch } from 'redux';
+import { SupersetClient } from '@superset-ui/core';
+import { SupersetError } from 'src/components/ErrorMessage/types';
+import { isFeatureEnabled, FeatureFlag } from '../featureFlags';
+import {
+  getClientErrorObject,
+  parseErrorJson,
+} from '../utils/getClientErrorObject';
+
+export type AsyncEvent = {
+  id: string;
+  channel_id: string;
+  job_id: string;
+  user_id: string;
+  status: string;
+  errors: SupersetError[];
+  result_url: string;
+};
+
+type AsyncEventOptions = {
+  getPendingComponents: (state: any) => any[];
+  successAction: (componentId: number, componentData: any) => { type: string };
+  errorAction: (componentId: number, response: any) => { type: string };
+  processEventsCallback?: (events: AsyncEvent[]) => void; // this is currently used only for tests
+};
+
+type CachedDataResponse = {
+  componentId: number;
+  status: string;
+  data: any;
+};
+
+const initAsyncEvents = (options: AsyncEventOptions) => {
+  const POLLING_DELAY = 250;

Review comment:
       Maybe this should be a configurable value by Superset admins. 250ms might be too short for some users.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@superset.apache.org
For additional commands, e-mail: notifications-help@superset.apache.org