Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2023/01/03 11:32:11 UTC

[GitHub] [airflow] potiuk commented on pull request #27758: Enable trigger logging in webserver

potiuk commented on PR #27758:
URL: https://github.com/apache/airflow/pull/27758#issuecomment-1369663970

   Just for the record here as well (and maybe to get comments from others).
   
   I have a proposal for how we can improve the approach from this PR to make it more scalable, in case we find we need it.
   It might be a premature optimisation to do it now, so [I do not block the current approach at all](https://github.com/apache/airflow/pull/27758#discussion_r1060424069).
   
   But if we find that the current proposal has significant limitations, I think we can adjust it even now. It will maybe take a bit more time to implement, but its scalability is far better IMHO.
   
   ## Basic assumptions
   
   * all of the deferred tasks in the Triggerer run as very short tasks in an asyncio loop. Most of the time they do nothing (i.e. the loop is handling other tasks or is idle), and when they do run in the triggerer, they do something for maybe 100 ms at a time at most.
   * some of those deferred tasks might write several log lines in quick succession during those 100 ms or so. Sometimes this will be a single log entry ("waiting"), but sometimes they will write several messages with more details. Both cases are expected.
   * due to the overhead of opening a file/socket, and the fact that we probably do not want to keep those files/sockets open all the time, the "ideal" way of handling logs (focusing on files, but sockets are the same) is:
   
   1) open_file()
   2) write all messages from the single "awake"
   3) close_file()
   
   The current proposal opens the file the first time a log is written, keeps it open for the whole lifecycle of the task, and then closes the file when the task closes it (possibly - I am not 100% sure that all the closing scenarios are handled here - but it can certainly be done so that all handlers are closed once the task is no longer deferred).
   
   ## Improvement proposal
   
   The proposal is that we:
   
   1) keep the proposal from this PR that logs from the triggerer are sent to an in-memory queue (or queues - see below) - this is needed for asyncio non-blocking behaviour
   
   2) make all handlers re-entrant - i.e. make every handler cope with set_context(), a loop of emit() calls, and close() being called multiple times on the same handler
   
   3) move the complexity of opening/closing in the right way and at the right time to the process that reads the messages from the queues. That process will work in a way that accommodates the "burst" pattern. When a message is received from a specific task instance, it will wait for a while for all messages from that task instance and use the existing re-entrant handler (a single handler for all task instances) to do it this way:
   
   * set_context(task_instance)
   * write all messages for that task instance
   * close()
   
   This way the file will be opened only for the time of writing all messages from that particular task instance.
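
The "wait for a while for all messages from that task instance" step could be sketched roughly as below. This is only an illustration under assumptions, not code from the PR: `drain_burst`, the `(task_key, message)` tuple layout and the `wait_seconds` value are all hypothetical, and a plain `queue.Queue` stands in for whatever queue the triggerer ends up using.

```python
import queue
import time


def drain_burst(log_queue, first_item, wait_seconds=0.05):
    """Collect the messages that arrive shortly after first_item.

    Items are hypothetical (task_key, message) tuples. Messages that belong
    to other task instances are returned separately so that the caller can
    handle them in the next round.
    """
    task_key, message = first_item
    burst = [message]
    others = []
    deadline = time.monotonic() + wait_seconds
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            key, msg = log_queue.get(timeout=remaining)
        except queue.Empty:
            break
        if key == task_key:
            burst.append(msg)
        else:
            others.append((key, msg))
    return task_key, burst, others
```

The caller would then run set_context(), write everything in `burst` and call close() once, so the file is open only for that one burst.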
   
   This can use today's "airflow.task" logger without any modifications (the queue-reading process will just write all the messages to the standard "airflow.task" logger), as long as the handler is re-entrant. It would look something like:
   
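   ```
   import logging

   # pop_all_recent_messages() is a hypothetical queue helper returning LogRecord objects
   records = log_queue.pop_all_recent_messages(task_instance)
   logger = logging.getLogger("airflow.task")
   handler = logger.handlers[0]  # assumes the task handler is the first one configured
   handler.set_context(ti=task_instance)
   try:
       for record in records:
           logger.handle(record)
   finally:
       handler.close()
   ```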
   
   The only difficulty is the requirement that handlers be re-entrant (in terms of being able to handle repeated set_context()/close() calls). I think most of our handlers are very close to being able to handle it (from looking at the code at least).
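
To make the requirement more concrete, here is a minimal sketch of what a handler that tolerates repeated set_context()/close() cycles might look like. It is not one of the existing Airflow handlers: `ReentrantFileHandler`, its `base_dir` argument and the one-file-per-task-key layout are assumptions made only for this example, built on the standard library `logging.Handler`.

```python
import logging
import os
import tempfile


class ReentrantFileHandler(logging.Handler):
    """Hypothetical handler that survives many set_context()/close() cycles."""

    def __init__(self, base_dir):
        super().__init__()
        self.base_dir = base_dir
        self.stream = None

    def set_context(self, task_key):
        # Release anything left over from a previous cycle, then open in
        # append mode so earlier bursts from the same task are preserved.
        self.close()
        path = os.path.join(self.base_dir, f"{task_key}.log")
        self.stream = open(path, "a", encoding="utf-8")

    def emit(self, record):
        if self.stream is None:
            return  # no context set; this sketch drops the record
        self.stream.write(self.format(record) + "\n")

    def close(self):
        # Safe to call repeatedly: only the file is released, the handler
        # object itself stays usable for the next set_context().
        if self.stream is not None:
            self.stream.close()
            self.stream = None
```

The design choice here is that close() releases only the per-task resource and leaves the handler attached to the logger, which is what allows a single handler to serve all task instances in turn.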
   
   
   4) we can also make it more scalable by introducing multiple queues and processes handling them - reducing 1000s of deferred tasks to a small N (say 10 for example) of processes that handle the logs sent by the tasks. It should be rather easy - we could calculate a hash of the task instance and direct the messages to 1 of the N queues based on the hash - this way all the logs from the same task instance would go to the same queue. Then we can very easily bring the number of processes that write the logs down to low-single-digit numbers - thus we will have at most N open files/sockets at a time.
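
The hash-based routing could look something like the sketch below. Everything in it is an assumption for illustration: `queue_index`, `route`, the string task key and `NUM_QUEUES = 10` are made-up names and values, and in-process `queue.Queue` objects stand in for whatever inter-process queues would really be used. It uses `hashlib` rather than the built-in `hash()`, because string hashes from `hash()` are randomised per Python process by default and would not give a stable mapping across processes.

```python
import hashlib
import queue

NUM_QUEUES = 10  # the "small N" from the text; the value is only an example


def queue_index(task_key, num_queues=NUM_QUEUES):
    """Map a task instance key to a queue index that is stable across processes."""
    digest = hashlib.sha256(task_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_queues


queues = [queue.Queue() for _ in range(NUM_QUEUES)]


def route(task_key, message):
    """Put the message on the queue that owns this task instance."""
    queues[queue_index(task_key)].put((task_key, message))
```

Because the index depends only on the task key, all messages from one task instance land on the same queue, and each queue reader needs at most one open file/socket at a time.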
   
   
   I think this is not very complex and is doable. I think it **might** be a premature optimisation, and we should run tests to see if the approach proposed in this PR is good enough. If we find that the limits are "acceptable", we can go for the Wrapper first and see if there is a need to implement a more complete/scalable solution.
   
   I hope I explained it clearly, and if there are any comments, I am happy to hear them. In any case, I would be really interested in seeing the limits/test results and in reviewing the wrapper code/docs, to make sure we communicate any problems caused by resource limits to the users in a way that lets them react on their own and configure their system properly if such problems occur.
    
   

