You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/01/12 00:06:18 UTC
[airflow] branch v2-5-test updated: Handle ConnectionReset exception in Executor cleanup (#28685)
This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-5-test by this push:
new 4cef77be70 Handle ConnectionReset exception in Executor cleanup (#28685)
4cef77be70 is described below
commit 4cef77be70ce96eb760d4f64425bbdb0cc8e7544
Author: Max Ho <ma...@gmail.com>
AuthorDate: Tue Jan 3 19:53:52 2023 +0800
Handle ConnectionReset exception in Executor cleanup (#28685)
(cherry picked from commit a3de721e2f084913e853aff39d04adc00f0b82ea)
---
airflow/executors/kubernetes_executor.py | 17 ++++++++++-------
1 file changed, 10 insertions(+), 7 deletions(-)
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 16cf1b282f..65e463a948 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -843,13 +843,16 @@ class KubernetesExecutor(BaseExecutor):
assert self.kube_scheduler
self.log.info("Shutting down Kubernetes executor")
- self.log.debug("Flushing task_queue...")
- self._flush_task_queue()
- self.log.debug("Flushing result_queue...")
- self._flush_result_queue()
- # Both queues should be empty...
- self.task_queue.join()
- self.result_queue.join()
+ try:
+ self.log.debug("Flushing task_queue...")
+ self._flush_task_queue()
+ self.log.debug("Flushing result_queue...")
+ self._flush_result_queue()
+ # Both queues should be empty...
+ self.task_queue.join()
+ self.result_queue.join()
+ except ConnectionResetError:
+ self.log.exception("Connection Reset error while flushing task_queue and result_queue.")
if self.kube_scheduler:
self.kube_scheduler.terminate()
self._manager.shutdown()