You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/03/22 17:51:36 UTC
[airflow] 21/30: Rename `to_delete` to `to_cancel` in TriggerRunner (#20658)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit e3d602f7d6eee6e96efb0f27fb2f83d6a424681a
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Wed Jan 5 12:12:27 2022 -0800
Rename `to_delete` to `to_cancel` in TriggerRunner (#20658)
The queue's purpose is to track triggers that need to be cancelled. The name `to_delete` was a bit confusing because, for one, it does not actually delete them but cancels them. The deletion work is actually done in `cleanup_finished_triggers`. It seems that this method will usually not do anything, and that it exists only for cancelling triggers that are currently running but, for whatever reason, no longer should be. E.g. when a task is killed and therefore the trigger is no longer needed, or some [...]
(cherry picked from commit c20ad79b40ea2b213f6dca221221c6dbd55bd08f)
---
airflow/jobs/triggerer_job.py | 22 +++++++++++-----------
1 file changed, 11 insertions(+), 11 deletions(-)
diff --git a/airflow/jobs/triggerer_job.py b/airflow/jobs/triggerer_job.py
index a47bc3b..25a4c79 100644
--- a/airflow/jobs/triggerer_job.py
+++ b/airflow/jobs/triggerer_job.py
@@ -205,7 +205,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
to_create: Deque[Tuple[int, BaseTrigger]]
# Inbound queue of deleted triggers
- to_delete: Deque[int]
+ to_cancel: Deque[int]
# Outbound queue of events
events: Deque[Tuple[int, TriggerEvent]]
@@ -221,7 +221,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
self.triggers = {}
self.trigger_cache = {}
self.to_create = deque()
- self.to_delete = deque()
+ self.to_cancel = deque()
self.events = deque()
self.failed_triggers = deque()
@@ -242,7 +242,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
while not self.stop:
# Run core logic
await self.create_triggers()
- await self.delete_triggers()
+ await self.cancel_triggers()
await self.cleanup_finished_triggers()
# Sleep for a bit
await asyncio.sleep(1)
@@ -270,13 +270,13 @@ class TriggerRunner(threading.Thread, LoggingMixin):
self.log.warning("Trigger %s had insertion attempted twice", trigger_id)
await asyncio.sleep(0)
- async def delete_triggers(self):
+ async def cancel_triggers(self):
"""
- Drain the to_delete queue and ensure all triggers that are not in the
+ Drain the to_cancel queue and ensure all triggers that are not in the
DB are cancelled, so the cleanup job deletes them.
"""
- while self.to_delete:
- trigger_id = self.to_delete.popleft()
+ while self.to_cancel:
+ trigger_id = self.to_cancel.popleft()
if trigger_id in self.triggers:
# We only delete if it did not exit already
self.triggers[trigger_id]["task"].cancel()
@@ -384,7 +384,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
current_trigger_ids = set(self.triggers.keys())
# Work out the two difference sets
new_trigger_ids = requested_trigger_ids.difference(current_trigger_ids)
- old_trigger_ids = current_trigger_ids.difference(requested_trigger_ids)
+ cancel_trigger_ids = current_trigger_ids.difference(requested_trigger_ids)
# Bulk-fetch new trigger records
new_triggers = Trigger.bulk_fetch(new_trigger_ids)
# Add in new triggers
@@ -401,9 +401,9 @@ class TriggerRunner(threading.Thread, LoggingMixin):
self.failed_triggers.append(new_id)
continue
self.to_create.append((new_id, trigger_class(**new_triggers[new_id].kwargs)))
- # Remove old triggers
- for old_id in old_trigger_ids:
- self.to_delete.append(old_id)
+ # Enqueue orphaned triggers for cancellation
+ for old_id in cancel_trigger_ids:
+ self.to_cancel.append(old_id)
def get_trigger_by_classpath(self, classpath: str) -> Type[BaseTrigger]:
"""