You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@superset.apache.org by mi...@apache.org on 2023/10/31 14:35:32 UTC

(superset) 01/11: fix(sqllab): reinstate "Force trino client async execution" (#25680)

This is an automated email from the ASF dual-hosted git repository.

michaelsmolina pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/superset.git

commit 5293f5521d795d9f97a7470b1b9bd97091a190f4
Author: Rob Moore <gi...@users.noreply.github.com>
AuthorDate: Thu Oct 19 14:38:13 2023 +0100

    fix(sqllab): reinstate "Force trino client async execution" (#25680)
---
 .../docs/databases/installing-database-drivers.mdx | 81 +++++++++++-----------
 docs/docs/frequently-asked-questions.mdx           |  2 +-
 docs/docs/installation/configuring-superset.mdx    |  4 +-
 superset/config.py                                 |  5 +-
 superset/db_engine_specs/base.py                   | 18 +++++
 superset/db_engine_specs/trino.py                  | 66 ++++++++++++++++--
 superset/sql_lab.py                                |  7 +-
 tests/unit_tests/db_engine_specs/test_trino.py     | 31 ++++++++-
 tests/unit_tests/sql_lab_test.py                   | 10 ++-
 9 files changed, 163 insertions(+), 61 deletions(-)

diff --git a/docs/docs/databases/installing-database-drivers.mdx b/docs/docs/databases/installing-database-drivers.mdx
index e4e972f064..57652db4b8 100644
--- a/docs/docs/databases/installing-database-drivers.mdx
+++ b/docs/docs/databases/installing-database-drivers.mdx
@@ -22,46 +22,47 @@ as well as the packages needed to connect to the databases you want to access th
 
 Some of the recommended packages are shown below. Please refer to [setup.py](https://github.com/apache/superset/blob/master/setup.py) for the versions that are compatible with Superset.
 
-| Database                                                  | PyPI package                                                                       | Connection String                                                                                           |
-| --------------------------------------------------------- | ---------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------- |
-| [Amazon Athena](/docs/databases/athena)                   | `pip install pyathena[pandas]` , `pip install PyAthenaJDBC`                        | `awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@athena.{region_name}.amazonaws.com/{ `        |
-| [Amazon DynamoDB](/docs/databases/dynamodb)               | `pip install pydynamodb`                                                           | `dynamodb://{access_key_id}:{secret_access_key}@dynamodb.{region_name}.amazonaws.com?connector=superset`    |
-| [Amazon Redshift](/docs/databases/redshift)               | `pip install sqlalchemy-redshift`                                                  | ` redshift+psycopg2://<userName>:<DBPassword>@<AWS End Point>:5439/<Database Name>`                         |
-| [Apache Drill](/docs/databases/drill)                     | `pip install sqlalchemy-drill`                                                     | `drill+sadrill:// For JDBC drill+jdbc://`                                                                   |
-| [Apache Druid](/docs/databases/druid)                     | `pip install pydruid`                                                              | `druid://<User>:<password>@<Host>:<Port-default-9088>/druid/v2/sql`                                         |
-| [Apache Hive](/docs/databases/hive)                       | `pip install pyhive`                                                               | `hive://hive@{hostname}:{port}/{database}`                                                                  |
-| [Apache Impala](/docs/databases/impala)                   | `pip install impyla`                                                               | `impala://{hostname}:{port}/{database}`                                                                     |
-| [Apache Kylin](/docs/databases/kylin)                     | `pip install kylinpy`                                                              | `kylin://<username>:<password>@<hostname>:<port>/<project>?<param1>=<value1>&<param2>=<value2>`             |
-| [Apache Pinot](/docs/databases/pinot)                     | `pip install pinotdb`                                                              | `pinot://BROKER:5436/query?server=http://CONTROLLER:5983/`                                                  |
-| [Apache Solr](/docs/databases/solr)                       | `pip install sqlalchemy-solr`                                                      | `solr://{username}:{password}@{hostname}:{port}/{server_path}/{collection}`                                 |
-| [Apache Spark SQL](/docs/databases/spark-sql)             | `pip install pyhive`                                                               | `hive://hive@{hostname}:{port}/{database}`                                                                  |
-| [Ascend.io](/docs/databases/ascend)                       | `pip install impyla`                                                               | `ascend://{username}:{password}@{hostname}:{port}/{database}?auth_mechanism=PLAIN;use_ssl=true`             |
-| [Azure MS SQL](/docs/databases/sql-server)                | `pip install pymssql`                                                              | `mssql+pymssql://UserName@presetSQL:TestPassword@presetSQL.database.windows.net:1433/TestSchema`            |
-| [Big Query](/docs/databases/bigquery)                     | `pip install sqlalchemy-bigquery`                                                  | `bigquery://{project_id}`                                                                                   |
-| [ClickHouse](/docs/databases/clickhouse)                  | `pip install clickhouse-connect`                                                   | `clickhousedb://{username}:{password}@{hostname}:{port}/{database}`                                         |
-| [CockroachDB](/docs/databases/cockroachdb)                | `pip install cockroachdb`                                                          | `cockroachdb://root@{hostname}:{port}/{database}?sslmode=disable`                                           |
-| [Dremio](/docs/databases/dremio)                          | `pip install sqlalchemy_dremio`                                                    | `dremio://user:pwd@host:31010/`                                                                             |
-| [Elasticsearch](/docs/databases/elasticsearch)            | `pip install elasticsearch-dbapi`                                                  | `elasticsearch+http://{user}:{password}@{host}:9200/`                                                       |
-| [Exasol](/docs/databases/exasol)                          | `pip install sqlalchemy-exasol`                                                    | `exa+pyodbc://{username}:{password}@{hostname}:{port}/my_schema?CONNECTIONLCALL=en_US.UTF-8&driver=EXAODBC` |
-| [Google Sheets](/docs/databases/google-sheets)            | `pip install shillelagh[gsheetsapi]`                                               | `gsheets://`                                                                                                |
-| [Firebolt](/docs/databases/firebolt)                      | `pip install firebolt-sqlalchemy`                                                  | `firebolt://{username}:{password}@{database} or firebolt://{username}:{password}@{database}/{engine_name}`  |
-| [Hologres](/docs/databases/hologres)                      | `pip install psycopg2`                                                             | `postgresql+psycopg2://<UserName>:<DBPassword>@<Database Host>/<Database Name>`                             |
-| [IBM Db2](/docs/databases/ibm-db2)                        | `pip install ibm_db_sa`                                                            | `db2+ibm_db://`                                                                                             |
-| [IBM Netezza Performance Server](/docs/databases/netezza) | `pip install nzalchemy`                                                            | `netezza+nzpy://<UserName>:<DBPassword>@<Database Host>/<Database Name>`                                    |
-| [MySQL](/docs/databases/mysql)                            | `pip install mysqlclient`                                                          | `mysql://<UserName>:<DBPassword>@<Database Host>/<Database Name>`                                           |
-| [Oracle](/docs/databases/oracle)                          | `pip install cx_Oracle`                                                            | `oracle://`                                                                                                 |
-| [PostgreSQL](/docs/databases/postgres)                    | `pip install psycopg2`                                                             | `postgresql://<UserName>:<DBPassword>@<Database Host>/<Database Name>`                                      |
-| [Trino](/docs/databases/trino)                            | `pip install trino`                                                                | `trino://{username}:{password}@{hostname}:{port}/{catalog}`                                                 |
-| [Presto](/docs/databases/presto)                          | `pip install pyhive`                                                               | `presto://`                                                                                                 |
-| [SAP Hana](/docs/databases/hana)                          | `pip install hdbcli sqlalchemy-hana or pip install apache-superset[hana]`          | `hana://{username}:{password}@{host}:{port}`                                                                |
-| [StarRocks](/docs/databases/starrocks)                    | `pip install starrocks`                                                            | `starrocks://<User>:<Password>@<Host>:<Port>/<Catalog>.<Database>`                                          |
-| [Snowflake](/docs/databases/snowflake)                    | `pip install snowflake-sqlalchemy`                                                 | `snowflake://{user}:{password}@{account}.{region}/{database}?role={role}&warehouse={warehouse}`             |
-| SQLite                                                    | No additional library needed                                                       | `sqlite://`                                                                                                 |
-| [SQL Server](/docs/databases/sql-server)                  | `pip install pymssql`                                                              | `mssql+pymssql://`                                                                                                  |
-| [Teradata](/docs/databases/teradata)                      | `pip install teradatasqlalchemy`                                                   | `teradatasql://{user}:{password}@{host}`                                                                    |
-| [TimescaleDB](/docs/databases/timescaledb)                | `pip install psycopg2`                                                             | `postgresql://<UserName>:<DBPassword>@<Database Host>:<Port>/<Database Name>`                               |
-| [Vertica](/docs/databases/vertica)                        | `pip install sqlalchemy-vertica-python`                                            | `vertica+vertica_python://<UserName>:<DBPassword>@<Database Host>/<Database Name>`                          |
-| [YugabyteDB](/docs/databases/yugabytedb)                  | `pip install psycopg2`                                                             | `postgresql://<UserName>:<DBPassword>@<Database Host>/<Database Name>`                                      |
+| Database                                                  | PyPI package                                                              | Connection String                                                                                           |
+| --------------------------------------------------------- | ------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------- |
+| [Amazon Athena](/docs/databases/athena)                   | `pip install pyathena[pandas]` , `pip install PyAthenaJDBC`               | `awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@athena.{region_name}.amazonaws.com/{ `        |
+| [Amazon DynamoDB](/docs/databases/dynamodb)               | `pip install pydynamodb`                                                  | `dynamodb://{access_key_id}:{secret_access_key}@dynamodb.{region_name}.amazonaws.com?connector=superset`    |
+| [Amazon Redshift](/docs/databases/redshift)               | `pip install sqlalchemy-redshift`                                         | ` redshift+psycopg2://<userName>:<DBPassword>@<AWS End Point>:5439/<Database Name>`                         |
+| [Apache Drill](/docs/databases/drill)                     | `pip install sqlalchemy-drill`                                            | `drill+sadrill:// For JDBC drill+jdbc://`                                                                   |
+| [Apache Druid](/docs/databases/druid)                     | `pip install pydruid`                                                     | `druid://<User>:<password>@<Host>:<Port-default-9088>/druid/v2/sql`                                         |
+| [Apache Hive](/docs/databases/hive)                       | `pip install pyhive`                                                      | `hive://hive@{hostname}:{port}/{database}`                                                                  |
+| [Apache Impala](/docs/databases/impala)                   | `pip install impyla`                                                      | `impala://{hostname}:{port}/{database}`                                                                     |
+| [Apache Kylin](/docs/databases/kylin)                     | `pip install kylinpy`                                                     | `kylin://<username>:<password>@<hostname>:<port>/<project>?<param1>=<value1>&<param2>=<value2>`             |
+| [Apache Pinot](/docs/databases/pinot)                     | `pip install pinotdb`                                                     | `pinot://BROKER:5436/query?server=http://CONTROLLER:5983/`                                                  |
+| [Apache Solr](/docs/databases/solr)                       | `pip install sqlalchemy-solr`                                             | `solr://{username}:{password}@{hostname}:{port}/{server_path}/{collection}`                                 |
+| [Apache Spark SQL](/docs/databases/spark-sql)             | `pip install pyhive`                                                      | `hive://hive@{hostname}:{port}/{database}`                                                                  |
+| [Ascend.io](/docs/databases/ascend)                       | `pip install impyla`                                                      | `ascend://{username}:{password}@{hostname}:{port}/{database}?auth_mechanism=PLAIN;use_ssl=true`             |
+| [Azure MS SQL](/docs/databases/sql-server)                | `pip install pymssql`                                                     | `mssql+pymssql://UserName@presetSQL:TestPassword@presetSQL.database.windows.net:1433/TestSchema`            |
+| [Big Query](/docs/databases/bigquery)                     | `pip install sqlalchemy-bigquery`                                         | `bigquery://{project_id}`                                                                                   |
+| [ClickHouse](/docs/databases/clickhouse)                  | `pip install clickhouse-connect`                                          | `clickhousedb://{username}:{password}@{hostname}:{port}/{database}`                                         |
+| [CockroachDB](/docs/databases/cockroachdb)                | `pip install cockroachdb`                                                 | `cockroachdb://root@{hostname}:{port}/{database}?sslmode=disable`                                           |
+| [Dremio](/docs/databases/dremio)                          | `pip install sqlalchemy_dremio`                                           | `dremio://user:pwd@host:31010/`                                                                             |
+| [Elasticsearch](/docs/databases/elasticsearch)            | `pip install elasticsearch-dbapi`                                         | `elasticsearch+http://{user}:{password}@{host}:9200/`                                                       |
+| [Exasol](/docs/databases/exasol)                          | `pip install sqlalchemy-exasol`                                           | `exa+pyodbc://{username}:{password}@{hostname}:{port}/my_schema?CONNECTIONLCALL=en_US.UTF-8&driver=EXAODBC` |
+| [Google Sheets](/docs/databases/google-sheets)            | `pip install shillelagh[gsheetsapi]`                                      | `gsheets://`                                                                                                |
+| [Firebolt](/docs/databases/firebolt)                      | `pip install firebolt-sqlalchemy`                                         | `firebolt://{username}:{password}@{database} or firebolt://{username}:{password}@{database}/{engine_name}`  |
+| [Hologres](/docs/databases/hologres)                      | `pip install psycopg2`                                                    | `postgresql+psycopg2://<UserName>:<DBPassword>@<Database Host>/<Database Name>`                             |
+| [IBM Db2](/docs/databases/ibm-db2)                        | `pip install ibm_db_sa`                                                   | `db2+ibm_db://`                                                                                             |
+| [IBM Netezza Performance Server](/docs/databases/netezza) | `pip install nzalchemy`                                                   | `netezza+nzpy://<UserName>:<DBPassword>@<Database Host>/<Database Name>`                                    |
+| [MySQL](/docs/databases/mysql)                            | `pip install mysqlclient`                                                 | `mysql://<UserName>:<DBPassword>@<Database Host>/<Database Name>`                                           |
+| [Oracle](/docs/databases/oracle)                          | `pip install cx_Oracle`                                                   | `oracle://`                                                                                                 |
+| [PostgreSQL](/docs/databases/postgres)                    | `pip install psycopg2`                                                    | `postgresql://<UserName>:<DBPassword>@<Database Host>/<Database Name>`                                      |
+| [Trino](/docs/databases/trino)                            | `pip install trino`                                                       | `trino://{username}:{password}@{hostname}:{port}/{catalog}`                                                 |
+| [Presto](/docs/databases/presto)                          | `pip install pyhive`                                                      | `presto://`                                                                                                 |
+| [SAP Hana](/docs/databases/hana)                          | `pip install hdbcli sqlalchemy-hana or pip install apache-superset[hana]` | `hana://{username}:{password}@{host}:{port}`                                                                |
+| [StarRocks](/docs/databases/starrocks)                    | `pip install starrocks`                                                   | `starrocks://<User>:<Password>@<Host>:<Port>/<Catalog>.<Database>`                                          |
+| [Snowflake](/docs/databases/snowflake)                    | `pip install snowflake-sqlalchemy`                                        | `snowflake://{user}:{password}@{account}.{region}/{database}?role={role}&warehouse={warehouse}`             |
+| SQLite                                                    | No additional library needed                                              | `sqlite://path/to/file.db?check_same_thread=false`                                                          |
+| [SQL Server](/docs/databases/sql-server)                  | `pip install pymssql`                                                     | `mssql+pymssql://`                                                                                          |
+| [Teradata](/docs/databases/teradata)                      | `pip install teradatasqlalchemy`                                          | `teradatasql://{user}:{password}@{host}`                                                                    |
+| [TimescaleDB](/docs/databases/timescaledb)                | `pip install psycopg2`                                                    | `postgresql://<UserName>:<DBPassword>@<Database Host>:<Port>/<Database Name>`                               |
+| [Vertica](/docs/databases/vertica)                        | `pip install sqlalchemy-vertica-python`                                   | `vertica+vertica_python://<UserName>:<DBPassword>@<Database Host>/<Database Name>`                          |
+| [YugabyteDB](/docs/databases/yugabytedb)                  | `pip install psycopg2`                                                    | `postgresql://<UserName>:<DBPassword>@<Database Host>/<Database Name>`                                      |
+
 ---
 
 Note that many other databases are supported, the main criteria being the existence of a functional
diff --git a/docs/docs/frequently-asked-questions.mdx b/docs/docs/frequently-asked-questions.mdx
index bbb94d617b..79a0863b08 100644
--- a/docs/docs/frequently-asked-questions.mdx
+++ b/docs/docs/frequently-asked-questions.mdx
@@ -168,7 +168,7 @@ Another workaround is to change where superset stores the sqlite database by add
 `superset_config.py`:
 
 ```
-SQLALCHEMY_DATABASE_URI = 'sqlite:////new/location/superset.db'
+SQLALCHEMY_DATABASE_URI = 'sqlite:////new/location/superset.db?check_same_thread=false'
 ```
 
 You can read more about customizing Superset using the configuration file
diff --git a/docs/docs/installation/configuring-superset.mdx b/docs/docs/installation/configuring-superset.mdx
index 9cb3aaefac..c6108d6f59 100644
--- a/docs/docs/installation/configuring-superset.mdx
+++ b/docs/docs/installation/configuring-superset.mdx
@@ -32,7 +32,9 @@ SECRET_KEY = 'YOUR_OWN_RANDOM_GENERATED_SECRET_KEY'
 # superset metadata (slices, connections, tables, dashboards, ...).
 # Note that the connection information to connect to the datasources
 # you want to explore are managed directly in the web UI
-SQLALCHEMY_DATABASE_URI = 'sqlite:////path/to/superset.db'
+# The check_same_thread=false property ensures the sqlite client does not attempt
+# to enforce single-threaded access, which may be problematic in some edge cases
+SQLALCHEMY_DATABASE_URI = 'sqlite:////path/to/superset.db?check_same_thread=false'
 
 # Flask-WTF flag for CSRF
 WTF_CSRF_ENABLED = True
diff --git a/superset/config.py b/superset/config.py
index 27f78832d1..73553fcc6c 100644
--- a/superset/config.py
+++ b/superset/config.py
@@ -186,7 +186,10 @@ SQLALCHEMY_TRACK_MODIFICATIONS = False
 SECRET_KEY = os.environ.get("SUPERSET_SECRET_KEY") or CHANGE_ME_SECRET_KEY
 
 # The SQLAlchemy connection string.
-SQLALCHEMY_DATABASE_URI = "sqlite:///" + os.path.join(DATA_DIR, "superset.db")
+SQLALCHEMY_DATABASE_URI = (
+    f"""sqlite:///{os.path.join(DATA_DIR, "superset.db")}?check_same_thread=false"""
+)
+
 # SQLALCHEMY_DATABASE_URI = 'mysql://myapp@localhost/myapp'
 # SQLALCHEMY_DATABASE_URI = 'postgresql://root:password@localhost/myapp'
 
diff --git a/superset/db_engine_specs/base.py b/superset/db_engine_specs/base.py
index 5836e6163f..6be3ab24b0 100644
--- a/superset/db_engine_specs/base.py
+++ b/superset/db_engine_specs/base.py
@@ -1053,6 +1053,24 @@ class BaseEngineSpec:  # pylint: disable=too-many-public-methods
         query object"""
         # TODO: Fix circular import error caused by importing sql_lab.Query
 
+    @classmethod
+    def execute_with_cursor(
+        cls, cursor: Any, sql: str, query: Query, session: Session
+    ) -> None:
+        """
+        Trigger execution of a query and handle the resulting cursor.
+
+        For most implementations this just makes calls to `execute` and
+        `handle_cursor` consecutively, but in some engines (e.g. Trino) we may
+        need to handle client limitations such as lack of async support and
+        perform a more complicated operation to get information from the cursor
+        in a timely manner and facilitate operations such as query stop
+        """
+        logger.debug("Query %d: Running query: %s", query.id, sql)
+        cls.execute(cursor, sql, async_=True)
+        logger.debug("Query %d: Handling cursor", query.id)
+        cls.handle_cursor(cursor, query, session)
+
     @classmethod
     def extract_error_message(cls, ex: Exception) -> str:
         return f"{cls.engine} error: {cls._extract_error_message(ex)}"
diff --git a/superset/db_engine_specs/trino.py b/superset/db_engine_specs/trino.py
index eff78c4fa4..f758f1fadd 100644
--- a/superset/db_engine_specs/trino.py
+++ b/superset/db_engine_specs/trino.py
@@ -17,6 +17,8 @@
 from __future__ import annotations
 
 import logging
+import threading
+import time
 from typing import Any, TYPE_CHECKING
 
 import simplejson as json
@@ -154,14 +156,21 @@ class TrinoEngineSpec(PrestoBaseEngineSpec):
 
     @classmethod
     def handle_cursor(cls, cursor: Cursor, query: Query, session: Session) -> None:
-        if tracking_url := cls.get_tracking_url(cursor):
-            query.tracking_url = tracking_url
+        """
+        Handle a trino client cursor.
+
+        WARNING: if you execute a query, it will block until complete and you
+        will not be able to handle the cursor until complete. Use
+        `execute_with_cursor` instead, to handle this asynchronously.
+        """
 
         # Adds the executed query id to the extra payload so the query can be cancelled
-        query.set_extra_json_key(
-            key=QUERY_CANCEL_KEY,
-            value=(cancel_query_id := cursor.stats["queryId"]),
-        )
+        cancel_query_id = cursor.query_id
+        logger.debug("Query %d: queryId %s found in cursor", query.id, cancel_query_id)
+        query.set_extra_json_key(key=QUERY_CANCEL_KEY, value=cancel_query_id)
+
+        if tracking_url := cls.get_tracking_url(cursor):
+            query.tracking_url = tracking_url
 
         session.commit()
 
@@ -176,6 +185,51 @@ class TrinoEngineSpec(PrestoBaseEngineSpec):
 
         super().handle_cursor(cursor=cursor, query=query, session=session)
 
+    @classmethod
+    def execute_with_cursor(
+        cls, cursor: Any, sql: str, query: Query, session: Session
+    ) -> None:
+        """
+        Trigger execution of a query and handle the resulting cursor.
+
+        Trino's client blocks until the query is complete, so we need to run it
+        in another thread and invoke `handle_cursor` to poll for the query ID
+        to appear on the cursor in parallel.
+        """
+        execute_result: dict[str, Any] = {}
+
+        def _execute(results: dict[str, Any]) -> None:
+            logger.debug("Query %d: Running query: %s", query.id, sql)
+
+            # Pass result / exception information back to the parent thread
+            try:
+                cls.execute(cursor, sql)
+                results["complete"] = True
+            except Exception as ex:  # pylint: disable=broad-except
+                results["complete"] = True
+                results["error"] = ex
+
+        execute_thread = threading.Thread(target=_execute, args=(execute_result,))
+        execute_thread.start()
+
+        # Wait for a query ID to be available before handling the cursor, as
+        # it's required by that method; it may never become available on error.
+        while not cursor.query_id and not execute_result.get("complete"):
+            time.sleep(0.1)
+
+        logger.debug("Query %d: Handling cursor", query.id)
+        cls.handle_cursor(cursor, query, session)
+
+        # Block until the query completes; same behaviour as the client itself
+        logger.debug("Query %d: Waiting for query to complete", query.id)
+        while not execute_result.get("complete"):
+            time.sleep(0.5)
+
+        # Unfortunately we'll mangle the stack trace due to the thread, but
+        # throwing the original exception allows mapping database errors as normal
+        if err := execute_result.get("error"):
+            raise err
+
     @classmethod
     def prepare_cancel_query(cls, query: Query, session: Session) -> None:
         if QUERY_CANCEL_KEY not in query.extra:
diff --git a/superset/sql_lab.py b/superset/sql_lab.py
index afc682b10f..ca157b3240 100644
--- a/superset/sql_lab.py
+++ b/superset/sql_lab.py
@@ -191,7 +191,7 @@ def get_sql_results(  # pylint: disable=too-many-arguments
                 return handle_query_error(ex, query, session)
 
 
-def execute_sql_statement(  # pylint: disable=too-many-arguments,too-many-statements
+def execute_sql_statement(  # pylint: disable=too-many-arguments
     sql_statement: str,
     query: Query,
     session: Session,
@@ -271,10 +271,7 @@ def execute_sql_statement(  # pylint: disable=too-many-arguments,too-many-statem
             )
         session.commit()
         with stats_timing("sqllab.query.time_executing_query", stats_logger):
-            logger.debug("Query %d: Running query: %s", query.id, sql)
-            db_engine_spec.execute(cursor, sql, async_=True)
-            logger.debug("Query %d: Handling cursor", query.id)
-            db_engine_spec.handle_cursor(cursor, query, session)
+            db_engine_spec.execute_with_cursor(cursor, sql, query, session)
 
         with stats_timing("sqllab.query.time_fetching_results", stats_logger):
             logger.debug(
diff --git a/tests/unit_tests/db_engine_specs/test_trino.py b/tests/unit_tests/db_engine_specs/test_trino.py
index 963953d18b..1b50a683a0 100644
--- a/tests/unit_tests/db_engine_specs/test_trino.py
+++ b/tests/unit_tests/db_engine_specs/test_trino.py
@@ -352,7 +352,7 @@ def test_handle_cursor_early_cancel(
     query_id = "myQueryId"
 
     cursor_mock = engine_mock.return_value.__enter__.return_value
-    cursor_mock.stats = {"queryId": query_id}
+    cursor_mock.query_id = query_id
     session_mock = mocker.MagicMock()
 
     query = Query()
@@ -366,3 +366,32 @@ def test_handle_cursor_early_cancel(
         assert cancel_query_mock.call_args[1]["cancel_query_id"] == query_id
     else:
         assert cancel_query_mock.call_args is None
+
+
+def test_execute_with_cursor_in_parallel(mocker: MockerFixture):
+    """Test that `execute_with_cursor` fetches query ID from the cursor"""
+    from superset.db_engine_specs.trino import TrinoEngineSpec
+
+    query_id = "myQueryId"
+
+    mock_cursor = mocker.MagicMock()
+    mock_cursor.query_id = None
+
+    mock_query = mocker.MagicMock()
+    mock_session = mocker.MagicMock()
+
+    def _mock_execute(*args, **kwargs):
+        mock_cursor.query_id = query_id
+
+    mock_cursor.execute.side_effect = _mock_execute
+
+    TrinoEngineSpec.execute_with_cursor(
+        cursor=mock_cursor,
+        sql="SELECT 1 FROM foo",
+        query=mock_query,
+        session=mock_session,
+    )
+
+    mock_query.set_extra_json_key.assert_called_once_with(
+        key=QUERY_CANCEL_KEY, value=query_id
+    )
diff --git a/tests/unit_tests/sql_lab_test.py b/tests/unit_tests/sql_lab_test.py
index 29f45eab68..edc1fd2ec4 100644
--- a/tests/unit_tests/sql_lab_test.py
+++ b/tests/unit_tests/sql_lab_test.py
@@ -55,8 +55,8 @@ def test_execute_sql_statement(mocker: MockerFixture, app: None) -> None:
     )
 
     database.apply_limit_to_sql.assert_called_with("SELECT 42 AS answer", 2, force=True)
-    db_engine_spec.execute.assert_called_with(
-        cursor, "SELECT 42 AS answer LIMIT 2", async_=True
+    db_engine_spec.execute_with_cursor.assert_called_with(
+        cursor, "SELECT 42 AS answer LIMIT 2", query, session
     )
     SupersetResultSet.assert_called_with([(42,)], cursor.description, db_engine_spec)
 
@@ -106,10 +106,8 @@ def test_execute_sql_statement_with_rls(
         101,
         force=True,
     )
-    db_engine_spec.execute.assert_called_with(
-        cursor,
-        "SELECT * FROM sales WHERE organization_id=42 LIMIT 101",
-        async_=True,
+    db_engine_spec.execute_with_cursor.assert_called_with(
+        cursor, "SELECT * FROM sales WHERE organization_id=42 LIMIT 101", query, session
     )
     SupersetResultSet.assert_called_with([(42,)], cursor.description, db_engine_spec)