Posted to commits@airflow.apache.org by "Ash Berlin-Taylor (JIRA)" <ji...@apache.org> on 2019/06/19 08:36:03 UTC

[jira] [Updated] (AIRFLOW-4401) multiprocessing.Queue.empty() is unreliable

     [ https://issues.apache.org/jira/browse/AIRFLOW-4401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ash Berlin-Taylor updated AIRFLOW-4401:
---------------------------------------
    Fix Version/s:     (was: 2.0.0)

> multiprocessing.Queue.empty() is unreliable
> -------------------------------------------
>
>                 Key: AIRFLOW-4401
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4401
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Jarek Potiuk
>            Priority: Major
>             Fix For: 1.10.4
>
>
> After discussing with [~ash] and [~BasPH] the potential reasons for the flakiness of the LocalExecutor tests (documented, for example, in AIRFLOW-4382), I took a deeper dive into the problem, and what I found raised the remaining hair on my head.
> We had a number of flaky local executor tests in which result_queue was not empty when it should have been emptied a moment before. More details and discussion can be found in [https://github.com/apache/airflow/pull/5159].
> The problem turned out to be the unreliability of the multiprocessing.Queue.empty() implementation. empty() is not fully synchronized: it may return True even after a put() has already completed in another process. What's more, empty() may return True even when the queue's qsize() returns > 0 (!)
> It's a bit mind-boggling, but it is "as intended", as documented in [https://bugs.python.org/issue23582] (resolved as "not a bug"!!!!), and it is described in [https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues] once you go into the details of how data is synchronized between processes.
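A minimal sketch of the race, assuming CPython's multiprocessing.Queue. There, put() only appends the pickled item to a local buffer, and a background feeder thread later writes it to the underlying pipe. empty() merely polls that pipe. qsize(), by contrast, is derived from a semaphore that put() acquires immediately, which is how empty() can be True while qsize() > 0. The helper `count_premature_empties` is hypothetical. The number it reports depends on thread scheduling and may be zero on some runs.

```python
import multiprocessing


def count_premature_empties(trials=200):
    """Count how often empty() is True right after put() has returned.

    Hypothetical helper: the result depends on thread scheduling, so any
    value between 0 and `trials` is possible.
    """
    q = multiprocessing.Queue()
    misses = 0
    for i in range(trials):
        q.put(i)          # returns once the item is in the local buffer
        if q.empty():     # polls the pipe, which the feeder thread may not have written to yet
            misses += 1
        q.get(timeout=5)  # a blocking get() still receives the item
    q.close()
    q.join_thread()       # wait for the feeder thread to finish
    return misses


if __name__ == "__main__":
    print(count_premature_empties())
```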
> A few people have stumbled upon this problem. For example [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f] and [https://github.com/keras-team/autokeras/issues/368] 
> We also seem to have run into this in Airflow before. In jobs.py, years ago (31.07.2016), we can see the comment below (but we used multiprocessing.Queue.empty() nevertheless):
> {code:python}
> # Not using multiprocessing.Queue() since it's no longer a separate
> # process and due to some unusual behavior. (empty() incorrectly
> # returns true?)
> {code}
> The workaround suggested in [https://bugs.python.org/issue23582] (using qsize()) works on Linux, but it is not really acceptable because qsize() does not work on macOS (it raises NotImplementedError).
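For reference, the qsize()-based workaround mentioned above looks roughly like the sketch below. The helper name `queue_is_empty` and its "None means cannot tell" convention are hypothetical. In CPython, qsize() is computed from the queue's internal semaphore, which relies on sem_getvalue(). That call is unavailable on macOS, so the check cannot give an answer there.

```python
import multiprocessing


def queue_is_empty(q):
    """qsize()-based emptiness check (hypothetical helper).

    Returns True/False where qsize() works, or None where it raises
    NotImplementedError (e.g. macOS), meaning "cannot tell".
    """
    try:
        return q.qsize() == 0
    except NotImplementedError:
        # No portable fallback: empty() is exactly the unreliable call.
        return None
```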
>  
> *Proposed solution 1): Synchronized Queue*
> [https://github.com/apache/airflow/pull/5199]
> Implement a more reliable queue (SynchronizedQueue) based on [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f] (we have to adjust the initialisation to work with both Python 2.7 and 3.5+ syntax, since we want to backport it to the stable v1.10 release).
> We should replace every usage of multiprocessing.Queue that relies on empty() with SynchronizedQueue, and make sure we do not use multiprocessing.Queue this way in the future.
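One way to build such a queue is with a shared counter. The sketch below makes several assumptions:

* Python 3 only; the Python 2.7 backport is not covered.
* The class name `CountingQueue` is hypothetical.
* The design may differ in detail from both the linked commit and the pull request.

A multiprocessing.Value counts items that have been put but not yet retrieved. qsize() and empty() read that counter, so neither polls the pipe nor calls sem_getvalue().

```python
import multiprocessing
import multiprocessing.queues


class CountingQueue(multiprocessing.queues.Queue):
    """Hypothetical SynchronizedQueue-style sketch backed by a shared counter."""

    def __init__(self, maxsize=0):
        super().__init__(maxsize, ctx=multiprocessing.get_context())
        # Number of items put but not yet retrieved, shared across processes.
        self._count = multiprocessing.Value("i", 0)

    # multiprocessing.queues.Queue pickles its state explicitly when it is
    # sent to a child process, so the counter must be added to that state.
    def __getstate__(self):
        return super().__getstate__() + (self._count,)

    def __setstate__(self, state):
        super().__setstate__(state[:-1])
        self._count = state[-1]

    def put(self, obj, block=True, timeout=None):
        # Count the item before it becomes visible to consumers, so the
        # counter never drops below the number of items actually queued.
        with self._count.get_lock():
            self._count.value += 1
        try:
            super().put(obj, block, timeout)
        except BaseException:
            with self._count.get_lock():
                self._count.value -= 1
            raise

    def get(self, block=True, timeout=None):
        # get_nowait() calls self.get(False), so it is covered as well.
        item = super().get(block, timeout)
        with self._count.get_lock():
            self._count.value -= 1
        return item

    def qsize(self):
        return self._count.value

    def empty(self):
        return self.qsize() == 0
```

Because the counter is updated before the feeder thread has written the item to the pipe, empty() can still report False while get_nowait() raises queue.Empty. This is the caveat listed under Cons below, so consumers should retrieve items with get() and a timeout.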
> Pros:
>  * rather straightforward replacement: Queue -> SynchronizedQueue
>  * no extra processes needed - queues continue to be distributed without a central manager
>  * no need to clean up extra processes
> Cons:
>  * potential synchronization delays (likely negligible)
>  * we are adding our own SynchronizedQueue with slightly altered behaviour - more code to maintain
>  * the SynchronizedQueue implementation is still not fully reliable - there can be cases where empty() returns False but get_nowait() raises an Empty exception. This means that wherever we rely on empty() returning False, we have to use a potentially blocking get() to retrieve the data
>  * Requires a (simple) backport to Python 2 for the v1-10 branch
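When empty() and get_nowait() can disagree, as in the SynchronizedQueue caveat above, one defensive pattern is to treat empty() only as a hint. Items are then fetched with a blocking get() bounded by a timeout. This is a minimal sketch. The name `drain` and the default `timeout` are hypothetical. It works with any queue exposing empty() and get(timeout=...), including multiprocessing.Queue, whose get() raises queue.Empty on timeout.

```python
import queue


def drain(q, timeout=1.0):
    """Collect items while empty() says there is data, using a blocking get().

    get(timeout=...) waits for an item that is counted but still in flight;
    queue.Empty after the timeout is treated as "nothing more to read".
    """
    items = []
    while not q.empty():
        try:
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            break
    return items
```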
> *Proposed solution 2): Use managed queues*
> [https://github.com/apache/airflow/pull/5200]
> It seems that this unreliable behaviour only occurs when the Queue is instantiated directly; the small delays between processes disappear when a shared Manager is used. In that case the Queue is really a proxy to a central Queue object running in a separate process, so all synchronisation goes through this single central queue: [https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues]. Using managed queues should solve the problem.
> Test observations confirm that this is the case: the tests are no longer flaky when managed queues are used.
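A minimal sketch of the managed-queue variant, assuming Python 3. The function names and the `n` and `ctx` parameters are hypothetical, and this is not the code from the pull request. A Manager's Queue() returns a proxy. Every put(), empty() and get_nowait() call is forwarded to one queue.Queue held by the manager's server process. A put() that has returned is therefore already visible to empty() in any process.

```python
import multiprocessing


def produce(results, n):
    """Hypothetical worker: push n results through the managed queue proxy."""
    for i in range(n):
        results.put(i)  # synchronous call into the manager process


def collect(results):
    """Drain the queue; empty() and get_nowait() are answered by the same
    queue.Queue in the manager process, so they agree (single consumer)."""
    items = []
    while not results.empty():
        items.append(results.get_nowait())
    return items


def run(n=5, ctx=None):
    ctx = ctx or multiprocessing.get_context()
    with ctx.Manager() as manager:
        results = manager.Queue()  # proxy to a queue.Queue in the manager process
        worker = ctx.Process(target=produce, args=(results, n))
        worker.start()
        worker.join()
        # Collect before the with-block exits: leaving it shuts the manager
        # down, after which the proxy can no longer be used.
        return collect(results)
```

On platforms whose default start method is spawn (Windows, and macOS since Python 3.8), call run() from under an `if __name__ == "__main__":` guard.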
> Pros:
>  * Only the initialisation of the queues needs to change - no need to extend the Queue implementation
>  * Pythonic way - managers are part of the standard library, and we can assume they are reliable and tested
>  * Such a managed queue is fully reliable - empty() and get_nowait() are perfectly in sync.
>  * Works the same for Python 2 and Python 3
> Cons:
>  * potential synchronization delays (likely negligible)
>  * since a separate process is started for each manager, cleanup is necessary, and it is quite delicate: shutting down the manager prevents any further access to the queue (Broken Pipe errors). The cleanup order therefore matters - first process everything, then clean up. This might have some undesirable side effects when shutting down Schedulers/Workers
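The cleanup-ordering problem can be reproduced with a small sketch (hypothetical helper names; `ctx` is a multiprocessing context passed in by the caller). Reading through the proxy first and shutting the manager down afterwards works. Reading after manager.shutdown() fails because the manager's server process is gone. The exact exception depends on platform and timing, which is why the sketch catches both OSError (which includes BrokenPipeError) and EOFError.

```python
import multiprocessing


def read_then_shutdown(ctx):
    """Correct order: consume everything, then shut the manager down."""
    manager = ctx.Manager()
    try:
        q = manager.Queue()
        q.put("result")
        return q.get_nowait()
    finally:
        manager.shutdown()


def shutdown_then_read(ctx):
    """Wrong order: return the name of the error raised by the stale proxy."""
    manager = ctx.Manager()
    q = manager.Queue()
    q.put("result")
    manager.shutdown()  # stops the manager's server process
    try:
        q.get_nowait()
    except (OSError, EOFError) as exc:
        return type(exc).__name__
    return None
```

As with any code that starts a Manager, call these from under an `if __name__ == "__main__":` guard on platforms that use the spawn start method.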



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)