Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/05/03 16:42:10 UTC

[GitHub] [airflow] gdevanla opened a new issue #8691: Task Instance will not be queued if `celery_executor` does not drop queue tasks on error.

gdevanla opened a new issue #8691:
URL: https://github.com/apache/airflow/issues/8691


   **Apache Airflow version**:
   1.10.3 
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`):
   
   **Environment**:
   Python 3.7.4
   
   - **Cloud provider or hardware configuration**:
   - **OS** (e.g. from /etc/os-release):
   Ubuntu xenial/bionic
   - **Kernel** (e.g. `uname -a`): 
   Linux 4.15.0-45-generic #48~16.04.1-Ubuntu SMP Tue Jan 29 18:03:48 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux
   
   - **Install tools**:
   - **Others**:
   
   **What happened**:
   
   The task instance gets stuck in the `scheduled` state because the scheduler and `CeleryExecutor` have inconsistent expectations about how entries in `queued_tasks` that failed to be queued successfully in `CeleryExecutor` are to be handled.
   
   Given a TaskInstance `TI` whose state is `None`, the following pseudocode is executed inside the scheduler loop. In this process, the `TI` in some situations gets stuck in the `scheduled` state.
   
   (The indentation below depicts the call stack.)
   ```
   Given, a task_instance `TI`, in `state == None`,
   
   execute_helper (scheduler loop)
       (first iteration of the scheduler loop)
        - calls `_execute_task_instances` 
           - calls `_find_executable_task_instances()` that returns `TI` that has state == `None`
           - calls  `_change_state_for_executable_task_instances` that updates `TI`s state = `queued`
           - calls `_enqueue_task_instances_with_queued_state`. This function adds `TI` to `Executor.queued_tasks` dictionary.
       - calls `CeleryExecutor.heartbeat`
           Tries to send the task to a worker. If this succeeds, the `TI` is popped from `CeleryExecutor.queued_tasks`. But in our scenario, `CeleryExecutor` just leaves the entry in `queued_tasks` intact because either an `Exception` was raised or `result` was `None`. The `CeleryExecutor` assumes the scheduler will handle this scenario. This is where the problem starts (see second iteration below; the link to this code is provided below).
       - calls `_change_state_for_tasks_failed_to_execute`.
            This function notices the `TI` entry in `CeleryExecutor.queued_tasks`, assumes something went wrong, and therefore correctly updates the state of `TI` back to `scheduled`. Note that the entry for `TI` is still in `queued_tasks`, and that causes the current issue (see second iteration below).
   
       - other maintenance activities happen in the scheduler loop  (not relevant to this issue)
   
       (second iteration of the scheduler loop)
       - calls `_execute_task_instances`
           - calls `_find_executable_task_instances()`.
               Now, this function is supposed to return `TI` since it is in the `scheduled` state. But it finds that an entry for `TI` already exists in `CeleryExecutor.queued_tasks` and therefore does not return `TI` (refer to the link below, which points to this case). This means `TI` will never be `queued` and is stuck in the `scheduled` state. (https://github.com/apache/airflow/blob/a943d6beab473b8ec87bb8c6e43f93cc64fa1d23/airflow/jobs/scheduler_job.py#L1033)
   ```
   Currently, the only workaround is to restart the scheduler. When the scheduler is restarted, `CeleryExecutor.queued_tasks` is reset and therefore the `TI` is `queued` again.
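
The two iterations and the restart workaround can be sketched with a toy model. This is not Airflow code: `ToyExecutor`, `scheduler_iteration`, and the `send` callable are hypothetical names that only mirror the roles of `CeleryExecutor.queued_tasks`, `CeleryExecutor.running`, and the scheduler steps described in the call stack above.

```python
from typing import Callable, Dict, List, Optional

SCHEDULED, QUEUED = "scheduled", "queued"


class ToyExecutor:
    """Hypothetical stand-in for CeleryExecutor; only the two dicts matter."""

    def __init__(self, send: Callable[[str], Optional[str]]) -> None:
        self.send = send  # assumed "send task to worker" hook
        self.queued_tasks: Dict[str, str] = {}
        self.running: Dict[str, str] = {}

    def has_task(self, key: str) -> bool:
        return key in self.queued_tasks or key in self.running

    def heartbeat(self) -> None:
        for key in list(self.queued_tasks):
            try:
                result = self.send(key)
            except Exception:
                result = None
            if result is not None:
                # Successful send: move the entry from queued_tasks to running.
                self.running[key] = self.queued_tasks.pop(key)
            # Failed send: the entry is left in queued_tasks.


def scheduler_iteration(states: Dict[str, Optional[str]],
                        executor: ToyExecutor) -> List[str]:
    """One pass of the toy scheduler loop; returns the keys it picked."""
    picked = [key for key, state in states.items()
              if state in (None, SCHEDULED) and not executor.has_task(key)]
    for key in picked:
        states[key] = QUEUED
        executor.queued_tasks[key] = "command"
    executor.heartbeat()
    # Anything still in queued_tasks is assumed to have failed to execute.
    for key in executor.queued_tasks:
        if states.get(key) == QUEUED:
            states[key] = SCHEDULED
    return picked


states: Dict[str, Optional[str]] = {"ti": None}
broken = ToyExecutor(send=lambda key: None)          # every send fails
first = scheduler_iteration(states, broken)          # TI picked, then reset to scheduled
second = scheduler_iteration(states, broken)         # TI skipped: stale queued_tasks entry

restarted = ToyExecutor(send=lambda key: "async-result")  # fresh, empty queued_tasks
third = scheduler_iteration(states, restarted)       # TI picked and sent
```

In this model the second iteration picks nothing because the stale `queued_tasks` entry hides the `TI`, and only a fresh executor (the restart) makes it eligible again.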
   
   The code where the `queued_tasks` entry is updated by popping the `TI` is here:
   https://github.com/apache/airflow/blob/a943d6beab473b8ec87bb8c6e43f93cc64fa1d23/airflow/executors/celery_executor.py#L223
   
   The code due to which `TI` gets stuck in `scheduled` state is here:
   https://github.com/apache/airflow/blob/a943d6beab473b8ec87bb8c6e43f93cc64fa1d23/airflow/jobs/scheduler_job.py#L1033
   
   I think the code here should only check whether the `CeleryExecutor.running` dictionary has `TI` in its entries. But I am not sure how it affects other schedulers.
   
   **What you expected to happen**:
   
   The `_find_executable_task_instances()` function should only check whether `CeleryExecutor.running` contains an entry for `TI`, and otherwise return `TI` as part of its list of tasks to be queued.
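
A minimal sketch of the difference between the two checks, assuming plain dictionaries in place of the executor's `queued_tasks` and `running`. The function names `executable_current` and `executable_proposed` are hypothetical and do not exist in Airflow; whether the relaxed check is safe for other executors is left open here, as in the report.

```python
from typing import Dict, Iterable, List


def executable_current(candidates: Iterable[str],
                       queued_tasks: Dict[str, str],
                       running: Dict[str, str]) -> List[str]:
    """Behaviour described in the report: skip a task known to the executor at all."""
    return [key for key in candidates
            if key not in queued_tasks and key not in running]


def executable_proposed(candidates: Iterable[str],
                        queued_tasks: Dict[str, str],
                        running: Dict[str, str]) -> List[str]:
    """Proposed behaviour: skip a task only if the executor is running it."""
    return [key for key in candidates if key not in running]


queued_tasks = {"ti_stale": "command"}    # entry left behind by a failed send
running = {"ti_running": "command"}
candidates = ["ti_stale", "ti_running", "ti_new"]

current = executable_current(candidates, queued_tasks, running)
proposed = executable_proposed(candidates, queued_tasks, running)
```

With the relaxed check, the task with a stale `queued_tasks` entry becomes eligible again, while a task that is actually running is still skipped.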
   
   **How to reproduce it**:
   
   It can be reproduced by forcing the `result` value in `CeleryExecutor.heartbeat` to be an `ExceptionWithTraceback` object or `None`.
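
The idea of forcing the result can be illustrated with `unittest.mock` against a toy class. `ToyCeleryExecutor` and its `_send` method are hypothetical stand-ins, not Airflow's API; against a real installation, the function to patch would have to be looked up in the `celery_executor` module of the version in use.

```python
from typing import Dict, Optional
from unittest import mock


class ToyCeleryExecutor:
    """Hypothetical executor that mirrors only the result handling."""

    def __init__(self) -> None:
        self.queued_tasks: Dict[str, str] = {}
        self.running: Dict[str, str] = {}

    def _send(self, key: str) -> Optional[str]:
        # Assumed send step; normally yields a non-None result.
        return f"async-result-for-{key}"

    def heartbeat(self) -> None:
        for key in list(self.queued_tasks):
            result = self._send(key)
            if result is None or isinstance(result, Exception):
                continue  # failed send: entry stays in queued_tasks
            del self.queued_tasks[key]
            self.running[key] = result


executor = ToyCeleryExecutor()
executor.queued_tasks["ti"] = "command"

# Force the send step to return None, as the reproduction suggests.
with mock.patch.object(ToyCeleryExecutor, "_send", return_value=None):
    executor.heartbeat()
stuck_after_forced_none = "ti" in executor.queued_tasks

# Without the patch, the same heartbeat moves the entry to running.
executor.heartbeat()
moved_after_normal_send = "ti" in executor.running
```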
   
   (Note: Links point to `master` branch. But, the problem applies to 1.10.3 and higher versions)
   
   **Anything else we need to know**:
   
   I am not able to find a scenario in `CeleryExecutor.heartbeat` where `result` is `None`. Looking at the `Celery.app` module, it seems that `result` can never be `None`. But I suspect there are scenarios where `result` is `None` and therefore `CeleryExecutor` does not pop the `TI` from the queue. I am not able to prove this concretely.
   
   This also happens with later versions of Airflow. In the later versions, the `CeleryExecutor.trigger_dags` function performs the same set of operations. The code has been moved around between versions, but the logic remains the same and the problem exists in later versions as well.
   
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] closed issue #8691: Task Instance will not be queued if `celery_executor` does not drop queued tasks on error.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed issue #8691:
URL: https://github.com/apache/airflow/issues/8691


   





[GitHub] [airflow] eladkal commented on issue #8691: Task Instance will not be queued if `celery_executor` does not drop queued tasks on error.

Posted by GitBox <gi...@apache.org>.
eladkal commented on issue #8691:
URL: https://github.com/apache/airflow/issues/8691#issuecomment-832783858


   @gdevanla does this problem still occur on Airflow 2?





[GitHub] [airflow] gdevanla commented on issue #8691: Task Instance will not be queued if `celery_executor` does not drop queued tasks on error.

Posted by GitBox <gi...@apache.org>.
gdevanla commented on issue #8691:
URL: https://github.com/apache/airflow/issues/8691#issuecomment-832846750


   I haven't tested with the latest release yet.  I plan to do it next week. Will report back after that.





[GitHub] [airflow] boring-cyborg[bot] commented on issue #8691: Task Instance will not be queued if `celery_executor` does not drop queue tasks on error.

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #8691:
URL: https://github.com/apache/airflow/issues/8691#issuecomment-623140181


   Thanks for opening your first issue here! Be sure to follow the issue template!
   





[GitHub] [airflow] github-actions[bot] commented on issue #8691: Task Instance will not be queued if `celery_executor` does not drop queued tasks on error.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on issue #8691:
URL: https://github.com/apache/airflow/issues/8691#issuecomment-869246040


   This issue has been closed because it has not received a response from the issue author.





[GitHub] [airflow] github-actions[bot] commented on issue #8691: Task Instance will not be queued if `celery_executor` does not drop queued tasks on error.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on issue #8691:
URL: https://github.com/apache/airflow/issues/8691#issuecomment-864478662


   This issue has been automatically marked as stale because it has been open for 30 days with no response from the author. It will be closed in the next 7 days if no further activity occurs from the issue author.

