You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/04/28 13:50:07 UTC

[GitHub] [airflow] potiuk opened a new pull request #5199: [AIRFLOW-4401] SynchronizedQueue used where empty() is used.

potiuk opened a new pull request #5199: [AIRFLOW-4401] SynchronizedQueue used where empty() is used.
URL: https://github.com/apache/airflow/pull/5199
 
 
   Make sure you have checked _all_ steps below.
   
   ### Jira
   
   - [x] My PR addresses the following [Airflow Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR"
     - https://issues.apache.org/jira/browse/AIRFLOW-4401
     - In case you are fixing a typo in the documentation you can prepend your commit with \[AIRFLOW-XXX\], code changes always need a Jira issue.
     - In case you are proposing a fundamental code change, you need to create an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)).
     - In case you are adding a dependency, check if the license complies with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   
   ### Description
   
   - [x] Here are some details about my PR, including screenshots of any UI changes:
   
   It is a known problem https://bugs.python.org/issue23582 that
   multiprocessing.Queue empty() method is not reliable - sometimes it might
   return True even if another process already put something in the queue.
   
   This resulted in some of the tasks not picked up when sync() methods
   were called (in AirflowKubernetesScheduler, LocalExecutor,
   DagFileProcessor). This was less of a problem if the method was called in sync()
   - as the remaining jobs/files could be processed in next pass but it was a problem
   in tests and when graceful shutdown was executed (some tasks could be still
   unprocessed while the shutdown occured).
   
   All the cases impacted follow the same pattern now:
   
   while not queue.empty():
      res = queue.get()
      ....
   
   This loop runs always in single (main) process so it is safe to run it this way -
   there is no risk that some other process will retrieve the data from the queue in
   between empty() and get().
   
   Note that unlike in the standard multiprocessing.Queue, you cannot rely
   on data being immediately available after empty() is False. You should be
   prepared that subsequent get_nowait() raises Empty, or (better) use get()
   to retrieve the data.
   
   In all these cases overhead for inter-processing locking is negligible
   comparing to the action executed (Parsing DAG, executing job)
   so it appears it should be safe to merge the change.
   
   ### Tests
   
   - [x] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   
   No need. Lots of tests for that already (flaky ones).
   
   
   ### Commits
   
   - [x] My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
     1. Subject is separated from body by a blank line
     1. Subject is limited to 50 characters (not including Jira issue reference)
     1. Subject does not end with a period
     1. Subject uses the imperative mood ("add", not "adding")
     1. Body wraps at 72 characters
     1. Body explains "what" and "why", not "how"
   
   ### Documentation
   
   - [x] In case of new functionality, my PR adds documentation that describes how to use it.
     - All the public functions and the classes in the PR contain docstrings that explain what it does
     - If you implement backwards incompatible changes, please leave a note in the [Updating.md](https://github.com/apache/airflow/blob/master/UPDATING.md) so we can assign it to a appropriate release
   
   ### Code Quality
   
   - [x] Passes `flake8`
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services