Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/06 14:19:08 UTC
[GitHub] [airflow] yamrzou commented on issue #12136: KubernetesPodOperator breaks with active log-collection for long running tasks
yamrzou commented on issue #12136:
URL: https://github.com/apache/airflow/issues/12136#issuecomment-755324746
Hello @dmateusp ,
Thank you for investigating and for sharing your fix!
Regarding the reproducibility of the issue, I noticed that I cannot reproduce it when using the following `cmds` and `arguments`:
```
cmds=["bash", "-c"],
arguments=['sleep 60']
# or
arguments=['echo hello && sleep 60']
```
But I can reproduce it consistently with:
```
cmds=["bash", "-c"],
arguments=['python -c "print(\\"hello\\")" && sleep 60']
```
I tested your proposed solution and it worked. So until a PR is merged and a new version is released, I'm monkey-patching the `PodLauncher` class within the DAG code as follows:
```
from airflow.kubernetes.pod_launcher import PodLauncher
from urllib3.exceptions import ProtocolError


def patch_read_pod_logs(func):
    def wrapper(self, *args, **kwargs):
        logs = func(self, *args, **kwargs)
        try:
            for line in logs:
                yield line
        except ProtocolError as pe:
            _, protocol_exception = pe.args
            # When no logs are fetched, an IncompleteRead is thrown trying
            # to decode the stream
            if str(protocol_exception) == "IncompleteRead(0 bytes read)":
                self.log.info(
                    "The pod has not logged since the logs were last fetched"
                )
            else:
                # If the exception is not about an empty stream we raise it
                raise pe

    return wrapper


PodLauncher.read_pod_logs = patch_read_pod_logs(PodLauncher.read_pod_logs)
```
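For anyone who wants to check the wrapper's behaviour without a cluster, here is a minimal sketch that runs on the standard library alone. It rests on some assumptions: `FakeLauncher` is a hypothetical stand-in for `PodLauncher`, the local `ProtocolError` class only imitates `urllib3.exceptions.ProtocolError` by carrying a message and the underlying exception in `args`, and the empty read is simulated with `http.client.IncompleteRead(b"")`. It is not Airflow code, just an illustration of the two branches.

```python
import logging
from http.client import IncompleteRead


class ProtocolError(Exception):
    """Stand-in for urllib3.exceptions.ProtocolError (assumption for this demo)."""


def patch_read_pod_logs(func):
    def wrapper(self, *args, **kwargs):
        logs = func(self, *args, **kwargs)
        try:
            for line in logs:
                yield line
        except ProtocolError as pe:
            _, protocol_exception = pe.args
            if str(protocol_exception) == "IncompleteRead(0 bytes read)":
                # Empty stream: log it and end the generator quietly
                self.log.info(
                    "The pod has not logged since the logs were last fetched"
                )
            else:
                # Any other protocol error is propagated unchanged
                raise

    return wrapper


class FakeLauncher:
    """Hypothetical stand-in for PodLauncher; yields one line, then fails."""

    log = logging.getLogger("fake_launcher")

    def __init__(self, error):
        self.error = error

    def read_pod_logs(self):
        yield b"hello"
        raise self.error


FakeLauncher.read_pod_logs = patch_read_pod_logs(FakeLauncher.read_pod_logs)

# Case 1: the stream ends with an empty IncompleteRead, which is swallowed
empty_read = ProtocolError("Connection broken", IncompleteRead(b""))
lines = list(FakeLauncher(empty_read).read_pod_logs())

# Case 2: a different underlying error is re-raised to the caller
other = ProtocolError("Connection broken", ConnectionResetError("reset by peer"))
try:
    list(FakeLauncher(other).read_pod_logs())
    reraised = False
except ProtocolError:
    reraised = True
```

In the first case the caller still receives the line that was read before the error; in the second the exception reaches the caller as before. Note that the match relies on the exact string form of the underlying exception, so it may need adjusting if a different library version words it differently.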