Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/08/17 10:50:52 UTC

[GitHub] [airflow] baryluk opened a new pull request #17649: Do not fail KubernetesPodOperator tasks if log following fails

baryluk opened a new pull request #17649:
URL: https://github.com/apache/airflow/pull/17649


   In very long-running Airflow tasks that use KubernetesPodOperator,
   especially when Airflow runs in a different k8s cluster than the one
   where the pod is started, we see sporadic but reasonably frequent
   failures like this after 5-13 hours of runtime:
   
   ```
   [2021-08-16 04:00:25,871] {pod_launcher.py:198} INFO - Event: foo-bar.xyz had an event of type Running
   [2021-08-16 04:00:25,893] {pod_launcher.py:149} INFO - 210816.0400+0000 app-specific-logs...
   ...
   ... (~few log lines every few minutes from the app)
   ...
   [2021-08-16 17:20:29,585] {pod_launcher.py:149} INFO - 210816.1720+0000 app-specific-logs....
   [2021-08-16 17:27:36,105] {taskinstance.py:1501} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/opt/pysetup/.venv/lib/python3.7/site-packages/urllib3/response.py", line 436, in _error_catcher
       yield
     File "/opt/pysetup/.venv/lib/python3.7/site-packages/urllib3/response.py", line 763, in read_chunked
       self._update_chunk_length()
     File "/opt/pysetup/.venv/lib/python3.7/site-packages/urllib3/response.py", line 693, in _update_chunk_length
       line = self._fp.fp.readline()
     File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
       return self._sock.recv_into(b)
     File "/usr/local/lib/python3.7/ssl.py", line 1071, in recv_into
       return self.read(nbytes, buffer)
     File "/usr/local/lib/python3.7/ssl.py", line 929, in read
       return self._sslobj.read(len, buffer)
   TimeoutError: [Errno 110] Connection timed out
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/opt/pysetup/.venv/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
       self._prepare_and_execute_task_with_callbacks(context, task)
     File "/opt/pysetup/.venv/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
       result = self._execute_task(context, task_copy)
     File "/opt/pysetup/.venv/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task
       result = task_copy.execute(context=context)
     File "/opt/pysetup/.venv/lib/python3.7/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 366, in execute
       final_state, remote_pod, result = self.create_new_pod_for_operator(labels, launcher)
     File "/opt/pysetup/.venv/lib/python3.7/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 520, in create_new_pod_for_operator
       final_state, remote_pod, result = launcher.monitor_pod(pod=self.pod, get_logs=self.get_logs)
     File "/opt/pysetup/.venv/lib/python3.7/site-packages/airflow/providers/cncf/kubernetes/utils/pod_launcher.py", line 147, in monitor_pod
       for line in logs:
     File "/opt/pysetup/.venv/lib/python3.7/site-packages/urllib3/response.py", line 807, in __iter__
       for chunk in self.stream(decode_content=True):
     File "/opt/pysetup/.venv/lib/python3.7/site-packages/urllib3/response.py", line 571, in stream
       for line in self.read_chunked(amt, decode_content=decode_content):
     File "/opt/pysetup/.venv/lib/python3.7/site-packages/urllib3/response.py", line 792, in read_chunked
       self._original_response.close()
     File "/usr/local/lib/python3.7/contextlib.py", line 130, in __exit__
       self.gen.throw(type, value, traceback)
     File "/opt/pysetup/.venv/lib/python3.7/site-packages/urllib3/response.py", line 454, in _error_catcher
       raise ProtocolError("Connection broken: %r" % e, e)
   urllib3.exceptions.ProtocolError: ("Connection broken: TimeoutError(110, 'Connection timed out')", TimeoutError(110, 'Connection timed out'))
   ```
   
   This most likely happens because the task emits few logs, so the log
   stream connection sits idle and eventually times out.
   
   So, retry 4 times instead of 3, and even if that fails, do not fail and
   terminate the task until the call to `self.base_container_is_running`
   also fails or returns false.
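
The behavior described above can be sketched as follows. This is a minimal illustration, not the code in `pod_launcher.py`: `follow_logs`, `LogStreamError`, and the two callables are hypothetical names, and the sketch leaves out the `since_seconds` bookkeeping that the real loop uses to avoid re-reading old lines.

```python
import logging
from typing import Callable, Iterable, List

log = logging.getLogger("pod_monitor_sketch")


class LogStreamError(Exception):
    """Hypothetical stand-in for a transport error raised while streaming logs."""


def follow_logs(
    read_logs: Callable[[], Iterable[str]],
    container_is_running: Callable[[], bool],
) -> List[str]:
    """Collect log lines until the container stops, tolerating stream failures."""
    collected: List[str] = []
    while True:
        try:
            for line in read_logs():
                collected.append(line)
        except LogStreamError:
            # A broken stream is only logged; it does not fail the task.
            log.warning("Failed to read logs, checking the container state", exc_info=True)
        # The liveness check, not the log stream, decides when monitoring ends.
        # If this call itself raises, the exception propagates to the caller.
        if not container_is_running():
            return collected
```

With this shape, a dropped connection costs at most some log lines, while a failing liveness check still surfaces as a task failure.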
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] jedcunningham commented on a change in pull request #17649: Do not fail KubernetesPodOperator tasks if log following fails

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #17649:
URL: https://github.com/apache/airflow/pull/17649#discussion_r690623986



##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -217,7 +223,7 @@ def base_container_is_running(self, pod: V1Pod):
             return False
         return status.state.running is not None
 
-    @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
+    @tenacity.retry(stop=tenacity.stop_after_attempt(4), wait=tenacity.wait_exponential(), reraise=True)

Review comment:
       How'd you land on 4 attempts? Do we even need to bump this with the other change?

##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -143,12 +143,21 @@ def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, V1Pod, Optiona
             read_logs_since_sec = None
             last_log_time = None
             while True:
-                logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec)
-                for line in logs:
-                    timestamp, message = self.parse_log_line(line.decode('utf-8'))
-                    self.log.info(message)
-                    if timestamp:
-                        last_log_time = timestamp
+                try:
+                    logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec)
+                    for line in logs:
+                        timestamp, message = self.parse_log_line(line.decode('utf-8'))
+                        self.log.info(message)
+                        if timestamp:
+                            last_log_time = timestamp
+                except Exception as e:

Review comment:
       This is pretty broad; maybe we should only look for `TimeoutError`s?







[GitHub] [airflow] jedcunningham merged pull request #17649: Do not fail KubernetesPodOperator tasks if log following fails

Posted by GitBox <gi...@apache.org>.
jedcunningham merged pull request #17649:
URL: https://github.com/apache/airflow/pull/17649


   





[GitHub] [airflow] baryluk commented on pull request #17649: Do not fail KubernetesPodOperator tasks if log following fails

Posted by GitBox <gi...@apache.org>.
baryluk commented on pull request #17649:
URL: https://github.com/apache/airflow/pull/17649#issuecomment-900227052


   I see. I need to adjust tests.





[GitHub] [airflow] baryluk commented on pull request #17649: Do not fail KubernetesPodOperator tasks if log following fails

Posted by GitBox <gi...@apache.org>.
baryluk commented on pull request #17649:
URL: https://github.com/apache/airflow/pull/17649#issuecomment-900678795


   > We also need test coverage for this change.
   
   That makes sense. Let me take a look at mocking facilities available to test this.





[GitHub] [airflow] baryluk commented on a change in pull request #17649: Do not fail KubernetesPodOperator tasks if log following fails

Posted by GitBox <gi...@apache.org>.
baryluk commented on a change in pull request #17649:
URL: https://github.com/apache/airflow/pull/17649#discussion_r700002154



##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -244,7 +254,9 @@ def read_pod_logs(
                 **additional_kwargs,
             )
         except BaseHTTPError as e:
-            raise AirflowException(f'There was an error reading the kubernetes API: {e}')
+            self.log.warning(f'There was an error reading the kubernetes API: {e}')
+            # Reraise to be catched by self.monitor_pod.
+            raise

Review comment:
       OK about `self.log.exception`, probably. It logs at level `ERROR`, though I think a warning would also be fine.
   
   But what about `raise`? I think it is still needed, so that tenacity can re-raise too and the proper exception can be caught in `monitor_pod`.
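
The point about `raise` can be illustrated with a small sketch. It assumes a hand-written `retry` decorator as a stand-in for tenacity's `reraise=True` behavior (which re-raises the last exception once attempts are exhausted); `TransportError`, `WrappedError`, and `monitor` are hypothetical names, not Airflow code.

```python
import functools


class TransportError(Exception):
    """Hypothetical stand-in for the HTTP client's base error."""


class WrappedError(Exception):
    """Hypothetical stand-in for a generic application exception."""


def retry(attempts):
    """Minimal stand-in for a retry decorator that re-raises the last error."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == attempts:
                        raise  # out of attempts: propagate the original exception
        return wrapper
    return decorator


@retry(attempts=3)
def read_reraising():
    try:
        raise TransportError("connection broken")
    except TransportError:
        raise  # the original type survives, so a caller can catch TransportError


@retry(attempts=3)
def read_wrapping():
    try:
        raise TransportError("connection broken")
    except TransportError as e:
        raise WrappedError(f"error reading the API: {e}")


def monitor(read):
    """Return 'retry later' if the transport error is recognized, else let it escape."""
    try:
        read()
    except TransportError:
        return "retry later"
    return "ok"
```

Here `monitor(read_reraising)` handles the failure, whereas `monitor(read_wrapping)` lets `WrappedError` escape, because the outer handler no longer sees the exception type it is looking for.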







[GitHub] [airflow] baryluk commented on a change in pull request #17649: Do not fail KubernetesPodOperator tasks if log following fails

Posted by GitBox <gi...@apache.org>.
baryluk commented on a change in pull request #17649:
URL: https://github.com/apache/airflow/pull/17649#discussion_r690765281



##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -217,7 +223,7 @@ def base_container_is_running(self, pod: V1Pod):
             return False
         return status.state.running is not None
 
-    @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
+    @tenacity.retry(stop=tenacity.stop_after_attempt(4), wait=tenacity.wait_exponential(), reraise=True)

Review comment:
       I can revert that part for now. It does help a little, but now that the outer loop retries for as long as the pod is alive, it indeed seems unnecessary.







[GitHub] [airflow] jedcunningham commented on a change in pull request #17649: Do not fail KubernetesPodOperator tasks if log following fails

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #17649:
URL: https://github.com/apache/airflow/pull/17649#discussion_r701135823



##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -243,8 +244,10 @@ def read_pod_logs(
                 _preload_content=False,
                 **additional_kwargs,
             )
-        except BaseHTTPError as e:
-            raise AirflowException(f'There was an error reading the kubernetes API: {e}')
+        except BaseHTTPError:
+            self.log.exception(f'There was an error reading the kubernetes API.')

Review comment:
       ```suggestion
               self.log.exception('There was an error reading the kubernetes API.')
   ```
   
   This should fix the static check failure.







[GitHub] [airflow] github-actions[bot] commented on pull request #17649: Do not fail KubernetesPodOperator tasks if log following fails

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #17649:
URL: https://github.com/apache/airflow/pull/17649#issuecomment-914365939


   The PR is likely OK to be merged with just a subset of tests for default Python and Database versions without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full test matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] baryluk commented on pull request #17649: Do not fail KubernetesPodOperator tasks if log following fails

Posted by GitBox <gi...@apache.org>.
baryluk commented on pull request #17649:
URL: https://github.com/apache/airflow/pull/17649#issuecomment-909120575









[GitHub] [airflow] jedcunningham commented on a change in pull request #17649: Do not fail KubernetesPodOperator tasks if log following fails

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #17649:
URL: https://github.com/apache/airflow/pull/17649#discussion_r702005781



##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -143,12 +143,22 @@ def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, V1Pod, Optiona
             read_logs_since_sec = None
             last_log_time = None
             while True:
-                logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec)
-                for line in logs:
-                    timestamp, message = self.parse_log_line(line.decode('utf-8'))
-                    self.log.info(message)
-                    if timestamp:
-                        last_log_time = timestamp
+                try:
+                    logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec)
+                    for line in logs:
+                        timestamp, message = self.parse_log_line(line.decode('utf-8'))
+                        self.log.info(message)
+                        if timestamp:
+                            last_log_time = timestamp
+                except BaseHTTPError as e:
+                    # Catches errors like ProtocolError(TimeoutError).
+                    self.log.warning(
+                        'Failed to read logs for pod %s with exception %s',
+                        pod.metadata.name,
+                        e,
+                        exc_info=True,
+                    )

Review comment:
       ```suggestion
                   except BaseHTTPError:
                       # Catches errors like ProtocolError(TimeoutError).
                       self.log.warning(
                           'Failed to read logs for pod %s',
                           pod.metadata.name,
                           exc_info=True,
                       )
   ```
   
   I don't think you need to pass the exception like that. At least for me that seems to lead to an extra TypeError?
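
A short sketch of the logging behavior under discussion, using only the standard library. With `exc_info=True` inside an `except` block, the logger appends the active traceback on its own, so the exception does not need to be passed as a format argument; the format string then needs exactly one argument per `%s` placeholder. The logger name and `pod_name` are hypothetical.

```python
import io
import logging

stream = io.StringIO()
logger = logging.getLogger("exc_info_sketch")
logger.setLevel(logging.WARNING)
logger.propagate = False  # keep the sketch's output in our own stream only
logger.addHandler(logging.StreamHandler(stream))

pod_name = "example-pod"  # hypothetical pod name

try:
    raise TimeoutError(110, "Connection timed out")
except TimeoutError:
    # One %s placeholder, one argument; exc_info=True appends the traceback.
    logger.warning("Failed to read logs for pod %s", pod_name, exc_info=True)

output = stream.getvalue()
```

After this runs, `output` holds the formatted message followed by the traceback of the `TimeoutError`.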







[GitHub] [airflow] jedcunningham commented on a change in pull request #17649: Do not fail KubernetesPodOperator tasks if log following fails

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #17649:
URL: https://github.com/apache/airflow/pull/17649#discussion_r699384857



##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -244,7 +254,9 @@ def read_pod_logs(
                 **additional_kwargs,
             )
         except BaseHTTPError as e:
-            raise AirflowException(f'There was an error reading the kubernetes API: {e}')
+            self.log.warning(f'There was an error reading the kubernetes API: {e}')
+            # Reraise to be catched by self.monitor_pod.
+            raise

Review comment:
       ```suggestion
               self.log.exception('There was an error reading the kubernetes API.')
   ```







[GitHub] [airflow] baryluk commented on pull request #17649: Do not fail KubernetesPodOperator tasks if log following fails

Posted by GitBox <gi...@apache.org>.
baryluk commented on pull request #17649:
URL: https://github.com/apache/airflow/pull/17649#issuecomment-909286067


   @jedcunningham Ready for review.





[GitHub] [airflow] baryluk commented on a change in pull request #17649: Do not fail KubernetesPodOperator tasks if log following fails

Posted by GitBox <gi...@apache.org>.
baryluk commented on a change in pull request #17649:
URL: https://github.com/apache/airflow/pull/17649#discussion_r690766253



##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -143,12 +143,21 @@ def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, V1Pod, Optiona
             read_logs_since_sec = None
             last_log_time = None
             while True:
-                logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec)
-                for line in logs:
-                    timestamp, message = self.parse_log_line(line.decode('utf-8'))
-                    self.log.info(message)
-                    if timestamp:
-                        last_log_time = timestamp
+                try:
+                    logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec)
+                    for line in logs:
+                        timestamp, message = self.parse_log_line(line.decode('utf-8'))
+                        self.log.info(message)
+                        if timestamp:
+                            last_log_time = timestamp
+                except Exception as e:

Review comment:
       I wanted something broad, though `Exception` might be a bit too much. But there might be more failure modes beyond `TimeoutError`, e.g. DNS issues, authorization issues, SSL errors, protocol errors, etc. Basically, all of these should be ignored: check whether the pod is still alive, and retry later.
   
   `TimeoutError` would definitely help in my use case, but I think it is a bit too narrow, and it also relies on the fact that the kube client uses urllib3 internally, which might not be the case in the future.
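
The trade-off between a narrow and a broad handler can be sketched with stand-in classes. In the traceback from the PR description, what reaches `monitor_pod` is a `ProtocolError` that wraps the `TimeoutError`, so a handler for `TimeoutError` alone would not match it. The class names below are hypothetical and only mimic that shape; they are not urllib3's classes.

```python
class ClientHTTPError(Exception):
    """Hypothetical base class for every error raised by an HTTP client."""


class ClientProtocolError(ClientHTTPError):
    """Hypothetical error for a connection that broke mid-response."""


def is_caught(exc, handler_type):
    """Report whether an `except handler_type` clause would catch `exc`."""
    try:
        raise exc
    except handler_type:
        return True
    except Exception:
        return False


# A transport error that carries the low-level timeout as an argument.
wrapped = ClientProtocolError(
    "Connection broken", TimeoutError(110, "Connection timed out")
)
bug = KeyError("unrelated programming error")

narrow = is_caught(wrapped, TimeoutError)      # too narrow: misses the wrapper
middle = is_caught(wrapped, ClientHTTPError)   # catches the client's errors
masked = is_caught(bug, Exception)             # too broad: also hides bugs
spared = is_caught(bug, ClientHTTPError)       # unrelated bugs still propagate
```

Catching the client's base error class sits between the two extremes: it covers transport failures without swallowing unrelated exceptions.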







[GitHub] [airflow] baryluk commented on a change in pull request #17649: Do not fail KubernetesPodOperator tasks if log following fails

Posted by GitBox <gi...@apache.org>.
baryluk commented on a change in pull request #17649:
URL: https://github.com/apache/airflow/pull/17649#discussion_r701070916



##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -244,7 +254,9 @@ def read_pod_logs(
                 **additional_kwargs,
             )
         except BaseHTTPError as e:
-            raise AirflowException(f'There was an error reading the kubernetes API: {e}')
+            self.log.warning(f'There was an error reading the kubernetes API: {e}')
+            # Reraise to be catched by self.monitor_pod.
+            raise

Review comment:
       Done.







[GitHub] [airflow] baryluk commented on a change in pull request #17649: Do not fail KubernetesPodOperator tasks if log following fails

Posted by GitBox <gi...@apache.org>.
baryluk commented on a change in pull request #17649:
URL: https://github.com/apache/airflow/pull/17649#discussion_r702881637



##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -143,12 +143,22 @@ def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, V1Pod, Optiona
             read_logs_since_sec = None
             last_log_time = None
             while True:
-                logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec)
-                for line in logs:
-                    timestamp, message = self.parse_log_line(line.decode('utf-8'))
-                    self.log.info(message)
-                    if timestamp:
-                        last_log_time = timestamp
+                try:
+                    logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec)
+                    for line in logs:
+                        timestamp, message = self.parse_log_line(line.decode('utf-8'))
+                        self.log.info(message)
+                        if timestamp:
+                            last_log_time = timestamp
+                except BaseHTTPError as e:
+                    # Catches errors like ProtocolError(TimeoutError).
+                    self.log.warning(
+                        'Failed to read logs for pod %s with exception %s',
+                        pod.metadata.name,
+                        e,
+                        exc_info=True,
+                    )

Review comment:
       Please take another look now. The Helm chart test failure looks to be unrelated.







[GitHub] [airflow] baryluk commented on a change in pull request #17649: Do not fail KubernetesPodOperator tasks if log following fails

Posted by GitBox <gi...@apache.org>.
baryluk commented on a change in pull request #17649:
URL: https://github.com/apache/airflow/pull/17649#discussion_r690765281



##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -217,7 +223,7 @@ def base_container_is_running(self, pod: V1Pod):
             return False
         return status.state.running is not None
 
-    @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
+    @tenacity.retry(stop=tenacity.stop_after_attempt(4), wait=tenacity.wait_exponential(), reraise=True)

Review comment:
       I can revert that part for now. It does help a little, but with the retry that will be now attempted if needed in the outer loop (as long as the pod is alive) it indeed seems not necessary.

##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -143,12 +143,21 @@ def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, V1Pod, Optiona
             read_logs_since_sec = None
             last_log_time = None
             while True:
-                logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec)
-                for line in logs:
-                    timestamp, message = self.parse_log_line(line.decode('utf-8'))
-                    self.log.info(message)
-                    if timestamp:
-                        last_log_time = timestamp
+                try:
+                    logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec)
+                    for line in logs:
+                        timestamp, message = self.parse_log_line(line.decode('utf-8'))
+                        self.log.info(message)
+                        if timestamp:
+                            last_log_time = timestamp
+                except Exception as e:

Review comment:
       I wanted something broad, though `Exception` might be a bit too much. But there may be more failure modes beyond `TimeoutError`, e.g. DNS issues, authorization issues, SSL errors, protocol errors, etc. Basically, all of these should be ignored: check whether the pod is still alive, and retry later.
   
   `TimeoutError` would definitely help in my use case, but I think it is a bit too narrow, and it also relies on the fact that the kube client uses urllib3 internally, which might not be the case in the future.
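
   The idea above can be sketched roughly as follows, using only the standard library. All names here (`follow_logs`, `fake_read_logs`, the callbacks) are hypothetical and only simulate the pod and the API; this is not the actual `monitor_pod` code.

```python
def follow_logs(read_logs, pod_is_running, log, wait):
    """Keep following logs; a failed read is logged and retried while the pod runs."""
    while True:
        try:
            for line in read_logs():
                log(line)
        except Exception as e:  # deliberately broad in this sketch; narrow it in real code
            log(f"log read failed, will retry: {e!r}")
        if not pod_is_running():
            return
        wait()


# Simulated pod: the first read times out, the second one succeeds.
reads = iter([TimeoutError("Connection timed out"), ["line 1", "line 2"]])
running = iter([True, False])
seen = []


def fake_read_logs():
    item = next(reads)
    if isinstance(item, Exception):
        raise item
    return item


follow_logs(fake_read_logs, lambda: next(running), seen.append, wait=lambda: None)
```

   The task outcome is decided by the pod state check, not by whether a particular log read succeeded.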

##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -143,12 +143,21 @@ def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, V1Pod, Optiona
             read_logs_since_sec = None
             last_log_time = None
             while True:
-                logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec)
-                for line in logs:
-                    timestamp, message = self.parse_log_line(line.decode('utf-8'))
-                    self.log.info(message)
-                    if timestamp:
-                        last_log_time = timestamp
+                try:
+                    logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec)
+                    for line in logs:
+                        timestamp, message = self.parse_log_line(line.decode('utf-8'))
+                        self.log.info(message)
+                        if timestamp:
+                            last_log_time = timestamp
+                except Exception as e:

Review comment:
       OK. I used `urllib3.exceptions.HTTPError`, which is the base class for many socket-related errors in `urllib3`, including connection, protocol, timeout, and response parsing errors.
   
   Should be good.

##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -217,7 +223,7 @@ def base_container_is_running(self, pod: V1Pod):
             return False
         return status.state.running is not None
 
-    @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
+    @tenacity.retry(stop=tenacity.stop_after_attempt(4), wait=tenacity.wait_exponential(), reraise=True)

Review comment:
       Removed.







[GitHub] [airflow] baryluk commented on pull request #17649: Do not fail KubernetesPodOperator tasks if log following fails

Posted by GitBox <gi...@apache.org>.
baryluk commented on pull request #17649:
URL: https://github.com/apache/airflow/pull/17649#issuecomment-909120575


   Back from vacation. Working on it now.





[GitHub] [airflow] potiuk commented on a change in pull request #17649: Do not fail KubernetesPodOperator tasks if log following fails

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #17649:
URL: https://github.com/apache/airflow/pull/17649#discussion_r700041759



##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -244,7 +254,9 @@ def read_pod_logs(
                 **additional_kwargs,
             )
         except BaseHTTPError as e:
-            raise AirflowException(f'There was an error reading the kubernetes API: {e}')
+            self.log.warning(f'There was an error reading the kubernetes API: {e}')
+            # Reraise to be catched by self.monitor_pod.
+            raise

Review comment:
       Yep. With tenacity we want to retry; I think removing the `raise` was an accidental suggestion.
   `log.exception` is better, as it provides more information about the exception raised (and it is a more explicit way of logging inside an exception handler).
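
   A minimal sketch of that pattern using only the standard `logging` module: log with `log.exception` inside the handler, then re-raise with a bare `raise` so the caller (or a retry wrapper) still sees the error. The logger name, `ListHandler`, and `read_api` are hypothetical; the code under review catches `BaseHTTPError` rather than `OSError`.

```python
import logging

log = logging.getLogger("pod_launcher_sketch")


class ListHandler(logging.Handler):
    """Collects formatted records in memory so the output can be inspected."""

    def __init__(self):
        super().__init__()
        self.lines = []

    def emit(self, record):
        self.lines.append(self.format(record))


handler = ListHandler()
log.addHandler(handler)
log.setLevel(logging.INFO)
log.propagate = False


def read_api():
    try:
        raise ConnectionResetError("simulated API failure")
    except OSError:
        # log.exception logs at ERROR level and attaches the active traceback.
        log.exception("There was an error reading the kubernetes API.")
        raise  # bare raise keeps the original exception for the caller


try:
    read_api()
except ConnectionResetError as e:
    reraised = e
```

   Compared with `log.warning(f"... {e}")`, the logged record carries the full traceback without formatting the exception by hand.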










[GitHub] [airflow] jedcunningham commented on a change in pull request #17649: Do not fail KubernetesPodOperator tasks if log following fails

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #17649:
URL: https://github.com/apache/airflow/pull/17649#discussion_r690623986



##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -217,7 +223,7 @@ def base_container_is_running(self, pod: V1Pod):
             return False
         return status.state.running is not None
 
-    @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
+    @tenacity.retry(stop=tenacity.stop_after_attempt(4), wait=tenacity.wait_exponential(), reraise=True)

Review comment:
       How'd you land on 4 attempts? Do we even need to bump this with the other change?

##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -143,12 +143,21 @@ def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, V1Pod, Optiona
             read_logs_since_sec = None
             last_log_time = None
             while True:
-                logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec)
-                for line in logs:
-                    timestamp, message = self.parse_log_line(line.decode('utf-8'))
-                    self.log.info(message)
-                    if timestamp:
-                        last_log_time = timestamp
+                try:
+                    logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec)
+                    for line in logs:
+                        timestamp, message = self.parse_log_line(line.decode('utf-8'))
+                        self.log.info(message)
+                        if timestamp:
+                            last_log_time = timestamp
+                except Exception as e:

Review comment:
       This is pretty broad; maybe we should only catch `TimeoutError`?







[GitHub] [airflow] jedcunningham commented on pull request #17649: Do not fail KubernetesPodOperator tasks if log following fails

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on pull request #17649:
URL: https://github.com/apache/airflow/pull/17649#issuecomment-914369681


   Thanks @baryluk!

