You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/03/22 14:37:27 UTC

[airflow] 21/28: Rename `to_delete` to `to_cancel` in TriggerRunner (#20658)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 3a60d1aeac1b3ebe12d95c01d5f2f27ee0993644
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Wed Jan 5 12:12:27 2022 -0800

    Rename `to_delete` to `to_cancel` in TriggerRunner (#20658)
    
    The queue's purpose is to track triggers that need to be canceled. The language `to_delete` was a bit confusing because for one it does not actually delete them but cancel them.  The deletion work is actually in `cleanup_finished_triggers`.  It seems that this method will usually not do anything and it's only for cancelling triggers that are currently running but for whatever reason no longer should be.  E.g. when a task is killed and therefore the trigger is no longer needed, or some [...]
    
    (cherry picked from commit c20ad79b40ea2b213f6dca221221c6dbd55bd08f)
---
 airflow/jobs/triggerer_job.py | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/airflow/jobs/triggerer_job.py b/airflow/jobs/triggerer_job.py
index a47bc3b..25a4c79 100644
--- a/airflow/jobs/triggerer_job.py
+++ b/airflow/jobs/triggerer_job.py
@@ -205,7 +205,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
     to_create: Deque[Tuple[int, BaseTrigger]]
 
     # Inbound queue of deleted triggers
-    to_delete: Deque[int]
+    to_cancel: Deque[int]
 
     # Outbound queue of events
     events: Deque[Tuple[int, TriggerEvent]]
@@ -221,7 +221,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
         self.triggers = {}
         self.trigger_cache = {}
         self.to_create = deque()
-        self.to_delete = deque()
+        self.to_cancel = deque()
         self.events = deque()
         self.failed_triggers = deque()
 
@@ -242,7 +242,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
         while not self.stop:
             # Run core logic
             await self.create_triggers()
-            await self.delete_triggers()
+            await self.cancel_triggers()
             await self.cleanup_finished_triggers()
             # Sleep for a bit
             await asyncio.sleep(1)
@@ -270,13 +270,13 @@ class TriggerRunner(threading.Thread, LoggingMixin):
                 self.log.warning("Trigger %s had insertion attempted twice", trigger_id)
             await asyncio.sleep(0)
 
-    async def delete_triggers(self):
+    async def cancel_triggers(self):
         """
-        Drain the to_delete queue and ensure all triggers that are not in the
+        Drain the to_cancel queue and ensure all triggers that are not in the
         DB are cancelled, so the cleanup job deletes them.
         """
-        while self.to_delete:
-            trigger_id = self.to_delete.popleft()
+        while self.to_cancel:
+            trigger_id = self.to_cancel.popleft()
             if trigger_id in self.triggers:
                 # We only delete if it did not exit already
                 self.triggers[trigger_id]["task"].cancel()
@@ -384,7 +384,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
         current_trigger_ids = set(self.triggers.keys())
         # Work out the two difference sets
         new_trigger_ids = requested_trigger_ids.difference(current_trigger_ids)
-        old_trigger_ids = current_trigger_ids.difference(requested_trigger_ids)
+        cancel_trigger_ids = current_trigger_ids.difference(requested_trigger_ids)
         # Bulk-fetch new trigger records
         new_triggers = Trigger.bulk_fetch(new_trigger_ids)
         # Add in new triggers
@@ -401,9 +401,9 @@ class TriggerRunner(threading.Thread, LoggingMixin):
                 self.failed_triggers.append(new_id)
                 continue
             self.to_create.append((new_id, trigger_class(**new_triggers[new_id].kwargs)))
-        # Remove old triggers
-        for old_id in old_trigger_ids:
-            self.to_delete.append(old_id)
+        # Enqueue orphaned triggers for cancellation
+        for old_id in cancel_trigger_ids:
+            self.to_cancel.append(old_id)
 
     def get_trigger_by_classpath(self, classpath: str) -> Type[BaseTrigger]:
         """