You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/01/12 00:06:18 UTC

[airflow] branch v2-5-test updated: Handle ConnectionReset exception in Executor cleanup (#28685)

This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-5-test by this push:
     new 4cef77be70 Handle ConnectionReset exception in Executor cleanup (#28685)
4cef77be70 is described below

commit 4cef77be70ce96eb760d4f64425bbdb0cc8e7544
Author: Max Ho <ma...@gmail.com>
AuthorDate: Tue Jan 3 19:53:52 2023 +0800

    Handle ConnectionReset exception in Executor cleanup (#28685)
    
    (cherry picked from commit a3de721e2f084913e853aff39d04adc00f0b82ea)
---
 airflow/executors/kubernetes_executor.py | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 16cf1b282f..65e463a948 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -843,13 +843,16 @@ class KubernetesExecutor(BaseExecutor):
             assert self.kube_scheduler
 
         self.log.info("Shutting down Kubernetes executor")
-        self.log.debug("Flushing task_queue...")
-        self._flush_task_queue()
-        self.log.debug("Flushing result_queue...")
-        self._flush_result_queue()
-        # Both queues should be empty...
-        self.task_queue.join()
-        self.result_queue.join()
+        try:
+            self.log.debug("Flushing task_queue...")
+            self._flush_task_queue()
+            self.log.debug("Flushing result_queue...")
+            self._flush_result_queue()
+            # Both queues should be empty...
+            self.task_queue.join()
+            self.result_queue.join()
+        except ConnectionResetError:
+            self.log.exception("Connection Reset error while flushing task_queue and result_queue.")
         if self.kube_scheduler:
             self.kube_scheduler.terminate()
         self._manager.shutdown()