You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@superset.apache.org by be...@apache.org on 2024/03/13 18:10:59 UTC

(superset) branch chart-telemetry updated (7c47b51928 -> ce496a3646)

This is an automated email from the ASF dual-hosted git repository.

beto pushed a change to branch chart-telemetry
in repository https://gitbox.apache.org/repos/asf/superset.git


 discard 7c47b51928 feat: chart telemetry
     new ce496a3646 feat: chart telemetry

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (7c47b51928)
            \
             N -- N -- N   refs/heads/chart-telemetry (ce496a3646)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 superset/commands/chart/data/get_data_command.py |  3 +-
 superset/common/query_context_processor.py       |  4 ++
 superset/extensions/telemetry.py                 | 28 +++++++++----
 superset/models/core.py                          | 50 +++++++++++++-----------
 4 files changed, 54 insertions(+), 31 deletions(-)


(superset) 01/01: feat: chart telemetry

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

beto pushed a commit to branch chart-telemetry
in repository https://gitbox.apache.org/repos/asf/superset.git

commit ce496a36463489a4a99f75dfd8196468e89a895b
Author: Beto Dealmeida <ro...@dealmeida.net>
AuthorDate: Wed Mar 13 13:26:03 2024 -0400

    feat: chart telemetry
---
 superset/charts/data/api.py                      | 22 +++++---
 superset/commands/chart/data/get_data_command.py |  4 +-
 superset/common/query_context_processor.py       |  4 ++
 superset/extensions/telemetry.py                 | 68 ++++++++++++++++++++++++
 superset/models/core.py                          | 50 +++++++++--------
 superset/utils/decorators.py                     | 35 +++++++++++-
 superset/views/base.py                           |  6 +++
 7 files changed, 157 insertions(+), 32 deletions(-)

diff --git a/superset/charts/data/api.py b/superset/charts/data/api.py
index 2e46eb2737..a7a021b97a 100644
--- a/superset/charts/data/api.py
+++ b/superset/charts/data/api.py
@@ -53,7 +53,7 @@ from superset.utils.core import (
     get_user_id,
     json_int_dttm_ser,
 )
-from superset.utils.decorators import logs_context
+from superset.utils.decorators import logs_context, show_telemetry
 from superset.views.base import CsvResponse, generate_download_headers, XlsxResponse
 from superset.views.base_api import statsd_metrics
 
@@ -181,6 +181,7 @@ class ChartDataRestApi(ChartRestApi):
 
     @expose("/data", methods=("POST",))
     @protect()
+    @show_telemetry
     @statsd_metrics
     @event_logger.log_this_with_context(
         action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.data",
@@ -225,6 +226,8 @@ class ChartDataRestApi(ChartRestApi):
             500:
               $ref: '#/components/responses/500'
         """
+        g.telemetry.add("Computing chart data")
+
         json_body = None
         if request.is_json:
             json_body = request.json
@@ -358,7 +361,8 @@ class ChartDataRestApi(ChartRestApi):
         # This is needed for sending reports based on text charts that do the
         # post-processing of data, eg, the pivot table.
         if result_type == ChartDataResultType.POST_PROCESSED:
-            result = apply_post_process(result, form_data, datasource)
+            with g.telemetry("Post processing data"):
+                result = apply_post_process(result, form_data, datasource)
 
         if result_format in ChartDataResultFormat.table_like():
             # Verify user has permission to export file
@@ -396,11 +400,12 @@ class ChartDataRestApi(ChartRestApi):
             )
 
         if result_format == ChartDataResultFormat.JSON:
-            response_data = simplejson.dumps(
-                {"result": result["queries"]},
-                default=json_int_dttm_ser,
-                ignore_nan=True,
-            )
+            with g.telemetry("JSON encoding"):
+                response_data = simplejson.dumps(
+                    {"result": result["queries"]},
+                    default=json_int_dttm_ser,
+                    ignore_nan=True,
+                )
             resp = make_response(response_data, 200)
             resp.headers["Content-Type"] = "application/json; charset=utf-8"
             return resp
@@ -415,7 +420,8 @@ class ChartDataRestApi(ChartRestApi):
         datasource: BaseDatasource | Query | None = None,
     ) -> Response:
         try:
-            result = command.run(force_cached=force_cached)
+            with g.telemetry("Running command"):
+                result = command.run(force_cached=force_cached)
         except ChartDataCacheLoadError as exc:
             return self.response_422(message=exc.message)
         except ChartDataQueryFailedError as exc:
diff --git a/superset/commands/chart/data/get_data_command.py b/superset/commands/chart/data/get_data_command.py
index 971c343cba..edf1df14a2 100644
--- a/superset/commands/chart/data/get_data_command.py
+++ b/superset/commands/chart/data/get_data_command.py
@@ -17,6 +17,7 @@
 import logging
 from typing import Any
 
+from flask import g
 from flask_babel import gettext as _
 
 from superset.commands.base import BaseCommand
@@ -43,7 +44,8 @@ class ChartDataCommand(BaseCommand):
         force_cached = kwargs.get("force_cached", False)
         try:
             payload = self._query_context.get_payload(
-                cache_query_context=cache_query_context, force_cached=force_cached
+                cache_query_context=cache_query_context,
+                force_cached=force_cached,
             )
         except CacheLoadError as ex:
             raise ChartDataCacheLoadError(ex.message) from ex
diff --git a/superset/common/query_context_processor.py b/superset/common/query_context_processor.py
index d8b5bea4bb..543de5c740 100644
--- a/superset/common/query_context_processor.py
+++ b/superset/common/query_context_processor.py
@@ -23,6 +23,7 @@ from typing import Any, ClassVar, TYPE_CHECKING, TypedDict
 
 import numpy as np
 import pandas as pd
+from flask import g
 from flask_babel import gettext as _
 from pandas import DateOffset
 
@@ -168,6 +169,9 @@ class QueryContextProcessor:
                 cache.error_message = str(ex)
                 cache.status = QueryStatus.FAILED
 
+        else:
+            g.telemetry.add("Hit cache")
+
         # the N-dimensional DataFrame has converted into flat DataFrame
         # by `flatten operator`, "comma" in the column is escaped by `escape_separator`
         # the result DataFrame columns should be unescaped
diff --git a/superset/extensions/telemetry.py b/superset/extensions/telemetry.py
new file mode 100644
index 0000000000..aa45ff5671
--- /dev/null
+++ b/superset/extensions/telemetry.py
@@ -0,0 +1,68 @@
+import time
+from collections.abc import Iterator
+from contextlib import contextmanager
+
+
class TelemetryHandler:
    """
    Handler for telemetry events.

    To use this, decorate an endpoint with `@show_telemetry`:

        @expose("/")
        @show_telemetry
        def some_endpoint() -> str:
            g.telemetry.add("Processing request")

            with g.telemetry("Computation"):
                output = {"answer": some_computation()}

            return jsonify(output)

    The response payload will then look like this:

        {
            "answer": 42,
            "telemetry": {
                1710345892.8975344: "Processing request",
                1710345893.4794712: "Computation START",
                1710345900.3592598: "Computation END",
            },
        }

    """

    def __init__(self) -> None:
        # (event name, timestamp) pairs, kept in insertion order.
        self.events: list[tuple[str, float]] = []

    @contextmanager
    def __call__(self, event: str) -> Iterator[None]:
        """
        Context manager for start/end events.

            with g.telemetry("Run query"):
                run_query()

        Will produce the events "Run query START" and "Run query END". If the
        block inside the context manager raises an exception, "Run query ERROR"
        will be produced instead of the latter.
        """
        self.add(f"{event} START")
        try:
            yield
        except Exception:
            self.add(f"{event} ERROR")
            # Bare `raise` preserves the original traceback; `raise ex` would
            # reset the exception's context.
            raise
        else:
            # Only record END when the block completed successfully.
            self.add(f"{event} END")

    def add(self, event: str) -> None:
        """
        Add a single event, timestamped with the current time.
        """
        self.events.append((event, time.time()))

    def to_dict(self) -> dict[float, str]:
        """
        Convert events to a dictionary mapping timestamp to event name.

        NOTE: two events recorded with the exact same `time.time()` value would
        collide on the key; in practice the timer resolution makes this unlikely.
        """
        return {event_time: event_name for event_name, event_time in self.events}
diff --git a/superset/models/core.py b/superset/models/core.py
index 71a6e9d042..d1019351ec 100755
--- a/superset/models/core.py
+++ b/superset/models/core.py
@@ -579,35 +579,41 @@ class Database(
                 )
 
         with self.get_raw_connection(schema=schema) as conn:
-            cursor = conn.cursor()
-            for sql_ in sqls[:-1]:
+            with g.telemetry("Executing query"):
+                cursor = conn.cursor()
+                for sql_ in sqls[:-1]:
+                    if mutate_after_split:
+                        sql_ = sql_query_mutator(
+                            sql_,
+                            security_manager=security_manager,
+                            database=None,
+                        )
+                    _log_query(sql_)
+                    self.db_engine_spec.execute(cursor, sql_)
+                    cursor.fetchall()
+
                 if mutate_after_split:
-                    sql_ = sql_query_mutator(
-                        sql_,
+                    last_sql = sql_query_mutator(
+                        sqls[-1],
                         security_manager=security_manager,
                         database=None,
                     )
-                _log_query(sql_)
-                self.db_engine_spec.execute(cursor, sql_)
-                cursor.fetchall()
-
-            if mutate_after_split:
-                last_sql = sql_query_mutator(
-                    sqls[-1],
-                    security_manager=security_manager,
-                    database=None,
-                )
-                _log_query(last_sql)
-                self.db_engine_spec.execute(cursor, last_sql)
-            else:
-                _log_query(sqls[-1])
-                self.db_engine_spec.execute(cursor, sqls[-1])
+                    _log_query(last_sql)
+                    self.db_engine_spec.execute(cursor, last_sql)
+                else:
+                    _log_query(sqls[-1])
+                    self.db_engine_spec.execute(cursor, sqls[-1])
+
+            with g.telemetry("Fetching data from cursor"):
+                data = self.db_engine_spec.fetch_data(cursor)
 
-            data = self.db_engine_spec.fetch_data(cursor)
             result_set = SupersetResultSet(
-                data, cursor.description, self.db_engine_spec
+                data,
+                cursor.description,
+                self.db_engine_spec,
             )
-            df = result_set.to_pandas_df()
+            with g.telemetry("Loading into dataframe"):
+                df = result_set.to_pandas_df()
             if mutator:
                 df = mutator(df)
 
diff --git a/superset/utils/decorators.py b/superset/utils/decorators.py
index 7e34b98360..fdc764ba36 100644
--- a/superset/utils/decorators.py
+++ b/superset/utils/decorators.py
@@ -20,10 +20,11 @@ import logging
 import time
 from collections.abc import Iterator
 from contextlib import contextmanager
+from functools import wraps
 from typing import Any, Callable, TYPE_CHECKING
 from uuid import UUID
 
-from flask import current_app, g, Response
+from flask import current_app, g, jsonify, Response
 
 from superset.utils import core as utils
 from superset.utils.dates import now_as_float
@@ -210,3 +211,35 @@ def suppress_logging(
         yield
     finally:
         target_logger.setLevel(original_level)
+
+
def show_telemetry(f: Callable[..., Any]) -> Callable[..., Any]:
    """
    For JSON responses, add telemetry information to the payload.

    This allows us to instrument the stack by adding timestamps at different
    levels, eg:

        g.telemetry.add("START_RUN_QUERY")
        data = run_query(sql)
        g.telemetry.add("END_RUN_QUERY")

    And then we can display this information in the UI.
    """

    @wraps(f)
    def wrapped(*args: Any, **kwargs: Any) -> Any:
        result = f(*args, **kwargs)

        # Only Flask-style responses (which expose `get_json`) can be augmented.
        if not hasattr(result, "get_json"):
            return result

        try:
            json_data = result.get_json()
        except Exception:  # pylint: disable=broad-exception-caught
            # Not a JSON response (or malformed JSON); return it untouched.
            return result

        # Only augment JSON objects, and only when telemetry was collected for
        # this request (see `start_telemetry` in superset/views/base.py).
        if isinstance(json_data, dict) and hasattr(g, "telemetry"):
            json_data["telemetry"] = g.telemetry.to_dict()
            return jsonify(json_data)

        return result

    return wrapped
diff --git a/superset/views/base.py b/superset/views/base.py
index c8b4862710..1402953492 100644
--- a/superset/views/base.py
+++ b/superset/views/base.py
@@ -74,6 +74,7 @@ from superset.exceptions import (
     SupersetSecurityException,
 )
 from superset.extensions import cache_manager
+from superset.extensions.telemetry import TelemetryHandler
 from superset.models.helpers import ImportExportMixin
 from superset.reports.models import ReportRecipientType
 from superset.superset_typing import FlaskResponse
@@ -725,3 +726,8 @@ def apply_http_headers(response: Response) -> Response:
         if k not in response.headers:
             response.headers[k] = v
     return response
+
+
+@superset_app.before_request
+def start_telemetry() -> None:
+    g.telemetry = TelemetryHandler()