You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@superset.apache.org by vi...@apache.org on 2022/08/12 04:48:36 UTC
[superset] branch master updated: feat(trino): add query cancellation (#21035)
This is an automated email from the ASF dual-hosted git repository.
villebro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/superset.git
The following commit(s) were added to refs/heads/master by this push:
new 5113b01031 feat(trino): add query cancellation (#21035)
5113b01031 is described below
commit 5113b01031705128df2064068a0809f07019c8ae
Author: Ville Brofeldt <33...@users.noreply.github.com>
AuthorDate: Fri Aug 12 07:48:29 2022 +0300
feat(trino): add query cancellation (#21035)
---
superset/db_engine_specs/trino.py | 42 ++++++++++++++++++++++----
tests/unit_tests/db_engine_specs/test_trino.py | 38 +++++++++++++++++++++++
2 files changed, 74 insertions(+), 6 deletions(-)
diff --git a/superset/db_engine_specs/trino.py b/superset/db_engine_specs/trino.py
index 08c9a71f4b..2a23d1c969 100644
--- a/superset/db_engine_specs/trino.py
+++ b/superset/db_engine_specs/trino.py
@@ -14,6 +14,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+from __future__ import annotations
+
import logging
from typing import Any, Dict, List, Optional, TYPE_CHECKING
@@ -90,7 +92,7 @@ class TrinoEngineSpec(PrestoEngineSpec):
@classmethod
def get_table_names(
cls,
- database: "Database",
+ database: Database,
inspector: Inspector,
schema: Optional[str],
) -> List[str]:
@@ -103,7 +105,7 @@ class TrinoEngineSpec(PrestoEngineSpec):
@classmethod
def get_view_names(
cls,
- database: "Database",
+ database: Database,
inspector: Inspector,
schema: Optional[str],
) -> List[str]:
@@ -114,7 +116,7 @@ class TrinoEngineSpec(PrestoEngineSpec):
)
@classmethod
- def get_tracking_url(cls, cursor: "Cursor") -> Optional[str]:
+ def get_tracking_url(cls, cursor: Cursor) -> Optional[str]:
try:
return cursor.info_uri
except AttributeError:
@@ -127,14 +129,42 @@ class TrinoEngineSpec(PrestoEngineSpec):
return None
@classmethod
- def handle_cursor(cls, cursor: "Cursor", query: Query, session: Session) -> None:
- """Updates progress information"""
+ def handle_cursor(cls, cursor: Cursor, query: Query, session: Session) -> None:
tracking_url = cls.get_tracking_url(cursor)
if tracking_url:
query.tracking_url = tracking_url
- session.commit()
+
+ # Adds the executed query id to the extra payload so the query can be cancelled
+ query.set_extra_json_key("cancel_query", cursor.stats["queryId"])
+
+ session.commit()
BaseEngineSpec.handle_cursor(cursor=cursor, query=query, session=session)
+ @classmethod
+ def has_implicit_cancel(cls) -> bool:
+ return False
+
+ @classmethod
+ def cancel_query(cls, cursor: Any, query: Query, cancel_query_id: str) -> bool:
+ """
+ Cancel query in the underlying database.
+
+ :param cursor: New cursor instance to the db of the query
+ :param query: Query instance
+ :param cancel_query_id: Trino `queryId`
+ :return: True if query cancelled successfully, False otherwise
+ """
+ try:
+ cursor.execute(
+ f"CALL system.runtime.kill_query(query_id => '{cancel_query_id}',"
+ "message => 'Query cancelled by Superset')"
+ )
+ cursor.fetchall() # needed to trigger the call
+ except Exception: # pylint: disable=broad-except
+ return False
+
+ return True
+
@staticmethod
def get_extra_params(database: "Database") -> Dict[str, Any]:
"""
diff --git a/tests/unit_tests/db_engine_specs/test_trino.py b/tests/unit_tests/db_engine_specs/test_trino.py
new file mode 100644
index 0000000000..6a77e63236
--- /dev/null
+++ b/tests/unit_tests/db_engine_specs/test_trino.py
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint: disable=unused-argument, import-outside-toplevel, protected-access
+from unittest import mock
+
+
+@mock.patch("sqlalchemy.engine.Engine.connect")
+def test_cancel_query_success(engine_mock: mock.Mock) -> None:
+ from superset.db_engine_specs.trino import TrinoEngineSpec
+ from superset.models.sql_lab import Query
+
+ query = Query()
+ cursor_mock = engine_mock.return_value.__enter__.return_value
+ assert TrinoEngineSpec.cancel_query(cursor_mock, query, "123") is True
+
+
+@mock.patch("sqlalchemy.engine.Engine.connect")
+def test_cancel_query_failed(engine_mock: mock.Mock) -> None:
+ from superset.db_engine_specs.trino import TrinoEngineSpec
+ from superset.models.sql_lab import Query
+
+ query = Query()
+ cursor_mock = engine_mock.raiseError.side_effect = Exception()
+ assert TrinoEngineSpec.cancel_query(cursor_mock, query, "123") is False