You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@superset.apache.org by GitBox <gi...@apache.org> on 2022/08/11 14:10:38 UTC

[GitHub] [superset] ktmud commented on a diff in pull request #21035: feat(trino): add query cancellation

ktmud commented on code in PR #21035:
URL: https://github.com/apache/superset/pull/21035#discussion_r943519463


##########
superset/db_engine_specs/trino.py:
##########
@@ -14,6 +14,8 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from __future__ import annotations

Review Comment:
   TIL



##########
superset/db_engine_specs/trino.py:
##########
@@ -127,14 +129,42 @@ def get_tracking_url(cls, cursor: "Cursor") -> Optional[str]:
         return None
 
     @classmethod
-    def handle_cursor(cls, cursor: "Cursor", query: Query, session: Session) -> None:
-        """Updates progress information"""
+    def handle_cursor(cls, cursor: Cursor, query: Query, session: Session) -> None:
         tracking_url = cls.get_tracking_url(cursor)
         if tracking_url:
             query.tracking_url = tracking_url
-            session.commit()
+
+        # Adds the executed query id to the extra payload so the query can be cancelled
+        query.set_extra_json_key("cancel_query", cursor.stats["queryId"])
+
+        session.commit()
         BaseEngineSpec.handle_cursor(cursor=cursor, query=query, session=session)
 
+    @classmethod
+    def has_implicit_cancel(cls) -> bool:
+        return False
+
+    @classmethod
+    def cancel_query(cls, cursor: Any, query: Query, cancel_query_id: str) -> bool:
+        """
+        Cancel query in the underlying database.
+
+        :param cursor: New cursor instance to the db of the query
+        :param query: Query instance
+        :param cancel_query_id: Trino `queryId`
+        :return: True if query cancelled successfully, False otherwise
+        """
+        try:
+            cursor.execute(
+                f"CALL system.runtime.kill_query(query_id => '{cancel_query_id}',"
+                "message => 'Query cancelled by Superset')"
+            )
+            cursor.fetchall()  # needed to trigger the call
+        except Exception:  # pylint: disable=broad-except
+            return False

Review Comment:
   Not sure how critical it is the let users know why cancellation failed but sto keep that flexibility, maybe we can return the exception message here instead? Or raise a SupersetDBAPIError here and let the downstream actions handle the exception.



##########
superset/db_engine_specs/trino.py:
##########
@@ -127,14 +129,42 @@ def get_tracking_url(cls, cursor: "Cursor") -> Optional[str]:
         return None
 
     @classmethod
-    def handle_cursor(cls, cursor: "Cursor", query: Query, session: Session) -> None:
-        """Updates progress information"""
+    def handle_cursor(cls, cursor: Cursor, query: Query, session: Session) -> None:
         tracking_url = cls.get_tracking_url(cursor)
         if tracking_url:
             query.tracking_url = tracking_url
-            session.commit()
+
+        # Adds the executed query id to the extra payload so the query can be cancelled
+        query.set_extra_json_key("cancel_query", cursor.stats["queryId"])
+
+        session.commit()
         BaseEngineSpec.handle_cursor(cursor=cursor, query=query, session=session)
 
+    @classmethod
+    def has_implicit_cancel(cls) -> bool:
+        return False

Review Comment:
   I feel some of these class methods in DBEngineSpec should probably be attributes instead. But that's another story.



##########
superset/db_engine_specs/trino.py:
##########
@@ -127,14 +129,42 @@ def get_tracking_url(cls, cursor: "Cursor") -> Optional[str]:
         return None
 
     @classmethod
-    def handle_cursor(cls, cursor: "Cursor", query: Query, session: Session) -> None:
-        """Updates progress information"""
+    def handle_cursor(cls, cursor: Cursor, query: Query, session: Session) -> None:
         tracking_url = cls.get_tracking_url(cursor)
         if tracking_url:
             query.tracking_url = tracking_url
-            session.commit()
+
+        # Adds the executed query id to the extra payload so the query can be cancelled
+        query.set_extra_json_key("cancel_query", cursor.stats["queryId"])

Review Comment:
   Can we call this `trino_query_id` instead? It's more descriptive of what it really is and opens the possibility of using the id for other purposes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@superset.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@superset.apache.org
For additional commands, e-mail: notifications-help@superset.apache.org