Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/11 22:23:05 UTC

[GitHub] [airflow] dstandish opened a new pull request #19546: Wait for task creation before deleting trigger

dstandish opened a new pull request #19546:
URL: https://github.com/apache/airflow/pull/19546


   Commonly, a trigger will complete and be deleted from the `TriggerRunner.triggers` dictionary before the TriggererJob has had a chance to call `submit_event`, so its row is still returned by `Trigger.ids_for_triggerer`. TriggerRunner therefore thinks it needs to create the trigger (again), which it does.
   
   One way to resolve this is to have TriggererJob mark the trigger `complete` only after `submit_event` has completed successfully.
   
   Not sure if this is _the best_ way to do it though.  Just taking a hello world swing at this @andrewgodwin 
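A toy model of the race described above may help. It uses hypothetical names (`ToyRunner`, `fire`, `track_completed`, `simulate`) and is not Airflow's `TriggerRunner` code; it assumes only that the runner creates every requested ID it is not already running, and that a fired trigger's row keeps being requested until its event has been submitted and the row cleaned up.

```python
from typing import Dict, List, Set


class ToyRunner:
    """Hypothetical stand-in for TriggerRunner; not Airflow's implementation."""

    def __init__(self, track_completed: bool) -> None:
        self.track_completed = track_completed
        self.triggers: Dict[int, str] = {}  # currently running trigger IDs
        self.completed_triggers: Set[int] = set()  # fired, row possibly still in DB
        self.creations: List[int] = []  # every creation, to expose duplicates

    def update_triggers(self, requested_ids: Set[int]) -> None:
        # Create every requested ID that is not already running and, when
        # tracking is on, that has not already fired.
        to_create = requested_ids - set(self.triggers)
        if self.track_completed:
            to_create -= self.completed_triggers
        for trigger_id in sorted(to_create):
            self.triggers[trigger_id] = "running"
            self.creations.append(trigger_id)

    def fire(self, trigger_id: int) -> None:
        # The trigger completed: it leaves the running dict immediately,
        # before its event is submitted and its DB row is cleaned up.
        del self.triggers[trigger_id]
        if self.track_completed:
            self.completed_triggers.add(trigger_id)


def simulate(track_completed: bool) -> List[int]:
    runner = ToyRunner(track_completed)
    db_rows = {1}  # the trigger's row in the database
    runner.update_triggers(db_rows)
    runner.fire(1)
    runner.update_triggers(db_rows)  # row not deleted yet: the race window
    return runner.creations


print(simulate(track_completed=False))  # [1, 1]: created twice
print(simulate(track_completed=True))   # [1]
```

Remembering fired IDs is the "track completed triggers until deleted from database" approach; the rest of the thread is about how that set gets pruned.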


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish closed pull request #19546: Track completed triggers until deleted from database

Posted by GitBox <gi...@apache.org>.
dstandish closed pull request #19546:
URL: https://github.com/apache/airflow/pull/19546


   


[GitHub] [airflow] dstandish commented on a change in pull request #19546: Track completed triggers until deleted from database

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19546:
URL: https://github.com/apache/airflow/pull/19546#discussion_r765198334



##########
File path: airflow/jobs/triggerer_job.py
##########
@@ -315,6 +339,7 @@ async def cleanup_finished_triggers(self):
                         details["name"],
                     )
                     self.failed_triggers.append(trigger_id)
+                self.completed_triggers.add(trigger_id)

Review comment:
       @andrewgodwin gentle nudge here. If you want to go with time-based expiration, I have no objection and am happy to rework this. I do recognize that my solution adds a query, but if you don't mind explaining: I'm having trouble seeing the unbounded growth.
   
   
   




[GitHub] [airflow] dstandish commented on a change in pull request #19546: Track completed triggers until deleted from database

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19546:
URL: https://github.com/apache/airflow/pull/19546#discussion_r748671412



##########
File path: airflow/jobs/triggerer_job.py
##########
@@ -148,6 +149,25 @@ def load_triggers(self):
         ids = Trigger.ids_for_triggerer(self.id)
         self.runner.update_triggers(set(ids))
 
+    @provide_session
+    def purge_completed_triggers_list(self, session):

Review comment:
       it would be nice if we didn't have to query the database here but i don't think it can be avoided.
   
   i explored updating `clean_unused` to return the `ids` deleted.  then we could pass those to `purge_completed` and purge those rows.  but this won't work because `clean_unused` does not filter w.r.t. triggerer_id, so other triggerer instances could delete the rows belonging to _this_ triggerer, so `completed_triggers` would grow over time.
   
   on the bright side, we're querying a PK, so it shouldn't be that expensive.
   
   alternatively, we could change `clean_unused` to take a triggerer id and delete only the rows for that triggerer; then the above approach would probably work.
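A sketch of the purge step, using an in-memory SQLite table as a stand-in for the Airflow metadata database. The function body, the raw SQL, and the quoted table name `"trigger"` are illustrative assumptions, not the PR's code (which goes through an SQLAlchemy session). It asks the database which completed IDs still exist, by primary key, and drops the rest from the set with `difference_update`. Because it checks the database instead of relying on what this instance deleted, rows removed by another triggerer are forgotten too.

```python
import sqlite3
from typing import Set


def purge_completed_triggers_list(completed: Set[int], conn: sqlite3.Connection) -> None:
    """Hypothetical helper: forget completed IDs whose DB rows are gone."""
    if not completed:
        return  # nothing to check; also avoids an empty IN ()
    ids = sorted(completed)
    placeholders = ",".join("?" for _ in ids)
    # Primary-key lookup: which of our completed IDs still have rows?
    rows = conn.execute(
        f'SELECT id FROM "trigger" WHERE id IN ({placeholders})', ids
    ).fetchall()
    still_in_db = {row[0] for row in rows}
    # Anything no longer in the DB (deleted by any triggerer) can be forgotten.
    completed.difference_update(completed - still_in_db)


conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE "trigger" (id INTEGER PRIMARY KEY)')
conn.executemany('INSERT INTO "trigger" (id) VALUES (?)', [(1,), (2,), (3,)])
completed = {1, 2}
conn.execute('DELETE FROM "trigger" WHERE id = 1')  # e.g. cleanup by another triggerer
purge_completed_triggers_list(completed, conn)
print(completed)  # {2}
```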




[GitHub] [airflow] dstandish commented on a change in pull request #19546: Track completed triggers until deleted from database

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19546:
URL: https://github.com/apache/airflow/pull/19546#discussion_r750801976



##########
File path: airflow/jobs/triggerer_job.py
##########
@@ -315,6 +339,7 @@ async def cleanup_finished_triggers(self):
                         details["name"],
                     )
                     self.failed_triggers.append(trigger_id)
+                self.completed_triggers.add(trigger_id)

Review comment:
       how do you envision it growing?  the items should only be in `completed_triggers` for basically one or two loop cycles. it goes `submit_event` (into completed) -> `Trigger.clean_unused` (purged from db) -> `purge_completed_triggers_list` (purged from completed)
   
   but maybe you're saying that in the wild the code may not behave as intended, and triggers will accumulate in the DB, and therefore they'll accumulate in the `completed_triggers` set?  but if they accumulate in the db they'll keep getting recreated and eventually that would be an issue in itself because not only would they exist as ids in a set but they would be running.
   
   using a TTL approach as you have suggested would avoid a db query though.
   
   LMK your thoughts.
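The lifecycle described above (`submit_event` into completed, `Trigger.clean_unused` purging the row, then the purge removing the ID from `completed_triggers`) can be traced for a single trigger with plain sets. This is a simplified model: `trace_trigger` is hypothetical, and `db` is a set of IDs standing in for the real table.

```python
from typing import List, Set, Tuple


def trace_trigger(fire_cycle: int, delete_cycle: int, cycles: int) -> List[Tuple[int, bool, bool]]:
    """Hypothetical trace of one trigger (ID 7) through the triggerer loop.

    Returns, per cycle, (cycle, row still in DB, ID held in completed_triggers).
    """
    db: Set[int] = {7}
    completed: Set[int] = set()
    out = []
    for cycle in range(cycles):
        if cycle == fire_cycle:
            completed.add(7)  # submit_event: into completed
        if cycle == delete_cycle:
            db.discard(7)  # clean_unused: purged from DB
        completed.difference_update(completed - db)  # purge: purged from completed
        out.append((cycle, 7 in db, 7 in completed))
    return out


for row in trace_trigger(fire_cycle=0, delete_cycle=1, cycles=3):
    print(row)
# (0, True, True)
# (1, False, False)
# (2, False, False)
```

If `delete_cycle` is never reached, the ID stays tracked exactly as long as the row exists, which is the accumulation case discussed here: the set can only be as large as the leftover rows.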




[GitHub] [airflow] andrewgodwin commented on a change in pull request #19546: Track completed triggers until deleted from database

Posted by GitBox <gi...@apache.org>.
andrewgodwin commented on a change in pull request #19546:
URL: https://github.com/apache/airflow/pull/19546#discussion_r765258471



##########
File path: airflow/jobs/triggerer_job.py
##########
@@ -315,6 +339,7 @@ async def cleanup_finished_triggers(self):
                         details["name"],
                     )
                     self.failed_triggers.append(trigger_id)
+                self.completed_triggers.add(trigger_id)

Review comment:
       I guess I just don't trust `difference_update` quite enough; I missed it in my first read-through when looking for "what in here cleans out the data structure?"
   
   If you're happy there are no edge cases where it can leak entries, and you write a test to prove so, I think my objection here is void.
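One possible shape for the requested test, sketched against a toy model rather than the real `TriggererJob`: random cycles in which triggers fire and rows are deleted late or, in the edge case raised earlier, by another triggerer (which in this model just means the row disappears). All names here are hypothetical. It checks after every cycle that no tracked ID outlives its row, and that once every row is gone a final purge leaves nothing behind.

```python
import random
from typing import Set


def purge(completed: Set[int], db: Set[int]) -> None:
    # Same rule as the purge step: keep only IDs whose rows still exist.
    completed.difference_update(completed - db)


def leaks_after_random_run(seed: int, n_triggers: int = 50, cycles: int = 200) -> int:
    """Return how many IDs remain tracked after all DB rows are gone (expect 0)."""
    rng = random.Random(seed)
    db: Set[int] = set(range(n_triggers))  # rows assigned to this triggerer
    running: Set[int] = set(db)
    completed: Set[int] = set()
    for _ in range(cycles):
        # Some running triggers fire and are recorded as completed.
        for tid in [t for t in running if rng.random() < 0.1]:
            running.discard(tid)
            completed.add(tid)
        # Rows of fired triggers vanish at random times, whoever deletes them.
        for tid in [t for t in db - running if rng.random() < 0.5]:
            db.discard(tid)
        purge(completed, db)
        # Invariant: the set is bounded by the rows that still exist.
        assert completed <= db
    # Drain: everything left fires, every row is deleted, purge once more.
    completed.update(running)
    db.clear()
    purge(completed, db)
    return len(completed)


print(all(leaks_after_random_run(seed) == 0 for seed in range(20)))  # True
```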




[GitHub] [airflow] andrewgodwin commented on a change in pull request #19546: Track completed triggers until deleted from database

Posted by GitBox <gi...@apache.org>.
andrewgodwin commented on a change in pull request #19546:
URL: https://github.com/apache/airflow/pull/19546#discussion_r750704826



##########
File path: airflow/jobs/triggerer_job.py
##########
@@ -315,6 +339,7 @@ async def cleanup_finished_triggers(self):
                         details["name"],
                     )
                     self.failed_triggers.append(trigger_id)
+                self.completed_triggers.add(trigger_id)

Review comment:
       It appears that `completed_triggers` will grow in an unbounded fashion with this code over time - I would suggest a data structure that is self-limiting. My initial idea is a dict with the IDs as keys and the datetime they were inserted as values, and then just remove all keys whose values are more than 5 minutes old in the first part of `purge_completed_triggers`.
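A sketch of the time-based alternative suggested here: a dict mapping IDs to insertion time, pruned of entries older than 5 minutes without any database query. The class name `ExpiringIdSet` and its methods are hypothetical; `now` is an optional parameter only so the example is deterministic.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional


class ExpiringIdSet:
    """Hypothetical self-limiting set: IDs are forgotten `ttl` after insertion."""

    def __init__(self, ttl: timedelta = timedelta(minutes=5)) -> None:
        self.ttl = ttl
        self._inserted_at: Dict[int, datetime] = {}

    @staticmethod
    def _now(now: Optional[datetime]) -> datetime:
        return now if now is not None else datetime.now(timezone.utc)

    def add(self, trigger_id: int, now: Optional[datetime] = None) -> None:
        self._inserted_at[trigger_id] = self._now(now)

    def expire(self, now: Optional[datetime] = None) -> None:
        # Drop every ID inserted more than `ttl` ago.
        cutoff = self._now(now) - self.ttl
        for trigger_id in [t for t, ts in self._inserted_at.items() if ts < cutoff]:
            del self._inserted_at[trigger_id]

    def __contains__(self, trigger_id: object) -> bool:
        return trigger_id in self._inserted_at

    def __len__(self) -> int:
        return len(self._inserted_at)


t0 = datetime(2021, 11, 17, tzinfo=timezone.utc)
ids = ExpiringIdSet()
ids.add(1, now=t0)
ids.add(2, now=t0 + timedelta(minutes=4))
ids.expire(now=t0 + timedelta(minutes=6))
print(1 in ids, 2 in ids, len(ids))  # False True 1
```

In this model the size is bounded by how many triggers fire within the TTL, but if a row outlives the TTL the runner could again recreate that trigger; a purge driven by the database does not have that failure mode, at the cost of a query.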




[GitHub] [airflow] dstandish commented on a change in pull request #19546: Track completed triggers until deleted from database

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19546:
URL: https://github.com/apache/airflow/pull/19546#discussion_r777821480



##########
File path: airflow/jobs/triggerer_job.py
##########
@@ -148,6 +149,25 @@ def load_triggers(self):
         ids = Trigger.ids_for_triggerer(self.id)
         self.runner.update_triggers(set(ids))
 
+    @provide_session
+    def purge_completed_triggers_list(self, session):

Review comment:
       the reason i put `_list` there is because without it, it could signify "purge completed triggers _from the database_"
   
   but it's only purging from the instance attribute `completed_triggers`. the actual deletion of triggers from the database is done in `Trigger.clean_unused`
   
   what do you think?




[GitHub] [airflow] kaxil commented on a change in pull request #19546: Track completed triggers until deleted from database

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #19546:
URL: https://github.com/apache/airflow/pull/19546#discussion_r775669486



##########
File path: airflow/jobs/triggerer_job.py
##########
@@ -148,6 +149,25 @@ def load_triggers(self):
         ids = Trigger.ids_for_triggerer(self.id)
         self.runner.update_triggers(set(ids))
 
+    @provide_session
+    def purge_completed_triggers_list(self, session):

Review comment:
       ```suggestion
       def purge_completed_triggers(self, session):
   ```
   
   




[GitHub] [airflow] dstandish commented on pull request #19546: Track completed triggers until deleted from database

Posted by GitBox <gi...@apache.org>.
dstandish commented on pull request #19546:
URL: https://github.com/apache/airflow/pull/19546#issuecomment-1007006412


   superseded by #20699

