Posted to commits@airflow.apache.org by "Jarek Potiuk (JIRA)" <ji...@apache.org> on 2019/04/24 04:15:00 UTC

[jira] [Created] (AIRFLOW-4401) multiprocessing.Queue is unreliable

Jarek Potiuk created AIRFLOW-4401:
-------------------------------------

             Summary: multiprocessing.Queue is unreliable
                 Key: AIRFLOW-4401
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4401
             Project: Apache Airflow
          Issue Type: Bug
            Reporter: Jarek Potiuk


After discussing with [~ash] and [~BasPH] potential reasons for flakiness of LocalExecutor tests, I took a deeper dive into the problem and what I found raised the remaining hair on top of my head. 

We had a number of flaky tests in the local executor that resulted in result_queue not being empty where it should have been emptied a moment before. More details and discussion can be found in [https://github.com/apache/airflow/pull/5159] 

The problem turned out to be ... the unreliability of the multiprocessing.Queue empty() implementation. It turned out that multiprocessing.Queue.empty() is not fully synchronized and might return True even if a put() operation has already completed in another thread. What's more - empty() might return True even if qsize() of the queue returns > 0 (!)
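A minimal sketch of how this can be observed, assuming only the standard library. The helper name count_stale_empty is made up for illustration and is not part of Airflow. It counts how often empty() reports True right after put() has returned in the same process. The result is timing-dependent, so it may well be 0 on some machines and runs.

```python
import multiprocessing


def count_stale_empty(trials=200):
    """Count how often empty() is True immediately after put() returned.

    Hypothetical helper for illustration only. put() hands the item to a
    background feeder thread, so the item may not have reached the
    underlying pipe yet when empty() polls it.
    """
    queue = multiprocessing.Queue()
    stale = 0
    for _ in range(trials):
        queue.put("item")
        if queue.empty():
            # put() has completed, yet the queue still claims to be empty.
            stale += 1
        # A blocking get() waits until the item really arrives, which
        # resets the queue to an empty state for the next trial.
        assert queue.get(timeout=5) == "item"
    queue.close()
    queue.join_thread()
    return stale


if __name__ == "__main__":
    print("stale empty() results:", count_stale_empty())
```

Any non-zero count means a caller relying on empty() would have skipped an item that was already put.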

It's a bit mind-boggling but it is "as intended", as documented in [https://bugs.python.org/issue23582] (resolved as "not a bug" !!!!)

A few people have stumbled upon this problem. For example [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f] and [https://github.com/keras-team/autokeras/issues/368] 

The solution available in [https://bugs.python.org/issue23582] using qsize() was not usable because qsize() does not work on macOS (it raises NotImplementedError).

The working solution is to implement a reliable queue (SynchronizedQueue) based on [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f] (but with a twist: the __init__ of a class deriving from Queue has to be changed for Python 3.4+, as described in [https://stackoverflow.com/questions/24941359/ctx-parameter-in-multiprocessing-queue]).
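A rough sketch of what such a queue could look like, assuming a shared counter kept next to the queue in the spirit of the lemon commit. This is not the actual Airflow implementation: the attribute name _count and the exact ordering of counter updates are assumptions made for illustration. The ctx keyword passed to __init__ is the Python 3.4+ twist mentioned above.

```python
import multiprocessing
import multiprocessing.queues


class SynchronizedQueue(multiprocessing.queues.Queue):
    """Queue whose empty() and qsize() rely on an explicit shared counter."""

    def __init__(self, maxsize=0):
        # Since Python 3.4 the Queue class requires an explicit context.
        ctx = multiprocessing.get_context()
        super().__init__(maxsize, ctx=ctx)
        # Shared integer guarded by its own lock (assumed attribute name).
        self._count = ctx.Value("i", 0)

    def __getstate__(self):
        # Carry the counter along when the queue is passed to a child process.
        return (super().__getstate__(), self._count)

    def __setstate__(self, state):
        parent_state, self._count = state
        super().__setstate__(parent_state)

    def put(self, *args, **kwargs):
        # Count the item before handing it over, so empty() can never be
        # True once put() has returned.
        with self._count.get_lock():
            self._count.value += 1
        try:
            super().put(*args, **kwargs)
        except Exception:
            # The item was not queued (e.g. queue.Full), undo the count.
            with self._count.get_lock():
                self._count.value -= 1
            raise

    def get(self, *args, **kwargs):
        item = super().get(*args, **kwargs)
        # Only decrement after an item was really retrieved.
        with self._count.get_lock():
            self._count.value -= 1
        return item

    def qsize(self):
        # Does not depend on sem_getvalue(), so it also works on macOS.
        return self._count.value

    def empty(self):
        return self.qsize() == 0
```

With this sketch, empty() returns False as soon as put() has returned, so a following blocking get() may still wait briefly for the feeder thread but will not miss the item.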

Luckily we are now on Python 3.5+.

We should replace all usages of multiprocessing.Queue where empty() is used with the SynchronizedQueue.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)