You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@superset.apache.org by be...@apache.org on 2024/03/13 20:34:55 UTC

(superset) branch chart-telemetry updated (ce496a3646 -> e65ac7f2f0)

This is an automated email from the ASF dual-hosted git repository.

beto pushed a change to branch chart-telemetry
in repository https://gitbox.apache.org/repos/asf/superset.git


 discard ce496a3646 feat: chart telemetry
     new e65ac7f2f0 feat: chart telemetry

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (ce496a3646)
            \
             N -- N -- N   refs/heads/chart-telemetry (e65ac7f2f0)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 superset/charts/data/api.py                |  2 -
 superset/common/query_context_processor.py |  3 --
 superset/extensions/telemetry.py           | 81 +++++++++++++++++-------------
 superset/utils/decorators.py               |  7 ++-
 4 files changed, 49 insertions(+), 44 deletions(-)


(superset) 01/01: feat: chart telemetry

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

beto pushed a commit to branch chart-telemetry
in repository https://gitbox.apache.org/repos/asf/superset.git

commit e65ac7f2f095b3a5b2ad21cf6bc1be3cb2893b83
Author: Beto Dealmeida <ro...@dealmeida.net>
AuthorDate: Wed Mar 13 13:26:03 2024 -0400

    feat: chart telemetry
---
 superset/charts/data/api.py                      | 20 +++---
 superset/commands/chart/data/get_data_command.py |  4 +-
 superset/common/query_context_processor.py       |  1 +
 superset/extensions/telemetry.py                 | 79 ++++++++++++++++++++++++
 superset/models/core.py                          | 50 ++++++++-------
 superset/utils/decorators.py                     | 34 +++++++++-
 superset/views/base.py                           |  6 ++
 7 files changed, 162 insertions(+), 32 deletions(-)

diff --git a/superset/charts/data/api.py b/superset/charts/data/api.py
index 2e46eb2737..91e906a3c7 100644
--- a/superset/charts/data/api.py
+++ b/superset/charts/data/api.py
@@ -53,7 +53,7 @@ from superset.utils.core import (
     get_user_id,
     json_int_dttm_ser,
 )
-from superset.utils.decorators import logs_context
+from superset.utils.decorators import logs_context, show_telemetry
 from superset.views.base import CsvResponse, generate_download_headers, XlsxResponse
 from superset.views.base_api import statsd_metrics
 
@@ -181,6 +181,7 @@ class ChartDataRestApi(ChartRestApi):
 
     @expose("/data", methods=("POST",))
     @protect()
+    @show_telemetry
     @statsd_metrics
     @event_logger.log_this_with_context(
         action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.data",
@@ -358,7 +359,8 @@ class ChartDataRestApi(ChartRestApi):
         # This is needed for sending reports based on text charts that do the
         # post-processing of data, eg, the pivot table.
         if result_type == ChartDataResultType.POST_PROCESSED:
-            result = apply_post_process(result, form_data, datasource)
+            with g.telemetry("Post processing data"):
+                result = apply_post_process(result, form_data, datasource)
 
         if result_format in ChartDataResultFormat.table_like():
             # Verify user has permission to export file
@@ -396,11 +398,12 @@ class ChartDataRestApi(ChartRestApi):
             )
 
         if result_format == ChartDataResultFormat.JSON:
-            response_data = simplejson.dumps(
-                {"result": result["queries"]},
-                default=json_int_dttm_ser,
-                ignore_nan=True,
-            )
+            with g.telemetry("JSON encoding"):
+                response_data = simplejson.dumps(
+                    {"result": result["queries"]},
+                    default=json_int_dttm_ser,
+                    ignore_nan=True,
+                )
             resp = make_response(response_data, 200)
             resp.headers["Content-Type"] = "application/json; charset=utf-8"
             return resp
@@ -415,7 +418,8 @@ class ChartDataRestApi(ChartRestApi):
         datasource: BaseDatasource | Query | None = None,
     ) -> Response:
         try:
-            result = command.run(force_cached=force_cached)
+            with g.telemetry("Running command"):
+                result = command.run(force_cached=force_cached)
         except ChartDataCacheLoadError as exc:
             return self.response_422(message=exc.message)
         except ChartDataQueryFailedError as exc:
diff --git a/superset/commands/chart/data/get_data_command.py b/superset/commands/chart/data/get_data_command.py
index 971c343cba..edf1df14a2 100644
--- a/superset/commands/chart/data/get_data_command.py
+++ b/superset/commands/chart/data/get_data_command.py
@@ -17,6 +17,7 @@
 import logging
 from typing import Any
 
+from flask import g
 from flask_babel import gettext as _
 
 from superset.commands.base import BaseCommand
@@ -43,7 +44,8 @@ class ChartDataCommand(BaseCommand):
         force_cached = kwargs.get("force_cached", False)
         try:
             payload = self._query_context.get_payload(
-                cache_query_context=cache_query_context, force_cached=force_cached
+                cache_query_context=cache_query_context,
+                force_cached=force_cached,
             )
         except CacheLoadError as ex:
             raise ChartDataCacheLoadError(ex.message) from ex
diff --git a/superset/common/query_context_processor.py b/superset/common/query_context_processor.py
index d8b5bea4bb..6ed3df29b1 100644
--- a/superset/common/query_context_processor.py
+++ b/superset/common/query_context_processor.py
@@ -23,6 +23,7 @@ from typing import Any, ClassVar, TYPE_CHECKING, TypedDict
 
 import numpy as np
 import pandas as pd
+from flask import g
 from flask_babel import gettext as _
 from pandas import DateOffset
 
diff --git a/superset/extensions/telemetry.py b/superset/extensions/telemetry.py
new file mode 100644
index 0000000000..0eb5d78370
--- /dev/null
+++ b/superset/extensions/telemetry.py
@@ -0,0 +1,79 @@
+from __future__ import annotations
+
+import time
+from collections.abc import Iterator
+from contextlib import contextmanager
+from typing import TypedDict
+
+
+class TelemetryItem(TypedDict):
+    name: str
+    start: float
+    end: float | None
+    children: list[TelemetryItem]
+
+
+class TelemetryHandler:
+    """
+    Handler for telemetry events.
+
+    To use this, decorate an endpoint with `@show_telemetry`:
+
+        @expose("/")
+        @show_telemetry
+        def some_endpoint() -> str:
+            with g.telemetry("Computation"):
+                output = {"answer": some_computation()}
+
+            return jsonify(output)
+
+        def some_computation() -> int:
+            with g.telemetry("Crunching numbers"):
+                return magic()
+
+    The response payload will then look like this:
+
+        {
+            # original response
+            "answer": 42,
+
+            # added telemetry
+            "telemetry": [
+                {
+                    "name": "Computation",
+                    "start": 1710360466.328792,
+                    "end": 1710360472.7976031,
+                    "children": [
+                        {
+                            "name": "Crunching numbers",
+                            "start": 1710360468.401769,
+                            "end": 1710360470.532115,
+                            "children": [],
+                        },
+                    ],
+                },
+                ],
+        }
+
+    """
+
+    def __init__(self) -> None:
+        self.events: list[TelemetryItem] = []
+        self.root = self.events
+
+    @contextmanager
+    def __call__(self, name: str) -> Iterator[None]:
+        event: TelemetryItem = {
+            "name": name,
+            "start": time.time(),
+            "end": None,
+            "children": [],
+        }
+        self.root.append(event)
+        previous = self.root
+        self.root = event["children"]
+        try:
+            yield
+        finally:
+            event["end"] = time.time()
+            self.root = previous
diff --git a/superset/models/core.py b/superset/models/core.py
index 71a6e9d042..d1019351ec 100755
--- a/superset/models/core.py
+++ b/superset/models/core.py
@@ -579,35 +579,41 @@ class Database(
                 )
 
         with self.get_raw_connection(schema=schema) as conn:
-            cursor = conn.cursor()
-            for sql_ in sqls[:-1]:
+            with g.telemetry("Executing query"):
+                cursor = conn.cursor()
+                for sql_ in sqls[:-1]:
+                    if mutate_after_split:
+                        sql_ = sql_query_mutator(
+                            sql_,
+                            security_manager=security_manager,
+                            database=None,
+                        )
+                    _log_query(sql_)
+                    self.db_engine_spec.execute(cursor, sql_)
+                    cursor.fetchall()
+
                 if mutate_after_split:
-                    sql_ = sql_query_mutator(
-                        sql_,
+                    last_sql = sql_query_mutator(
+                        sqls[-1],
                         security_manager=security_manager,
                         database=None,
                     )
-                _log_query(sql_)
-                self.db_engine_spec.execute(cursor, sql_)
-                cursor.fetchall()
-
-            if mutate_after_split:
-                last_sql = sql_query_mutator(
-                    sqls[-1],
-                    security_manager=security_manager,
-                    database=None,
-                )
-                _log_query(last_sql)
-                self.db_engine_spec.execute(cursor, last_sql)
-            else:
-                _log_query(sqls[-1])
-                self.db_engine_spec.execute(cursor, sqls[-1])
+                    _log_query(last_sql)
+                    self.db_engine_spec.execute(cursor, last_sql)
+                else:
+                    _log_query(sqls[-1])
+                    self.db_engine_spec.execute(cursor, sqls[-1])
+
+            with g.telemetry("Fetching data from cursor"):
+                data = self.db_engine_spec.fetch_data(cursor)
 
-            data = self.db_engine_spec.fetch_data(cursor)
             result_set = SupersetResultSet(
-                data, cursor.description, self.db_engine_spec
+                data,
+                cursor.description,
+                self.db_engine_spec,
             )
-            df = result_set.to_pandas_df()
+            with g.telemetry("Loading into dataframe"):
+                df = result_set.to_pandas_df()
             if mutator:
                 df = mutator(df)
 
diff --git a/superset/utils/decorators.py b/superset/utils/decorators.py
index 7e34b98360..be29f9a5e4 100644
--- a/superset/utils/decorators.py
+++ b/superset/utils/decorators.py
@@ -20,10 +20,11 @@ import logging
 import time
 from collections.abc import Iterator
 from contextlib import contextmanager
+from functools import wraps
 from typing import Any, Callable, TYPE_CHECKING
 from uuid import UUID
 
-from flask import current_app, g, Response
+from flask import current_app, g, jsonify, Response
 
 from superset.utils import core as utils
 from superset.utils.dates import now_as_float
@@ -210,3 +211,34 @@ def suppress_logging(
         yield
     finally:
         target_logger.setLevel(original_level)
+
+
+def show_telemetry(f: Callable[..., Any]) -> Callable[..., Any]:
+    """
+    For JSON responses, add telemetry information to the payload.
+
+    This allows us to instrument the stack by adding timestamps at different levels,
+    eg:
+
+        with g.telemetry("Run query"):
+            data = run_query(sql)
+
+    And then we can display this information in the UI.
+    """
+
+    @wraps(f)
+    def wrapped(*args: Any, **kwargs: Any) -> Any:
+        result = f(*args, **kwargs)
+        if hasattr(result, "get_json"):
+            try:
+                json_data = result.get_json()
+            except Exception:  # pylint: disable=broad-exception-caught
+                return result
+
+            if isinstance(json_data, dict) and hasattr(g, "telemetry"):
+                json_data["telemetry"] = g.telemetry.events
+                return jsonify(json_data)
+
+        return result
+
+    return wrapped
diff --git a/superset/views/base.py b/superset/views/base.py
index c8b4862710..1402953492 100644
--- a/superset/views/base.py
+++ b/superset/views/base.py
@@ -74,6 +74,7 @@ from superset.exceptions import (
     SupersetSecurityException,
 )
 from superset.extensions import cache_manager
+from superset.extensions.telemetry import TelemetryHandler
 from superset.models.helpers import ImportExportMixin
 from superset.reports.models import ReportRecipientType
 from superset.superset_typing import FlaskResponse
@@ -725,3 +726,8 @@ def apply_http_headers(response: Response) -> Response:
         if k not in response.headers:
             response.headers[k] = v
     return response
+
+
+@superset_app.before_request
+def start_telemetry() -> None:
+    g.telemetry = TelemetryHandler()