Posted to commits@airflow.apache.org by ka...@apache.org on 2020/12/21 22:41:50 UTC
[airflow] branch master updated: Fix QueuedLocalWorker crashing with EOFError (#13215)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 484f95f Fix QueuedLocalWorker crashing with EOFError (#13215)
484f95f is described below
commit 484f95f55cda4ca4fd3157135199623c9e37cc8a
Author: Rik Heijdens <Ri...@users.noreply.github.com>
AuthorDate: Mon Dec 21 23:40:23 2020 +0100
Fix QueuedLocalWorker crashing with EOFError (#13215)
LocalExecutor uses a multiprocessing.Queue to distribute tasks to the
instances of QueuedLocalWorker. If for some reason LocalExecutor exits
(e.g. because it encountered an unhandled exception), then each of the
QueuedLocalWorker instances that it manages will also exit while trying
to read from the task queue.
This obfuscates the root cause of the issue, i.e. that the LocalExecutor
terminated. By catching EOFError, logging an error and exiting gracefully
we circumvent this issue.
---
airflow/executors/local_executor.py | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index 7fcba8a..e8049af 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -173,7 +173,15 @@ class QueuedLocalWorker(LocalWorkerBase):
     def do_work(self) -> None:
         while True:
-            key, command = self.task_queue.get()
+            try:
+                key, command = self.task_queue.get()
+            except EOFError:
+                self.log.info(
+                    "Failed to read tasks from the task queue because the other "
+                    "end has closed the connection. Terminating worker %s.",
+                    self.name,
+                )
+                break
             try:
                 if key is None or command is None:
                     # Received poison pill, no more tasks to run
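
The behaviour described in the commit message can be illustrated outside Airflow with a minimal sketch. It is not the Airflow implementation: `ClosedConnectionQueue`, `do_work`, and the `worker_name` parameter are hypothetical names introduced here, and the stand-in queue simply raises EOFError once it runs out of items to imitate a queue whose owning process has gone away.

```python
import logging

log = logging.getLogger("worker_sketch")


class ClosedConnectionQueue:
    """Hypothetical stand-in for the task queue (not an Airflow class).

    It hands out the queued items and then raises EOFError, imitating
    a queue whose other end has closed the connection.
    """

    def __init__(self, items):
        self._items = list(items)

    def get(self):
        if not self._items:
            raise EOFError("other end closed the connection")
        return self._items.pop(0)


def do_work(task_queue, worker_name="worker-0"):
    """Drain the queue; stop on a poison pill or on EOFError."""
    processed = []
    while True:
        try:
            key, command = task_queue.get()
        except EOFError:
            # Same idea as the patch: log the reason and leave the loop
            # instead of letting the worker die with a traceback.
            log.info(
                "Failed to read tasks from the task queue because the other "
                "end has closed the connection. Terminating worker %s.",
                worker_name,
            )
            break
        if key is None or command is None:
            # Poison pill: no more tasks to run.
            break
        # A real worker would execute the command; the sketch records it.
        processed.append((key, command))
    return processed
```

Calling `do_work(ClosedConnectionQueue([("task-1", ["echo", "hi"])]))` returns the single processed task and then exits the loop cleanly when the stand-in queue raises EOFError, rather than propagating the exception.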