Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2023/01/09 22:42:26 UTC

[GitHub] [airflow] dstandish opened a new pull request, #28818: Throttle streaming log reads

dstandish opened a new pull request, #28818:
URL: https://github.com/apache/airflow/pull/28818

   When reading logs from a running task, the current behavior loops as fast as possible. This causes unnecessary CPU utilization and load on external resources (e.g. an Elasticsearch cluster). We can reduce the load by sleeping briefly in each loop.
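   A minimal standalone sketch of the idea (the names here are illustrative, not the actual log_reader API):

   ```python
   import time

   def stream_logs(read_chunk, is_finished):
       """Poll for new log chunks, sleeping between polls instead of spinning."""
       while True:
           for line in read_chunk():  # may yield nothing when no new logs exist
               yield line
           if is_finished():
               break
           time.sleep(0.5)  # throttle the loop to cut CPU and backend load
   ```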


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on a diff in pull request #28818: Throttle streaming log reads

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #28818:
URL: https://github.com/apache/airflow/pull/28818#discussion_r1065637360


##########
airflow/utils/log/log_reader.py:
##########
@@ -77,12 +78,16 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di
             metadata.pop("max_offset", None)
             metadata.pop("offset", None)
             metadata.pop("log_pos", None)
-            while "end_of_log" not in metadata or (
-                not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
-            ):
+            while True:
                 logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
                 for host, log in logs[0]:
                     yield "\n".join([host or "", log]) + "\n"
+                if "end_of_log" not in metadata or (
+                    not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
+                ):
+                    time.sleep(0.5)

Review Comment:
   Well. I am not complaining about the value, but about it being "magic".
   
   I find such magic numbers often necessary for tests, but in "production" code I think they always need at least some justification written down.
   
   Whenever I have a magic number like that with no good explanation of why this value was chosen, it is a red flag. Imagine someone else coming here a year from now, trying to solve an issue with log streaming, and asking: why do we have 0.5 here?
   
   The answer should at least be documented somewhere with reasoning - because the code does not explain it.
   
   Of course, ideally this would be an async wait on the condition. But I understand that is not possible, so having some "default" value here might make sense. The tradeoff, as I understand it, is that we will get logs with an average 0.25 s delay, but the CPU usage will be much lower.
   
   But there are at least a few questions. How much is OK? What's the acceptable delay? Should the default differ between handlers? Is it needed for the other handlers as well? Or maybe we should add the possibility of specifying it differently for each handler?
   
   Maybe the answer to all those questions (after applying common-sense judgment) is:
   
   > The 0.5 s delay here is OK because it adds an average 250 ms delay to streaming the logs, but it lowers the CPU usage. While we could make it handler-dependent, we decided to hard-code it as 0.5 s for all handlers for simplicity.
   
   But your judgment now might be different in the future - so for your future self and future others, having this commented in the code would be the right thing to do, I think. This way the person in the future (either yourself or someone else) will not waste mental capacity on finding out why 0.5 was chosen.
   
   Just this :).
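   For illustration, the kind of named, documented constant this asks for might look like the following (the constant name is hypothetical):

   ```python
   # Sleep between successive reads while streaming logs of a running task.
   # Tradeoff: adds on average ~0.25 s of latency to streamed logs, but
   # keeps the read loop from spinning at full speed and hammering the CPU
   # and remote log backends (e.g. an Elasticsearch cluster).
   LOG_STREAM_SLEEP_SECONDS = 0.5  # hypothetical name
   ```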
   



[GitHub] [airflow] potiuk commented on a diff in pull request #28818: Throttle streaming log reads

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #28818:
URL: https://github.com/apache/airflow/pull/28818#discussion_r1065660545


##########
airflow/utils/log/log_reader.py:
##########
@@ -77,12 +78,16 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di
             metadata.pop("max_offset", None)
             metadata.pop("offset", None)
             metadata.pop("log_pos", None)
-            while "end_of_log" not in metadata or (
-                not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
-            ):
+            while True:
                 logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
                 for host, log in logs[0]:
                     yield "\n".join([host or "", log]) + "\n"
+                if "end_of_log" not in metadata or (
+                    not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
+                ):
+                    time.sleep(0.5)

Review Comment:
   Another (more concrete) question (actually, just asking the questions above in written form sparked more thinking about it).
   
   What happens if the last few logs arrive while the thread is sleeping and the task completes?
   
   I understand that this log reader is used in a daemon Thread in base_task_runner, and I think that the task runner will simply wait for the forked (or exec'ed) process and exit immediately when it exits. Which means that it will also stop the task reader immediately (I believe daemon threads are stopped immediately when they are the last remaining threads). This means that any logs produced in that 0.5 s will not be processed by the task_reader.
   
   The question is - do we care? (I think we previously had a subtle race condition here, because a similar thing could have happened from time to time if the logs were written **just** before the task completed.) But by adding the sleep here we are greatly increasing the probability of it happening. I think there will be tasks that will consistently have this problem (those that do `log.info("Exiting")` followed immediately by `sys.exit(0)`).
   
   So my question is - do we care that we lose the last logs? I am not 100% sure what the effect will be for the various handlers.
   
   I think if we care, then we should stop using a daemon thread and do a join on the reader thread just before we leave (with another magic timeout, of course - but likely related to the one used in the sleep).
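   For illustration, a join-based alternative might look roughly like the sketch below. This is a standalone toy, not the actual base_task_runner code; `drain_available_logs` and `run_task_process` are hypothetical stand-ins.

   ```python
   import threading
   import time

   def drain_available_logs() -> None:
       # Stand-in for reading whatever log chunks are available right now.
       print("draining logs")

   def run_task_process() -> None:
       # Stand-in for waiting on the forked/exec'ed task process.
       time.sleep(2)

   stop = threading.Event()

   def read_task_logs() -> None:
       # Throttled reader loop, but interruptible so shutdown is prompt.
       while not stop.is_set():
           drain_available_logs()
           stop.wait(0.5)  # the throttling sleep; wakes early on stop.set()
       drain_available_logs()  # one final drain so the last lines are not lost

   reader = threading.Thread(target=read_task_logs)  # deliberately NOT a daemon
   reader.start()

   run_task_process()

   stop.set()                 # signal the reader to finish up
   reader.join(timeout=1.0)   # bounded wait, related to the sleep interval
   ```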





[GitHub] [airflow] potiuk commented on a diff in pull request #28818: Throttle streaming log reads

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #28818:
URL: https://github.com/apache/airflow/pull/28818#discussion_r1065181871


##########
airflow/utils/log/log_reader.py:
##########
@@ -77,12 +78,16 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di
             metadata.pop("max_offset", None)
             metadata.pop("offset", None)
             metadata.pop("log_pos", None)
-            while "end_of_log" not in metadata or (
-                not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
-            ):
+            while True:
                 logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
                 for host, log in logs[0]:
                     yield "\n".join([host or "", log]) + "\n"
+                if "end_of_log" not in metadata or (
+                    not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
+                ):
+                    time.sleep(0.5)

Review Comment:
   looks like a magic number ...





[GitHub] [airflow] dstandish commented on a diff in pull request #28818: Throttle streaming log reads

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28818:
URL: https://github.com/apache/airflow/pull/28818#discussion_r1065184472


##########
airflow/utils/log/log_reader.py:
##########
@@ -77,12 +78,16 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di
             metadata.pop("max_offset", None)
             metadata.pop("offset", None)
             metadata.pop("log_pos", None)
-            while "end_of_log" not in metadata or (
-                not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
-            ):
+            while True:
                 logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
                 for host, log in logs[0]:
                     yield "\n".join([host or "", log]) + "\n"
+                if "end_of_log" not in metadata or (
+                    not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
+                ):
+                    time.sleep(0.5)

Review Comment:
   Completely arbitrary





[GitHub] [airflow] potiuk commented on a diff in pull request #28818: Throttle streaming log reads

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #28818:
URL: https://github.com/apache/airflow/pull/28818#discussion_r1065181546


##########
airflow/utils/log/log_reader.py:
##########
@@ -77,12 +78,16 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di
             metadata.pop("max_offset", None)
             metadata.pop("offset", None)
             metadata.pop("log_pos", None)
-            while "end_of_log" not in metadata or (
-                not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
-            ):
+            while True:
                 logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
                 for host, log in logs[0]:
                     yield "\n".join([host or "", log]) + "\n"
+                if "end_of_log" not in metadata or (
+                    not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
+                ):
+                    time.sleep(0.5)

Review Comment:
   why 0.5?





[GitHub] [airflow] uranusjr merged pull request #28818: Throttle streaming log reads

Posted by GitBox <gi...@apache.org>.
uranusjr merged PR #28818:
URL: https://github.com/apache/airflow/pull/28818




[GitHub] [airflow] dstandish commented on a diff in pull request #28818: Throttle streaming log reads

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28818:
URL: https://github.com/apache/airflow/pull/28818#discussion_r1066219687


##########
airflow/utils/log/log_reader.py:
##########
@@ -77,12 +78,16 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di
             metadata.pop("max_offset", None)
             metadata.pop("offset", None)
             metadata.pop("log_pos", None)
-            while "end_of_log" not in metadata or (
-                not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
-            ):
+            while True:
                 logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
                 for host, log in logs[0]:
                     yield "\n".join([host or "", log]) + "\n"
+                if "end_of_log" not in metadata or (
+                    not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
+                ):
+                    time.sleep(0.5)

Review Comment:
   yeah, looking again, i think it would be better to do something more akin to exponential backoff... i.e. if two consecutive loops return no logs, then sleep 0.5; otherwise let it run (see the sketch below).
   
   previously i had this sleep only while waiting to get _any_ logs, i.e. while the offset stays at 0.
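   a rough sketch of that kind of selective throttling (hypothetical shape, not the merged code):

   ```python
   import time

   POLL_INTERVAL = 0.5  # seconds to back off when the stream looks idle

   def stream_logs(read_chunk):
       """Yield log chunks, sleeping only when nothing new has arrived."""
       empty_polls = 0
       while True:
           chunk, end_of_log = read_chunk()
           if chunk:
               empty_polls = 0
               yield chunk
           else:
               empty_polls += 1
           if end_of_log:
               return
           # Throttle only after consecutive empty polls, so an actively
           # logging task still streams at full speed.
           if empty_polls >= 2:
               time.sleep(POLL_INTERVAL)
   ```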



[GitHub] [airflow] dstandish commented on a diff in pull request #28818: Throttle streaming log reads

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28818:
URL: https://github.com/apache/airflow/pull/28818#discussion_r1066207290


##########
airflow/utils/log/log_reader.py:
##########
@@ -77,12 +78,16 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di
             metadata.pop("max_offset", None)
             metadata.pop("offset", None)
             metadata.pop("log_pos", None)
-            while "end_of_log" not in metadata or (
-                not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
-            ):
+            while True:
                 logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
                 for host, log in logs[0]:
                     yield "\n".join([host or "", log]) + "\n"
+                if "end_of_log" not in metadata or (
+                    not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
+                ):
+                    time.sleep(0.5)

Review Comment:
   hmm, so the reading part has nothing to do with the task runner; this reading code is only invoked in the webserver. for sure it's a confusing aspect of the way this code has evolved that the webserver "log reader" logic lives on the "task logging handler" class, for convenience. it's more than a logging handler.





[GitHub] [airflow] dstandish commented on a diff in pull request #28818: Throttle streaming log reads

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28818:
URL: https://github.com/apache/airflow/pull/28818#discussion_r1065185176


##########
airflow/utils/log/log_reader.py:
##########
@@ -77,12 +78,16 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di
             metadata.pop("max_offset", None)
             metadata.pop("offset", None)
             metadata.pop("log_pos", None)
-            while "end_of_log" not in metadata or (
-                not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
-            ):
+            while True:
                 logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
                 for host, log in logs[0]:
                     yield "\n".join([host or "", log]) + "\n"
+                if "end_of_log" not in metadata or (
+                    not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
+                ):
+                    time.sleep(0.5)

Review Comment:
   You prefer one second, perhaps?





[GitHub] [airflow] dstandish commented on a diff in pull request #28818: Throttle streaming log reads

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28818:
URL: https://github.com/apache/airflow/pull/28818#discussion_r1066213230


##########
airflow/utils/log/log_reader.py:
##########
@@ -77,12 +78,16 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di
             metadata.pop("max_offset", None)
             metadata.pop("offset", None)
             metadata.pop("log_pos", None)
-            while "end_of_log" not in metadata or (
-                not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
-            ):
+            while True:
                 logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
                 for host, log in logs[0]:
                     yield "\n".join([host or "", log]) + "\n"
+                if "end_of_log" not in metadata or (
+                    not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
+                ):
+                    time.sleep(0.5)

Review Comment:
   yes, i should add a comment for sure - that's a good point.
   separately, do you think we should treat other handlers differently?  generally they are gonna be network handlers i think. if it's just FTH, it's about unnecessary CPU only.  but for remote handlers (probably the most common) it's also about throttling requests to external services.  the log reader has a timeout of 5 minutes, so if a user leaves the logs tab open while, say, the task emits no logs for that long, it will keep hitting ES as fast as it can. suppose there are many users doing this at the same time?  admittedly not likely to cause a serious problem IRL, but i do remember a report on slack about cluster load, and i imagine this could be part of it.
   it does not require long-sleeping tasks either -- every time it's waiting for new logs, it's looping. so it seems like a reasonable thing to do, but i'm sincerely interested in what you think (apart from the mechanics of making this mergeable)
   
   





[GitHub] [airflow] potiuk commented on pull request #28818: Throttle streaming log reads

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #28818:
URL: https://github.com/apache/airflow/pull/28818#issuecomment-1377894023

   I think there were some weird network failures (on top of inventory files changed by Google). So it needs a rebase :)




[GitHub] [airflow] dstandish commented on a diff in pull request #28818: Throttle streaming log reads

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28818:
URL: https://github.com/apache/airflow/pull/28818#discussion_r1066243910


##########
airflow/utils/log/log_reader.py:
##########
@@ -77,12 +78,16 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di
             metadata.pop("max_offset", None)
             metadata.pop("offset", None)
             metadata.pop("log_pos", None)
-            while "end_of_log" not in metadata or (
-                not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
-            ):
+            while True:
                 logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
                 for host, log in logs[0]:
                     yield "\n".join([host or "", log]) + "\n"
+                if "end_of_log" not in metadata or (
+                    not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
+                ):
+                    time.sleep(0.5)

Review Comment:
   ok, implemented more selective throttling, added comments, and made it overridable through airflow local settings by using a constant (sketched below)
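   for illustration, the "constant overridable via airflow local settings" pattern could be wired roughly like this (names are hypothetical, not the exact merged code):

   ```python
   # In the module doing the polling:
   DEFAULT_SLEEP_SECONDS = 0.5  # documented default, see the discussion above

   try:
       # Users may override the value in their airflow_local_settings.py
       from airflow_local_settings import LOG_STREAM_SLEEP_SECONDS
   except ImportError:
       LOG_STREAM_SLEEP_SECONDS = DEFAULT_SLEEP_SECONDS
   ```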


