You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/15 10:14:37 UTC

[GitHub] [airflow] romainfage commented on issue #19329: Kubernetes Executor: non-existent upstream failed

romainfage commented on issue #19329:
URL: https://github.com/apache/airflow/issues/19329#issuecomment-968737040


   Hello all!
   
   When looking at the issue and logs, We may have understood why the task is set to `upstream_failed` whereas the task is retrying. (we are using Airflow 2.1.3)
   
   We are getting this logs inside our scheduler:
   
   ```bash
   {kubernetes_executor.py:368} INFO - Attempting to finish pod; pod_id: POD_ID_1; state: failed; annotations: {'dag_id': 'DAG1', 'task_id': 'TASK_ID', 'execution_date': '2021-10-29T13:05:00+00:00', 'try_number': '1'}
   {kubernetes_executor.py:546} INFO - Changing state of (TaskInstanceKey(dag_id='DAG1', task_id='TASK_ID', execution_date=datetime.datetime(2021, 10, 29, 13, 5, tzinfo=tzlocal()), try_number=1), <TaskInstanceState.FAILED: 'failed'>, 'POD_ID_1', 'airflow', '164023816') to failed
   {scheduler_job.py:611} INFO - Executor reports execution of DAG1.TASK_ID execution_date=2021-10-29 13:05:00+00:00 exited with status failed for try_number 1
   {scheduler_job.py:647} ERROR - Executor reports task instance <TaskInstance: DAG1.TASK_ID 2021-10-29 13:05:00+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
   {scheduler_job.py:654} INFO - Setting task instance <TaskInstance: DAG1.TASK_ID 2021-10-29 13:05:00+00:00 [queued]> state to failed as reported by executor
   ```
   
   Our understanding is (as demonstrate @sweco), that K8S executors try to finished the pod that is not launched yet (maybe a timeout of any kind, I cannot currently pin point where and why the executors do that). That is why we see this log:
   
   ```bash
   {kubernetes_executor.py:368} INFO - Attempting to finish pod; pod_id: POD_ID_1; state: failed; annotations: {'dag_id': 'DAG1', 'task_id': 'TASK_ID', 'execution_date': '2021-10-29T13:05:00+00:00', 'try_number': '1'}
   {kubernetes_executor.py:546} INFO - Changing state of (TaskInstanceKey(dag_id='DAG1', task_id='TASK_ID', execution_date=datetime.datetime(2021, 10, 29, 13, 5, tzinfo=tzlocal()), try_number=1), <TaskInstanceState.FAILED: 'failed'>, 'POD_ID_1', 'airflow', '164023816') to failed
   ```
   
   Afterward, the scheduler received an event from the scheduler that change the state of the dag to failed. We can see the reception of the event here:
   
   ```bash
   {scheduler_job.py:611} INFO - Executor reports execution of DAG1.TASK_ID execution_date=2021-10-29 13:05:00+00:00 exited with status failed for try_number 1
   ````
   
   Afterward, the scheduler try to handle the event, to do so it look at the current state inside the metadata db of the task. However, since the pod did not start yet, the state of the task is still `queued`. The scheduler think because the old state is `queued` and the new state is `failed`, it is not a classic behavior so someone external should have done something. That is why we see the log:
   
   ```bash
   {scheduler_job.py:647} ERROR - Executor reports task instance <TaskInstance: DAG1.TASK_ID 2021-10-29 13:05:00+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
   ```
   We can notice the question from the scheduler `Was the task killed externally?`, meaning that it is did not sure to handle well the case.
   
   When looking at the code, we can pinpoint [here](https://github.com/apache/airflow/blob/2.1.3/airflow/jobs/scheduler_job.py#L641-L656).  Preview:
   ```python
       @provide_session
       def _process_executor_events(self, session: Session = None) -> int:
               [...]
               if ti.try_number == buffer_key.try_number and ti.state == State.QUEUED:
                   Stats.incr('scheduler.tasks.killed_externally')
                   msg = (
                       "Executor reports task instance %s finished (%s) although the "
                       "task says its %s. (Info: %s) Was the task killed externally?"
                   )
                   self.log.error(msg, ti, state, ti.state, info)
   
                   request = TaskCallbackRequest(
                       full_filepath=ti.dag_model.fileloc,
                       simple_task_instance=SimpleTaskInstance(ti),
                       msg=msg % (ti, state, ti.state, info),
                   )
                   self.log.info('Setting task instance %s state to %s as reported by executor', ti, state)
                   ti.set_state(state)
                   self.processor_agent.send_callback_to_execute(request)
   ```
   
   By consequence, as the scheduler think something external marks the task as `failed` and the scheduler marks downstream tasks as `upstream_failed`. Then, afterward the task is retried, finished in success and is green in Airflow but no downstream tasks are launched since they are in state `upstream_failed`.
   
   I don't have currently enough insight on how the Airflow scheduler code really work, so I cannot provide a proper fix to this problem, but to fix the problem, may we do:
   
   * Provide another failure state to make difference between external task kill and airflow task kill?
   * Provide more information in the event received by the scheduler from the executor on the nature of the kill?
   * When the task retries, it clear the down stream task state, therefore they will not be in `upstream_failed` state?
   
   Thank you for reading me
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org