You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/04/23 15:32:36 UTC

[GitHub] [airflow] potiuk commented on a change in pull request #5159: [AIRFLOW-XXX] Attempt to remove flakeyness of LocalExecutor

potiuk commented on a change in pull request #5159: [AIRFLOW-XXX] Attempt to remove flakeyness of LocalExecutor
URL: https://github.com/apache/airflow/pull/5159#discussion_r277740312
 
 

 ##########
 File path: tests/executors/test_local_executor.py
 ##########
 @@ -49,11 +48,7 @@ def execution_parallelism(self, parallelism=0):
 
         executor.running['fail'] = True
 
-        if parallelism == 0:
 
 Review comment:
   Somehow I doubt this will fix it. Every time I run the tests locally, It completes sub-second. 
   
   I think the error we are observing is a real race.
   
   I think I know what could have caused it (though it's a wild guess). There are a number of - looks like - workarounds for a weird behaviour that should not have happened in reality in the code around this LocalExecutor and Paralellism and it looks fishy.
   
   Looks like someone experienced that race before and implemented the workarounds without solving root cause. I am talking about those lines:
   
   ```
       # errors are propagated for some reason
           try:
               executor.execute_async(key='fail', command=fail_command)
           except Exception:
               pass
   ```
   And in LocalExcutor I see: 
   
   ```
               key, command = self.task_queue.get()
               if key is None:
                   # Received poison pill, no more tasks to run
                   self.task_queue.task_done()
                   break
   ```
   
   So it looks like someone encountered None when running get() from queue (which could only happen if someone actually put None into the queue (for get method Exceptions are returned when get timeouts). 
   
   I looked a bit closer and I think the problem is sequence of operations in the test itself (not the LocalExecutor nor BaseExecutor implementation). When you look at the actual implementation of BaseExecutor's heartbeat() method, you can see that the sequence of operations there is different than in the test.
   
   In Base Executor you have:
   
   ```
               self.running[key] = command
               self.execute_async(key=key,
                                  command=command,
   ```
   
   but in our test we have:
   
   ```
               executor.execute_async(key=key, command=command)
               executor.running[key] = True
   ```
   
   My intuition told me that this reversal might trigger all the problems. After digging deeper, it looks like if a "change_state()" method from base_executor runs very, fast it might actually run 'self.running.pop(key, None)'  BEFORE running[key] = True is executed. Then the "pop" method is not called again and the "running[key]" remains in the dictionary.
   
   I think that's the root cause of the problem. And maybe we can also remove those two nasty workarounds if they are triggered by this as well (I did not dig even deeper, but it looks like they might be related).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services