Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/04/30 05:15:14 UTC

[GitHub] [airflow] potiuk edited a comment on issue #5200: [AIRFLOW-4401] Use managers for Queue synchronization

potiuk edited a comment on issue #5200: [AIRFLOW-4401] Use managers for Queue synchronization
URL: https://github.com/apache/airflow/pull/5200#issuecomment-487823276
 
 
   @BasPH @Fokko @ashb -> I have made some amendments, as I realised that Manager().Queue() always behaves like a JoinableQueue (unlike multiprocessing.Queue): it supports task_done() and join(). I updated the code everywhere to follow the general pattern below (also described in the commit message).
   
   Since the managed queue is proxied to the shared one anyway, get_nowait() will always return a message if another process has already put it on the queue.
   
   BTW, the get_nowait() call on a managed queue is not really "instant" as it is with a standard multiprocessing.Queue: it polls the shared (manager) process, but that is exactly why it is good for us :).
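A minimal sketch of the behaviour described above, not the code from this PR. The `producer` and `has_join_api` helpers are hypothetical names made up for illustration. It shows that the managed queue proxy exposes task_done() and join(), and that a message put by a child process is visible to get_nowait() in the parent once the child has finished.

```python
import multiprocessing.queues
from multiprocessing import Manager, Process


def producer(q, message):
    # Hypothetical helper: runs in a child process and puts one message.
    q.put(message)


def has_join_api(obj):
    # True if the object offers the JoinableQueue-style methods.
    return hasattr(obj, "task_done") and hasattr(obj, "join")


if __name__ == "__main__":
    with Manager() as manager:
        q = manager.Queue()
        # Compare the managed proxy with the plain multiprocessing queue class.
        print(has_join_api(q))
        print(has_join_api(multiprocessing.queues.Queue))

        child = Process(target=producer, args=(q, "hello"))
        child.start()
        child.join()  # the put() has completed once the child has exited

        # The proxy asks the manager process, so the message is already there.
        print(q.get_nowait())
        q.task_done()
        q.join()  # returns immediately, nothing is left unfinished
```

The `if __name__ == "__main__":` guard matters here because starting a Manager launches a separate server process.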
   
   It also seems that there was a mistake in KubernetesExecutor. See the comment here https://github.com/apache/airflow/pull/5200/commits/6ae1ac6f25b53e8a726b463461c96b50e114d7e7#r279615263 - if we have heard reports that KubernetesExecutor cannot be shut down gracefully, this could be the cause.
   
   ```
   self.task_queue.join()
   ```
   
   
   ```
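   from queue import Empty

   while True:
       try:
           res = queue.get_nowait()
           try:
               ...  # do some processing with res
           finally:
               # mark the item as done even if processing fails, so join() can return
               queue.task_done()
       except Empty:
           break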
   ```
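The pattern above can be sketched end to end as follows. This is an illustration under assumptions, not the Airflow implementation: `producer`, `drain` and the `handle` callback are hypothetical names, and the queue is filled by a single child process that has finished before draining starts.

```python
from multiprocessing import Manager, Process
from queue import Empty


def producer(q, count):
    # Hypothetical helper: a child process that enqueues `count` integers.
    for i in range(count):
        q.put(i)


def drain(q, handle):
    """Process everything currently in q and return how many items were handled."""
    handled = 0
    while True:
        try:
            item = q.get_nowait()
            try:
                handle(item)
            finally:
                # Runs even if handle() raises, so q.join() cannot hang on this item.
                q.task_done()
                handled += 1
        except Empty:
            break
    return handled


if __name__ == "__main__":
    with Manager() as manager:
        q = manager.Queue()
        child = Process(target=producer, args=(q, 3))
        child.start()
        child.join()

        results = []
        drain(q, results.append)
        q.join()  # returns immediately because every item was marked done
        print(results)
```

Calling task_done() in the `finally` clause is the design choice that matters: each successful get_nowait() is matched by exactly one task_done(), which is what join() counts on.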
   
