Posted to commits@airflow.apache.org by ka...@apache.org on 2020/12/21 22:41:50 UTC

[airflow] branch master updated: Fix QueuedLocalWorker crashing with EOFError (#13215)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 484f95f  Fix QueuedLocalWorker crashing with EOFError (#13215)
484f95f is described below

commit 484f95f55cda4ca4fd3157135199623c9e37cc8a
Author: Rik Heijdens <Ri...@users.noreply.github.com>
AuthorDate: Mon Dec 21 23:40:23 2020 +0100

    Fix QueuedLocalWorker crashing with EOFError (#13215)
    
    LocalExecutor uses a multiprocessing.Queue to distribute tasks to its
    QueuedLocalWorker instances. If LocalExecutor exits for any reason
    (e.g. because it encountered an unhandled exception), each of the
    QueuedLocalWorker instances it manages also crashes with an EOFError
    while trying to read from the task queue.
    
    These secondary crashes obscure the root cause of the issue, i.e. that
    the LocalExecutor terminated. Catching the EOFError, logging the failure
    and exiting the worker loop gracefully avoids masking the original error.
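The failure mode and the fix can be reproduced without Airflow. What follows is a minimal sketch and a simplified, hypothetical stand-in for the patched loop. The names `drain_tasks`, `get_task` and `execute` are illustrative and are not Airflow APIs. The sketch reads from a one-way `multiprocessing.Pipe` rather than the executor's task queue. The reason is that, once the buffered messages are drained, closing the pipe's only write end makes `Connection.recv()` raise `EOFError` deterministically within a single process.

```python
import logging
from multiprocessing import Pipe

log = logging.getLogger("worker_sketch")


def drain_tasks(get_task, execute, name="worker-1"):
    """Hypothetical, simplified stand-in for QueuedLocalWorker.do_work.

    Reads (key, command) pairs via get_task() until it receives a poison
    pill or the producing end closes, and returns the reason it stopped.
    """
    while True:
        try:
            key, command = get_task()
        except EOFError:
            # The producer (standing in for the executor) closed its end of
            # the channel: log it and leave the loop instead of crashing.
            log.info("Task channel closed by the other end; terminating %s.", name)
            return "eof"
        if key is None or command is None:
            # Poison pill: the producer asked the worker to shut down.
            return "poison_pill"
        execute(key, command)


# Simulate an executor that dies after queuing two tasks, without ever
# sending the poison pill.
reader, writer = Pipe(duplex=False)
writer.send(("task-a", ["echo", "a"]))
writer.send(("task-b", ["echo", "b"]))
writer.close()

done = []
reason = drain_tasks(reader.recv, lambda key, command: done.append(key))
print(reason, done)  # eof ['task-a', 'task-b']
```

Without the `try`/`except`, the third `recv()` call would propagate `EOFError` out of the loop. That is the kind of worker crash the commit describes.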
---
 airflow/executors/local_executor.py | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index 7fcba8a..e8049af 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -173,7 +173,15 @@ class QueuedLocalWorker(LocalWorkerBase):
 
     def do_work(self) -> None:
         while True:
-            key, command = self.task_queue.get()
+            try:
+                key, command = self.task_queue.get()
+            except EOFError:
+                self.log.info(
+                    "Failed to read tasks from the task queue because the other "
+                    "end has closed the connection. Terminating worker %s.",
+                    self.name,
+                )
+                break
             try:
                 if key is None or command is None:
                     # Received poison pill, no more tasks to run