Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/11/05 00:15:43 UTC

[GitHub] [airflow] kazanzhy opened a new pull request, #27514: Implement on_kill() for SQLExecuteQueryOperator

kazanzhy opened a new pull request, #27514:
URL: https://github.com/apache/airflow/pull/27514

   Discussion: https://apache-airflow.slack.com/archives/CCPRP7943/p1666718396900909
   
   closes: #27314
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] kazanzhy commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
kazanzhy commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1016004905


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -294,6 +297,22 @@ def _run_command(self, cur, sql_statement, parameters):
         if cur.rowcount >= 0:
             self.log.info("Rows affected: %s", cur.rowcount)
 
+    def _update_query_ids(self, cursor) -> None:

Review Comment:
   I think this method should be private.
   It's called only by `.run()` and should use the cursor to get a query ID.
   Different connectors expose it differently: Snowflake uses `cursor.sfqid` and Presto uses `cursor.stats["queryId"]`.
   Theoretically, we could implement this method for Postgres like
   ```
   def _update_query_ids(self, cursor) -> None:
       cursor.execute("select pg_backend_pid()")
       # fetchone() returns a single row tuple; the PID is its first element
       self.query_ids.append(cursor.fetchone()[0])
   ```
   and then
   ```
   def kill_query(self, query_id) -> Any:
       result = super().run(sql=f"SELECT pg_terminate_backend({query_id});", handler=list)
       return result
   ```
   Of course, that's a bad example because executing a new statement on the cursor discards the result set of the previous execution.





[GitHub] [airflow] dstandish commented on pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on PR #27514:
URL: https://github.com/apache/airflow/pull/27514#issuecomment-1317720914

   I'm a little skeptical about `Merge SnowflakeHook.run and TrinoHook.run to DbApiHook.run` simply because there isn't clear utility apart from maybe consolidating shared code between two hooks.  But we don't add to the base hook every time two subclasses share any code.  And it's unclear to me anyway that this particular feature is actually useful / a good idea. But if you think there's a valuable interface to add, nothing wrong with opening the PR.  Might make sense to do new PR for it though and just close this one.  Then we can examine the interface from the simple perspective of... is this a feature we want to add to DbApiHook.




[GitHub] [airflow] dstandish commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1016134815


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -264,6 +266,7 @@ def run(
                 results = []
                 for sql_statement in sql:
                     self._run_command(cur, sql_statement, parameters)
+                    self._update_query_ids(cur)

Review Comment:
   OK so, unfortunately it appears this solution just won't work.  The on_kill won't actually kill any running query.





[GitHub] [airflow] kazanzhy commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
kazanzhy commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1016014971


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -264,6 +266,7 @@ def run(
                 results = []
                 for sql_statement in sql:
                     self._run_command(cur, sql_statement, parameters)
+                    self._update_query_ids(cur)

Review Comment:
   Seems it's synchronous for Trino
   https://github.com/trinodb/trino-python-client/blob/b008848101d5eb915c1a7778cf780c72ea61a183/trino/dbapi.py#L407
   https://github.com/trinodb/trino-python-client/blob/b008848101d5eb915c1a7778cf780c72ea61a183/trino/client.py#L757
   
   as well as for Snowflake
   https://docs.snowflake.com/en/user-guide/python-connector-example.html#performing-a-synchronous-query





[GitHub] [airflow] kazanzhy commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
kazanzhy commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1014628068


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -221,6 +220,16 @@ def execute(self, context):
 
         return output
 
+    def on_kill(self) -> None:
+        self.log.info("Stopping:", self._hook.query_ids)
+
+        results = []
+        for query_id in self._hook.query_ids.copy():
+            result = self._hook.stop_query(query_id)
+            results.append(result)
+
+        self.log.info("Termination successful:", results)

Review Comment:
   You're right. But the current solution for Trino tries to stop the query anyway:
   https://github.com/apache/airflow/blob/9337aa92c082db36e82eb314585591394fe8ff27/airflow/providers/trino/operators/trino.py#L62





[GitHub] [airflow] eladkal commented on pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
eladkal commented on PR #27514:
URL: https://github.com/apache/airflow/pull/27514#issuecomment-1311660246

   @kazanzhy do we have something else to resolve, or is this PR ready for final review?




[GitHub] [airflow] kazanzhy commented on pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
kazanzhy commented on PR #27514:
URL: https://github.com/apache/airflow/pull/27514#issuecomment-1319396279

   This PR is really overloaded.
   Let's discuss https://github.com/apache/airflow/pull/27763 and https://github.com/apache/airflow/pull/27762.
   After that, we could come back here or just close this PR.
   @dstandish 




[GitHub] [airflow] kazanzhy commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
kazanzhy commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1016010092


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -264,6 +266,7 @@ def run(
                 results = []
                 for sql_statement in sql:
                     self._run_command(cur, sql_statement, parameters)
+                    self._update_query_ids(cur)

Review Comment:
   This is the implementation that existed before.
   For Snowflake, you can see it in this PR, but for Trino I deleted it in this PR:
   https://github.com/apache/airflow/commit/df00436569bb6fb79ce8c0b7ca71dddf02b854ef#diff-9a702240b5127432b36cea027e9846f74943e8c20c292f4301c182c6063db880
   
   In both cases the query ID was extracted after the `execute` and before the handler was applied to the cursor.





[GitHub] [airflow] dstandish commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1016135342


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -114,6 +114,7 @@ def __init__(self, *args, schema: str | None = None, log_sql: bool = True, **kwa
         # Hook deriving from the DBApiHook to still have access to the field in it's constructor
         self.__schema = schema
         self.log_sql = log_sql
+        self.query_ids: list[str] = []

Review Comment:
   I understand, but we don't need to stick with the old naming, especially when adding to the new base hook.  I think it's good to specify what kind of query ids these are. But in this case they aren't actually running queries, so in some ways it's a moot point.





[GitHub] [airflow] kazanzhy commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
kazanzhy commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1014627221


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -294,6 +297,22 @@ def _run_command(self, cur, sql_statement, parameters):
         if cur.rowcount >= 0:
             self.log.info("Rows affected: %s", cur.rowcount)
 
+    def _update_query_ids(self, cursor) -> None:
+        """
+        Adds query ids to list
+        :param cur: current cursor after run
+        :return:
+        """
+        return None
+
+    def stop_query(self, query_id) -> Any:

Review Comment:
   I just saw that for Athena and EMR there's a similar method `stop_query` so I decided to use that name





[GitHub] [airflow] eladkal commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
eladkal commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1014591186


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -221,6 +220,16 @@ def execute(self, context):
 
         return output
 
+    def on_kill(self) -> None:
+        self.log.info("Stopping:", self._hook.query_ids)
+
+        results = []
+        for query_id in self._hook.query_ids.copy():
+            result = self._hook.stop_query(query_id)
+            results.append(result)
+
+        self.log.info("Termination successful:", results)

Review Comment:
   Not necessarily.
   The query may have already completed successfully in the meantime, so we don't really know whether the termination succeeded; we only know that we requested it.



##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -221,6 +220,16 @@ def execute(self, context):
 
         return output
 
+    def on_kill(self) -> None:
+        self.log.info("Stopping:", self._hook.query_ids)

Review Comment:
   Here we don't know whether we will stop the query, because we don't know whether the hook implements the needed method.



##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -294,6 +297,22 @@ def _run_command(self, cur, sql_statement, parameters):
         if cur.rowcount >= 0:
             self.log.info("Rows affected: %s", cur.rowcount)
 
+    def _update_query_ids(self, cursor) -> None:
+        """
+        Adds query ids to list
+        :param cur: current cursor after run
+        :return:
+        """
+        return None
+
+    def stop_query(self, query_id) -> Any:
+        """
+        Stops query with certain identifier
+        :param query_id: identifier of the query
+        :return:
+        """
+        return None

Review Comment:
   I know we debated this on Slack, but we should also get some resolution here:
   should we return None or raise `NotImplementedError`?
   We can catch the NotImplementedError in the operator and convert it to an INFO log entry notifying users that their request to kill the query cannot be processed due to lack of implementation.
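
A minimal sketch of the `NotImplementedError` option, not the PR's actual code. The classes `BaseHookSketch` and `KillableHookSketch` and the free function `on_kill` are hypothetical stand-ins for the hook and operator discussed here; only the control flow is the point.

```python
import logging

log = logging.getLogger("on_kill_sketch")


class BaseHookSketch:
    """Hypothetical stand-in for a base DB hook; not the real DbApiHook."""

    def __init__(self):
        self.query_ids = []

    def kill_query(self, query_id):
        # The base class signals "unsupported" instead of silently returning None.
        raise NotImplementedError(f"{type(self).__name__} cannot kill queries")


class KillableHookSketch(BaseHookSketch):
    """Hypothetical hook for a backend that supports cancellation."""

    def kill_query(self, query_id):
        return f"kill requested for {query_id}"


def on_kill(hook):
    """Request termination of every recorded query and return the outcomes."""
    outcomes = []
    for query_id in list(hook.query_ids):
        try:
            outcomes.append(hook.kill_query(query_id))
        except NotImplementedError:
            # Downgrade the missing implementation to an INFO log entry.
            log.info("Killing queries is not implemented for %s", type(hook).__name__)
            outcomes.append(None)
    return outcomes
```

With this shape the operator never claims success: it either records that a kill was requested or logs that the hook cannot do it.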



##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -294,6 +297,22 @@ def _run_command(self, cur, sql_statement, parameters):
         if cur.rowcount >= 0:
             self.log.info("Rows affected: %s", cur.rowcount)
 
+    def _update_query_ids(self, cursor) -> None:
+        """
+        Adds query ids to list
+        :param cur: current cursor after run
+        :return:
+        """
+        return None
+
+    def stop_query(self, query_id) -> Any:

Review Comment:
   There is some confusing terminology. If we use the term `kill`, let's stick with it.





[GitHub] [airflow] kazanzhy commented on pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
kazanzhy commented on PR #27514:
URL: https://github.com/apache/airflow/pull/27514#issuecomment-1312592946

   @dstandish I understand your concerns about this PR.
   I must say that these changes are just returning deleted code in TrinoHook (https://github.com/apache/airflow/commit/df00436569bb6fb79ce8c0b7ca71dddf02b854ef#diff-9a702240b5127432b36cea027e9846f74943e8c20c292f4301c182c6063db880) and refactor of SnowflakeHook to use `DbApiHook.run()` instead of `SnowflakeHook.run()`. 
   So as we saw now there's no sense in using `query_ids` for `on_kill()`. But we can save them for other reasons.
   
   
   I'm proposing the following steps once we are sure that Trino's `cursor.execute` is also synchronous.
   - remove `on_kill()` implementations for TrinoOperator and SQLExecuteQueryOperator
   - remove `kill_query()` implementations for TrinoHook, PrestoHook, SnowflakeHook, and DbApiHook.
   - rename this PR to `Merge SnowflakeHook.run and TrinoHook.run to DbApiHook.run` which will be the second part of #23971
   - close issue #27314 because of impossibility to implement `on_kill()`




[GitHub] [airflow] dstandish commented on pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on PR #27514:
URL: https://github.com/apache/airflow/pull/27514#issuecomment-1312147528

   > @kazanzhy do we have something else to resolve, or is this PR ready for final review?
   
   @eladkal why do you think we should merge this, given that it does not do what it tries to do?
   
   The queries it says are running are not actually running.  The queries it will kill in on_kill are not running.  Before we add a new interface, shouldn't we start with code that does what it's supposed to do?




[GitHub] [airflow] eladkal commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
eladkal commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1015111462


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -221,6 +220,16 @@ def execute(self, context):
 
         return output
 
+    def on_kill(self) -> None:
+        self.log.info("Stopping:", self._hook.query_ids)
+
+        results = []
+        for query_id in self._hook.query_ids.copy():
+            result = self._hook.stop_query(query_id)
+            results.append(result)
+
+        self.log.info("Termination successful:", results)

Review Comment:
   I meant to consider just changing the log entry from successful to requested?





[GitHub] [airflow] dstandish commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1017032245


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -264,6 +266,7 @@ def run(
                 results = []
                 for sql_statement in sql:
                     self._run_command(cur, sql_statement, parameters)
+                    self._update_query_ids(cur)

Review Comment:
   > I want to avoid all providers doing:
   > 
   > class SnowflakeExecuteQueryOperator(SqlExecuteQueryOperator):
   >     def on_kill(self):
   >         ...
   
   it's a very reasonable goal.  at same time, i do think that we have to start from real, working code for the "particular" cases before it will be clear what makes sense in the abstract, general case.  and it seems that we may not have any sql operators that use dbapi hook which properly implement on_kill.
   
   re AsyncSqlExecuteQueryOperator it depends what you're after.
   
   this suggests deferrable operator.  you can have a synchronous sql operator which under the hood submits and polls (asynchronously).  that might be one way to allow for on_kill to function properly but there might also be other ways such as the snowflake example above.  i do think that putting async in name of operator should be determined by the macro operator behavior not the underlying implementation details -- i.e. does it just submit and exit and return query through xcom (or defer).  it's also possible to have an operator param that controls this behavior.
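
A rough sketch of the "submit and poll under the hood" idea, under stated assumptions. `FakeAsyncClient` and `SubmitAndPollOperator` are hypothetical names, and the client's `submit`/`status`/`cancel` methods do not correspond to any real driver API. The point is only that the query id is known before the query finishes, so `on_kill` has something live to cancel.

```python
import threading
import time


class FakeAsyncClient:
    """Hypothetical client: submit() returns an id at once; not a real driver."""

    def __init__(self):
        self._states = {}

    def submit(self, sql):
        query_id = f"q{len(self._states) + 1}"
        self._states[query_id] = "RUNNING"
        return query_id

    def status(self, query_id):
        return self._states[query_id]

    def cancel(self, query_id):
        if self._states[query_id] == "RUNNING":
            self._states[query_id] = "CANCELLED"


class SubmitAndPollOperator:
    """Looks synchronous to the caller, but submits and then polls."""

    def __init__(self, client, sql, poll_interval=0.01):
        self.client = client
        self.sql = sql
        self.poll_interval = poll_interval
        self.query_id = None

    def execute(self):
        # The id is available while the query is still running.
        self.query_id = self.client.submit(self.sql)
        while self.client.status(self.query_id) == "RUNNING":
            time.sleep(self.poll_interval)
        return self.client.status(self.query_id)

    def on_kill(self):
        if self.query_id is not None:
            self.client.cancel(self.query_id)
```

In this toy the fake query never finishes on its own, so `execute()` returns only after `on_kill()` is called from another thread; a real backend would also report completion or failure states.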
   
   
   





[GitHub] [airflow] kazanzhy commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
kazanzhy commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1020815664


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -264,6 +266,7 @@ def run(
                 results = []
                 for sql_statement in sql:
                     self._run_command(cur, sql_statement, parameters)
+                    self._update_query_ids(cur)

Review Comment:
   You're right. I checked it for Snowflake.
   ![photo_2022-11-13_00-53-28](https://user-images.githubusercontent.com/10523218/201497699-ed4ed7b1-cd42-4515-acbd-51ae27349094.jpg)
   





[GitHub] [airflow] eladkal commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
eladkal commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1017018518


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -264,6 +266,7 @@ def run(
                 results = []
                 for sql_statement in sql:
                     self._run_command(cur, sql_statement, parameters)
+                    self._update_query_ids(cur)

Review Comment:
   In snowflake you can kill query by query_id assuming you know it or by killing all queries of the session.
   
   My point for on_kill() was that if in the end each provider needs its own on_kill(), then we will not have one class to serve them all, and every provider will need to inherit from SqlExecuteQueryOperator.
   
   I want to avoid all providers doing:
   ```
   class SnowflakeExecuteQueryOperator(SqlExecuteQueryOperator):
       def on_kill(self):
           ...
   ```
   
   Maybe what we really need is AsyncSqlExecuteQueryOperator ?





[GitHub] [airflow] kazanzhy commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
kazanzhy commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1018498180


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -264,6 +266,7 @@ def run(
                 results = []
                 for sql_statement in sql:
                     self._run_command(cur, sql_statement, parameters)
+                    self._update_query_ids(cur)

Review Comment:
   Maybe we could put `on_kill()` logic to the hook's method and just run `hook.kill_query()` in `ExecuteQueryOperator.on_kill()`? 
   @eladkal 





[GitHub] [airflow] kazanzhy commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
kazanzhy commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1015995419


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -114,6 +114,7 @@ def __init__(self, *args, schema: str | None = None, log_sql: bool = True, **kwa
         # Hook deriving from the DBApiHook to still have access to the field in it's constructor
         self.__schema = schema
         self.log_sql = log_sql
+        self.query_ids: list[str] = []

Review Comment:
   This name was already used in Snowflake so I just generalized it.





[GitHub] [airflow] eladkal commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
eladkal commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1014629024


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -294,6 +297,22 @@ def _run_command(self, cur, sql_statement, parameters):
         if cur.rowcount >= 0:
             self.log.info("Rows affected: %s", cur.rowcount)
 
+    def _update_query_ids(self, cursor) -> None:
+        """
+        Adds query ids to list
+        :param cur: current cursor after run
+        :return:
+        """
+        return None
+
+    def stop_query(self, query_id) -> Any:

Review Comment:
   I don't really mind the specific name, just consistency/standardization.
   There are places in the project where names were given without much thought, so I raise it whenever I can and whenever appropriate.
   It's also OK to decide to keep it as is.
   
   In any case, naming, while important, is not the priority. We can leave that point until after the functionality works as expected.





[GitHub] [airflow] dstandish commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1016993969


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -264,6 +266,7 @@ def run(
                 results = []
                 for sql_statement in sql:
                     self._run_command(cur, sql_statement, parameters)
+                    self._update_query_ids(cur)

Review Comment:
   I think i previously used this for snowflake and it worked:
   ```
       def on_kill(self):
           if self.cnx:
               self.cnx.execute_string(f"SELECT SYSTEM$CANCEL_ALL_QUERIES({self.cnx.session_id})")
               self.cnx.execute_string("ROLLBACK")
   ```





[GitHub] [airflow] kazanzhy commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
kazanzhy commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1015993595


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -221,6 +220,16 @@ def execute(self, context):
 
         return output
 
+    def on_kill(self) -> None:
+        for query_id in self._hook.query_ids.copy():

Review Comment:
   In `.kill_query()` method the `.run()` is called which appends termination `query_id` to `self.query_ids`.
   Without copy it's like:
   ```
   lst = [1,2,3]
   for l in lst:
       lst.append(l+1)
   ```
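
The same point as a terminating, runnable sketch. `kill_all` and `fake_kill_query` are hypothetical helpers; the fake kill function appends to the shared list to imitate `.run()` recording the termination statement's own id.

```python
def kill_all(query_ids, kill_query):
    """Call kill_query once for each id present when the loop starts."""
    requested = []
    for query_id in query_ids.copy():  # snapshot: later appends are not visited
        kill_query(query_id)
        requested.append(query_id)
    return requested


query_ids = ["q1", "q2"]


def fake_kill_query(query_id):
    # Imitates the side effect described above: the kill statement itself
    # is recorded as a new query id on the same list.
    query_ids.append(f"kill-{query_id}")


requested = kill_all(query_ids, fake_kill_query)
```

Only the two original ids are visited, even though the list grows to four entries while the loop runs.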





[GitHub] [airflow] kazanzhy commented on pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
kazanzhy commented on PR #27514:
URL: https://github.com/apache/airflow/pull/27514#issuecomment-1304899709

   The `.kill()` method is implemented only for Trino/Presto and Snowflake.
   Cursors of these connectors return a `query_id`, which is used to stop the running query.
   
   For the other databases we could use something like:
   ```
   select pg_terminate_backend(pid) 
   from pg_stat_activity 
   where query = '';
   ```
   if we save the text of the latest queries as well as the query IDs.




[GitHub] [airflow] eladkal commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
eladkal commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1015006676


##########
airflow/providers/snowflake/hooks/snowflake.py:
##########
@@ -321,6 +320,21 @@ def set_autocommit(self, conn, autocommit: Any) -> None:
     def get_autocommit(self, conn):
         return getattr(conn, "autocommit_mode", False)
 
+    @staticmethod
+    def split_sql_string(sql: str) -> list[str]:

Review Comment:
   Why was this function added?





[GitHub] [airflow] dstandish commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1015222828


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -264,6 +266,7 @@ def run(
                 results = []
                 for sql_statement in sql:
                     self._run_command(cur, sql_statement, parameters)
+                    self._update_query_ids(cur)

Review Comment:
   so, there's a weakness to this approach. the query_id is only added after the query runs. so on_kill will never actually kill a query that isn't already complete. or at least that's how it appears.
   
   unless it submits asynchronously, all on_kill will do is kill queries that have already completed.
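
The timing problem described above can be reproduced without a database. The sketch below is a simulation under stated assumptions: `FakeSyncHook` and `ids_visible_while_running` are hypothetical names, and `time.sleep` stands in for a blocking `cursor.execute()`. It shows that a kill handler firing mid-query sees an empty id list when ids are recorded only after the call returns.

```python
import threading
import time


class FakeSyncHook:
    """Hypothetical stand-in for a hook whose execute call blocks until done."""

    def __init__(self) -> None:
        self.query_ids: list[str] = []
        self.started = threading.Event()

    def _execute(self, seconds: float) -> str:
        # Blocks like a synchronous cursor.execute(); the id exists only on return.
        self.started.set()
        time.sleep(seconds)
        return "query-1"

    def run(self, seconds: float) -> None:
        query_id = self._execute(seconds)
        self.query_ids.append(query_id)  # recorded only after completion


def ids_visible_while_running(hook: FakeSyncHook, seconds: float = 0.2) -> list[str]:
    """Return the ids an on_kill() would see while the query is still running."""
    worker = threading.Thread(target=hook.run, args=(seconds,))
    worker.start()
    hook.started.wait()
    seen = list(hook.query_ids)  # snapshot taken mid-query
    worker.join()
    return seen
```

Recording the id before the blocking call, or submitting the query asynchronously, would be needed for the snapshot to contain anything useful.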



##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -221,6 +220,16 @@ def execute(self, context):
 
         return output
 
+    def on_kill(self) -> None:
+        for query_id in self._hook.query_ids.copy():

Review Comment:
   do we actually need to copy?



##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -221,6 +220,16 @@ def execute(self, context):
 
         return output
 
+    def on_kill(self) -> None:
+        for query_id in self._hook.query_ids.copy():
+            self.log.info("Stopping query: %s", self._hook.query_ids)
+            try:
+                self._hook.kill(query_id)
+            except NotImplementedError:
+                self.log.info("Method '.kill()' is not implemented for ", self._hook.__class__.__name__)
+            except Exception as e:
+                self.log.info("The query '%s' can not be killed due to %s", self._hook.query_ids, str(e))

Review Comment:
   should be query_id:
   
   ```suggestion
                   self.log.info("The query '%s' can not be killed due to %s", query_id, str(e))
   ```
   but also, i think you don't need to wrap with str:
   
   ```suggestion
                   self.log.info("The query '%s' can not be killed due to %s", query_id, e)
   ```
   
   but also, why not just log the exception?
   
   ```suggestion
                   self.log.exception("The query '%s' could not be killed.", query_id)
   ```
   
   It would include the error and traceback.  That seems reasonable no?
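
To illustrate the last suggestion, here is a small standalone sketch using only the standard `logging` module. The logger name `on_kill_demo` and the function `capture_kill_failure_log` are hypothetical, and the `RuntimeError` is a stand-in for a failed kill call. It shows that `log.exception` records both the message and the traceback without the exception being passed explicitly.

```python
import io
import logging


def capture_kill_failure_log(query_id: str) -> str:
    """Log a simulated kill failure with log.exception and return the output."""
    stream = io.StringIO()
    log = logging.getLogger("on_kill_demo")  # hypothetical logger name
    log.setLevel(logging.INFO)
    log.propagate = False
    handler = logging.StreamHandler(stream)
    log.addHandler(handler)
    try:
        try:
            raise RuntimeError("connection lost")  # stand-in for a failed kill
        except Exception:
            # No need to pass the exception: the active one is attached automatically.
            log.exception("The query '%s' could not be killed.", query_id)
    finally:
        log.removeHandler(handler)
    return stream.getvalue()
```

Note that `log.exception` logs at ERROR level, whereas the original code used `log.info`.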
   



##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -294,6 +297,22 @@ def _run_command(self, cur, sql_statement, parameters):
         if cur.rowcount >= 0:
             self.log.info("Rows affected: %s", cur.rowcount)
 
+    def _update_query_ids(self, cursor) -> None:

Review Comment:
   is this meant to be part of the "public" interface or not? if so, why is it "protected"?



##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -294,6 +297,22 @@ def _run_command(self, cur, sql_statement, parameters):
         if cur.rowcount >= 0:
             self.log.info("Rows affected: %s", cur.rowcount)
 
+    def _update_query_ids(self, cursor) -> None:
+        """
+        Adds query ids to list
+        :param cur: current cursor after run
+        :return:
+        """
+        return None
+
+    def kill(self, query_id) -> Any:

Review Comment:
   maybe `cancel_query` or... at least `kill_query` would be better



##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -114,6 +114,7 @@ def __init__(self, *args, schema: str | None = None, log_sql: bool = True, **kwa
         # Hook deriving from the DBApiHook to still have access to the field in it's constructor
         self.__schema = schema
         self.log_sql = log_sql
+        self.query_ids: list[str] = []

Review Comment:
   should this be `running_query_ids`? 





[GitHub] [airflow] dstandish commented on pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on PR #27514:
URL: https://github.com/apache/airflow/pull/27514#issuecomment-1319632242

   > This PR is really overloaded. Let's discuss #27763 and #27762. After that, we could come back here or just close this PR. @dstandish
   
   great, thanks




[GitHub] [airflow] dstandish commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1018706076


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -264,6 +266,7 @@ def run(
                 results = []
                 for sql_statement in sql:
                     self._run_command(cur, sql_statement, parameters)
+                    self._update_query_ids(cur)

Review Comment:
   At this point, the query is done, so what is the utility of updating the query id list here?





[GitHub] [airflow] dstandish commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1016994219


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -264,6 +266,7 @@ def run(
                 results = []
                 for sql_statement in sql:
                     self._run_command(cur, sql_statement, parameters)
+                    self._update_query_ids(cur)

Review Comment:
   (in a custom hook / operator)
   





[GitHub] [airflow] dstandish commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1016134815


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -264,6 +266,7 @@ def run(
                 results = []
                 for sql_statement in sql:
                     self._run_command(cur, sql_statement, parameters)
+                    self._update_query_ids(cur)

Review Comment:
   OK so this solution just won't work.  The on_kill won't actually kill any running query.





[GitHub] [airflow] potiuk commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1016555110


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -264,6 +266,7 @@ def run(
                 results = []
                 for sql_statement in sql:
                     self._run_command(cur, sql_statement, parameters)
+                    self._update_query_ids(cur)

Review Comment:
   Does it mean that the original implementation was flawed and we can simply drop it rather than implementing the generic solution for SQLExecuteQueryOperator?
   
   That certainly looks like it to me.





[GitHub] [airflow] eladkal commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
eladkal commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1017049087


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -264,6 +266,7 @@ def run(
                 results = []
                 for sql_statement in sql:
                     self._run_command(cur, sql_statement, parameters)
+                    self._update_query_ids(cur)

Review Comment:
   I'm OK with taking it one step at a time :)





[GitHub] [airflow] eladkal commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
eladkal commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1018671191


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -264,6 +266,7 @@ def run(
                 results = []
                 for sql_statement in sql:
                     self._run_command(cur, sql_statement, parameters)
+                    self._update_query_ids(cur)

Review Comment:
   Yeah, let's do what we need to get this PR in... Then let's start a follow-up PR to perfect it.





[GitHub] [airflow] kazanzhy commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
kazanzhy commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1015975103


##########
airflow/providers/snowflake/hooks/snowflake.py:
##########
@@ -321,6 +320,21 @@ def set_autocommit(self, conn, autocommit: Any) -> None:
     def get_autocommit(self, conn):
         return getattr(conn, "autocommit_mode", False)
 
+    @staticmethod
+    def split_sql_string(sql: str) -> list[str]:

Review Comment:
   For most hooks, we split queries into statements using `sqlparse` but Snowflake has its own splitter.





[GitHub] [airflow] kazanzhy commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
kazanzhy commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1015996646


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -294,6 +297,22 @@ def _run_command(self, cur, sql_statement, parameters):
         if cur.rowcount >= 0:
             self.log.info("Rows affected: %s", cur.rowcount)
 
+    def _update_query_ids(self, cursor) -> None:
+        """
+        Adds query ids to list
+        :param cur: current cursor after run
+        :return:
+        """
+        return None
+
+    def kill(self, query_id) -> Any:

Review Comment:
   Done





[GitHub] [airflow] dstandish commented on pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on PR #27514:
URL: https://github.com/apache/airflow/pull/27514#issuecomment-1312170087

   > > @eladkal why do you think we should merge this, given that it does not do what it tries to do?
   > 
   > I didn't say we should merge this. I asked if it's ready to review. I wasn't sure if the running queries problem was addressed.
   
   OK cool i thought it meant like... you were preparing to merge it :) 
   
   Cus I had commented after the recent updates like saying "[this is still an issue](https://github.com/apache/airflow/pull/27514#discussion_r1018706076)" but then no response and the next thing i saw was "ok anything else"?  Anyway, thank you. Looking forward to getting this right... just ... want to get it right :) 
   




[GitHub] [airflow] dstandish commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1020537330


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -264,6 +266,7 @@ def run(
                 results = []
                 for sql_statement in sql:
                     self._run_command(cur, sql_statement, parameters)
+                    self._update_query_ids(cur)

Review Comment:
   i think that we need to address this issue, where the queries aren't actually running and therefore will not be killed, before adding this interface.





[GitHub] [airflow] eladkal commented on pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
eladkal commented on PR #27514:
URL: https://github.com/apache/airflow/pull/27514#issuecomment-1312164793

   > @eladkal why do you think we should merge this, given that it does not do what it tries to do?
   
   I didn't say we should merge this. I asked if it's ready to review. I wasn't sure if the running queries problem was addressed.




[GitHub] [airflow] kazanzhy commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
kazanzhy commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1020816512


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -264,6 +266,7 @@ def run(
                 results = []
                 for sql_statement in sql:
                     self._run_command(cur, sql_statement, parameters)
+                    self._update_query_ids(cur)

Review Comment:
   Do you have access to any Trino and Presto instances to check it also for these DBs?





[GitHub] [airflow] kazanzhy commented on pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
kazanzhy commented on PR #27514:
URL: https://github.com/apache/airflow/pull/27514#issuecomment-1328380960

   Can I conclude that this PR should be closed, as well as #27314, because it is not possible to cancel queries after their execution when using a synchronous cursor?
   @eladkal @dstandish 
   




[GitHub] [airflow] kazanzhy closed pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
kazanzhy closed pull request #27514: Implement on_kill() for SQLExecuteQueryOperator
URL: https://github.com/apache/airflow/pull/27514




[GitHub] [airflow] kazanzhy commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
kazanzhy commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1014628881


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -294,6 +297,22 @@ def _run_command(self, cur, sql_statement, parameters):
         if cur.rowcount >= 0:
             self.log.info("Rows affected: %s", cur.rowcount)
 
+    def _update_query_ids(self, cursor) -> None:
+        """
+        Adds query ids to list
+        :param cur: current cursor after run
+        :return:
+        """
+        return None
+
+    def stop_query(self, query_id) -> Any:
+        """
+        Stops query with certain identifier
+        :param query_id: identifier of the query
+        :return:
+        """
+        return None

Review Comment:
   That's a good idea in my opinion.
   Because I assume that if the query is already completed and we try to kill it, Snowflake might raise an error.





[GitHub] [airflow] kazanzhy commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
kazanzhy commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1015995419


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -114,6 +114,7 @@ def __init__(self, *args, schema: str | None = None, log_sql: bool = True, **kwa
         # Hook deriving from the DBApiHook to still have access to the field in it's constructor
         self.__schema = schema
         self.log_sql = log_sql
+        self.query_ids: list[str] = []

Review Comment:
   This name was already used in Snowflake so I just generalized it.
   Let's rename it





[GitHub] [airflow] dstandish commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1016990688


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -264,6 +266,7 @@ def run(
                 results = []
                 for sql_statement in sql:
                     self._run_command(cur, sql_statement, parameters)
+                    self._update_query_ids(cur)

Review Comment:
   That's how it looks to me.  But I remember @eladkal indicating that the absence of on_kill was preventing some user from migrating to that generic sql operator from trino (though i don't remember where...).  Maybe we can get that user to chime in to confirm?





[GitHub] [airflow] eladkal commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
eladkal commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1017018518


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -264,6 +266,7 @@ def run(
                 results = []
                 for sql_statement in sql:
                     self._run_command(cur, sql_statement, parameters)
+                    self._update_query_ids(cur)

Review Comment:
   In Snowflake you can kill a query by query_id, assuming you know it, or by killing all queries of the session.
   
   My point about on_kill() was that if, in the end, each provider needs its own on_kill(), then we will not have one class to serve them all, and every provider will need to inherit from SQLExecuteQueryOperator.
   
   I want to avoid all providers doing:
   ```
   class SnowflakeExecuteQueryOperator(SQLExecuteQueryOperator):
       def on_kill(self):
           ...
   ```
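
One way to picture the "one class to serve them all" goal is to keep `on_kill()` in the generic operator and push the provider-specific part into the hook. The sketch below is only an illustration: `BaseHook`, `SnowflakeLikeHook`, and `GenericQueryOperator` are hypothetical stand-ins, not Airflow classes, and the "kill" merely records the id instead of contacting a database.

```python
class BaseHook:
    """Hypothetical generic hook: providers override kill_query()."""

    def __init__(self) -> None:
        self.running_query_ids: list[str] = []

    def kill_query(self, query_id: str) -> None:
        raise NotImplementedError


class SnowflakeLikeHook(BaseHook):
    """Hypothetical provider hook that knows how to cancel by id."""

    def __init__(self) -> None:
        super().__init__()
        self.killed: list[str] = []

    def kill_query(self, query_id: str) -> None:
        self.killed.append(query_id)  # a real hook would call the database here


class GenericQueryOperator:
    """Hypothetical single operator shared by every provider."""

    def __init__(self, hook: BaseHook) -> None:
        self._hook = hook

    def on_kill(self) -> list[str]:
        """Ask the hook to cancel each tracked query; return the ids cancelled."""
        cancelled = []
        for query_id in list(self._hook.running_query_ids):
            try:
                self._hook.kill_query(query_id)
            except NotImplementedError:
                break  # this provider offers no cancellation; nothing more to try
            cancelled.append(query_id)
        return cancelled
```

With this shape, a provider adds cancellation by overriding one hook method, and no operator subclass is required.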





[GitHub] [airflow] kazanzhy commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
kazanzhy commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1016010092


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -264,6 +266,7 @@ def run(
                 results = []
                 for sql_statement in sql:
                     self._run_command(cur, sql_statement, parameters)
+                    self._update_query_ids(cur)

Review Comment:
   This is the implementation that existed before.
   For Snowflake, you can see it in this PR, but for Trino I deleted it in the following commit:
   https://github.com/apache/airflow/commit/df00436569bb6fb79ce8c0b7ca71dddf02b854ef#diff-9a702240b5127432b36cea027e9846f74943e8c20c292f4301c182c6063db880
   
   In both cases, the queryId was extracted after the `execute` and before the handler was applied to the cursor.





[GitHub] [airflow] kazanzhy commented on a diff in pull request #27514: Implement on_kill() for SQLExecuteQueryOperator

Posted by GitBox <gi...@apache.org>.
kazanzhy commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1018500217


##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -114,6 +114,7 @@ def __init__(self, *args, schema: str | None = None, log_sql: bool = True, **kwa
         # Hook deriving from the DBApiHook to still have access to the field in it's constructor
         self.__schema = schema
         self.log_sql = log_sql
+        self.query_ids: list[str] = []

Review Comment:
   Okay then, renamed it 


