Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/10/02 08:29:13 UTC

[GitHub] [airflow] rafalkozik commented on issue #10790: Copy of [AIRFLOW-5071] JIRA: Thousands of Executor reports task instance X finished (success) although the task says its queued. Was the task killed externally?

rafalkozik commented on issue #10790:
URL: https://github.com/apache/airflow/issues/10790#issuecomment-702598519


   We have just introduced _ExternalTaskSensor_ into our pipeline and faced the same issue. When we initially tested it on our dev instance (~200 DAGs) it worked fine, but after running it on our prod environment (~400 DAGs) it was always failing after a reschedule.
   
   After digging into the code, it looks like this is simply a race condition in the scheduler.
   
   We have a _child_dag.parent_dag_completed_ task that waits for a business process to complete calculations in _parent_dag_. Task execution logs:
   
   ```
   [2020-10-01 11:48:03,038] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: child_dag.parent_dag_completed 2020-09-30T11:45:00+00:00 [queued]>
   [2020-10-01 11:48:03,065] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: child_dag.parent_dag_completed 2020-09-30T11:45:00+00:00 [queued]>
   [2020-10-01 11:48:03,066] {taskinstance.py:879} INFO - 
   --------------------------------------------------------------------------------
   [2020-10-01 11:48:03,066] {taskinstance.py:880} INFO - Starting attempt 1 of 1
   [2020-10-01 11:48:03,066] {taskinstance.py:881} INFO - 
   --------------------------------------------------------------------------------
   [2020-10-01 11:48:03,095] {taskinstance.py:900} INFO - Executing <Task(ExternalTaskSensor): parent_dag_completed> on 2020-09-30T11:45:00+00:00
   [2020-10-01 11:48:03,100] {standard_task_runner.py:53} INFO - Started process 26131 to run task
   [2020-10-01 11:48:03,235] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: child_dag.parent_dag_completed 2020-09-30T11:45:00+00:00 [running]> ip-10-200-100-113.eu-west-1.compute.internal
   [2020-10-01 11:48:03,318] {external_task_sensor.py:117} INFO - Poking for parent_dag on 2020-09-30T11:45:00+00:00 ... 
   [2020-10-01 11:48:03,397] {taskinstance.py:1136} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
   [2020-10-01 11:48:12,994] {logging_mixin.py:112} INFO - [2020-10-01 11:48:12,993] {local_task_job.py:103} INFO - Task exited with return code 0
   [2020-10-01 11:50:53,744] {taskinstance.py:663} INFO - Dependencies not met for <TaskInstance: child_dag.parent_dag_completed 2020-09-30T11:45:00+00:00 [failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed' state which is not a valid state for execution. The task must be cleared in order to be run.
   [2020-10-01 11:50:53,747] {logging_mixin.py:112} INFO - [2020-10-01 11:50:53,747] {local_task_job.py:91} INFO - Task is not able to be run
   ```
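   
   For context, a minimal sketch of how such a sensor could be defined (the DAG and task ids are taken from the logs above; the schedule, _poke_interval_ and _timeout_ values are placeholders, not our actual configuration, and the import path matches the 1.10.x code quoted below):
   
   ```python
   from datetime import datetime
   
   from airflow import DAG
   from airflow.sensors.external_task_sensor import ExternalTaskSensor
   
   with DAG(
       dag_id="child_dag",
       start_date=datetime(2020, 9, 1),
       schedule_interval="45 11 * * *",  # placeholder schedule
       catchup=False,
   ) as dag:
       parent_dag_completed = ExternalTaskSensor(
           task_id="parent_dag_completed",
           external_dag_id="parent_dag",
           external_task_id=None,   # wait for the whole parent DAG run
           # free the worker slot between pokes; this is what produces the
           # UP_FOR_RESCHEDULE -> queued cycle visible in the logs
           mode="reschedule",
           poke_interval=60,        # placeholder value
           timeout=60 * 60,         # placeholder value
       )
   ```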
   
   Scheduler logs:
   
   ```
   <TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00 [scheduled]>
   [2020-10-01 11:47:59,428] {scheduler_job.py:1010} INFO - DAG child_dag has 0/16 running and queued tasks
   <TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00 [scheduled]>
   <TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00 [queued]>
   [2020-10-01 11:47:59,565] {scheduler_job.py:1170} INFO - Sending ('child_dag', 'parent_dag_completed', datetime.datetime(2020, 9, 30, 11, 45, tzinfo=<Timezone [UTC]>), 1) to executor with priority 3 and queue default
   [2020-10-01 11:47:59,565] {base_executor.py:58} INFO - Adding to queue: ['airflow', 'run', 'child_dag', 'parent_dag_completed', '2020-09-30T11:45:00+00:00', '--local', '--pool', 'default_pool', '-sd', '/usr/local/airflow/dags/291a327d-5d46-4cf5-87cf-4bad036f56fa_1.py']
   <TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00 [scheduled]>
   [2020-10-01 11:50:50,118] {scheduler_job.py:1010} INFO - DAG child_dag has 0/16 running and queued tasks
   <TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00 [scheduled]>
   <TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00 [queued]>
   [2020-10-01 11:50:50,148] {scheduler_job.py:1170} INFO - Sending ('child_dag', 'parent_dag_completed', datetime.datetime(2020, 9, 30, 11, 45, tzinfo=<Timezone [UTC]>), 1) to executor with priority 3 and queue default
   [2020-10-01 11:50:50,148] {base_executor.py:58} INFO - Adding to queue: ['airflow', 'run', 'child_dag', 'parent_dag_completed', '2020-09-30T11:45:00+00:00', '--local', '--pool', 'default_pool', '-sd', '/usr/local/airflow/dags/291a327d-5d46-4cf5-87cf-4bad036f56fa_1.py']
   [2020-10-01 11:50:50,595] {scheduler_job.py:1313} INFO - Executor reports execution of child_dag.parent_dag_completed execution_date=2020-09-30 11:45:00+00:00 exited with status success for try_number 1
   [2020-10-01 11:50:50,599] {scheduler_job.py:1330} ERROR - Executor reports task instance <TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00 [queued]> finished (success) although the task says its queued. Was the task killed externally?
   [2020-10-01 11:50:50,803] {taskinstance.py:1145} ERROR - Executor reports task instance <TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00 [queued]> finished (success) although the task says its queued. Was the task killed externally?
   [2020-10-01 11:50:50,804] {taskinstance.py:1202} INFO - Marking task as FAILED.dag_id=child_dag, task_id=parent_dag_completed, execution_date=20200930T114500, start_date=20201001T114803, end_date=20201001T115050
   ```
   
   From the scheduler log it's visible that the event from the executor is processed after the task has already been queued for the second time.
   
   The logic related to those logs is here:
   
   ```python
       def _validate_and_run_task_instances(self, simple_dag_bag):
           if len(simple_dag_bag.simple_dags) > 0:
               try:
                   self._process_and_execute_tasks(simple_dag_bag) # <-- task state is changed to queued here
               except Exception as e:
                   self.log.error("Error queuing tasks")
                   self.log.exception(e)
                   return False
   
           # Call heartbeats
           self.log.debug("Heartbeating the executor")
           self.executor.heartbeat()
   
           self._change_state_for_tasks_failed_to_execute()
   
           # Process events from the executor
           self._process_executor_events(simple_dag_bag) # <-- notification of previous execution is processed and there is state mismatch 
           return True
   ```
           
   This is the place where the task state is changed:
   
   ```python
       def _process_executor_events(self, simple_dag_bag, session=None):
          
          # ...
          
                   if ti.try_number == try_number and ti.state == State.QUEUED:
                       msg = ("Executor reports task instance {} finished ({}) "
                              "although the task says its {}. Was the task "
                              "killed externally?".format(ti, state, ti.state))
                       Stats.incr('scheduler.tasks.killed_externally')
                       self.log.error(msg)
                       try:
                           simple_dag = simple_dag_bag.get_dag(dag_id)
                           dagbag = models.DagBag(simple_dag.full_filepath)
                           dag = dagbag.get_dag(dag_id)
                           ti.task = dag.get_task(task_id)
                           ti.handle_failure(msg)
                       except Exception:
                           self.log.error("Cannot load the dag bag to handle failure for %s"
                                          ". Setting task to FAILED without callbacks or "
                                          "retries. Do you have enough resources?", ti)
                           ti.state = State.FAILED
                           session.merge(ti)
                           session.commit()
   ```
                  
   Unfortunately, I think that moving __process_executor_events_ before __process_and_execute_tasks_ would not solve the issue, as an event might arrive from the executor while __process_and_execute_tasks_ is executing. Increasing _poke_interval_ reduces the chance of this race condition happening when the scheduler is under heavy load.
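   
   For completeness, increasing the interval is just a one-argument change on the sensor (the 5-minute value below is an example; it only makes the race less likely, it does not eliminate it):
   
   ```python
       parent_dag_completed = ExternalTaskSensor(
           task_id="parent_dag_completed",
           external_dag_id="parent_dag",
           external_task_id=None,
           mode="reschedule",
           # a longer interval gives the scheduler more time to process the
           # executor's success event before the task is queued again
           poke_interval=5 * 60,  # example value
           timeout=60 * 60,
       )
   ```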
   
   I'm not too familiar with the Airflow code base, but it seems that the root cause is the way reschedule works and the fact that _try_number_ does not change. Because of that, the scheduler thinks that the event for the past execution belongs to the ongoing one.
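   
   To illustrate that last point, a toy (non-Airflow) sketch of the key collision: the scheduler log shows the executor keys events by (dag_id, task_id, execution_date, try_number), and because a rescheduled sensor run does not bump _try_number_, the stale success event matches the freshly queued task instance:
   
   ```python
   from datetime import datetime
   
   # Event key shape as seen in the "Sending (...) to executor" scheduler log line.
   def event_key(dag_id, task_id, execution_date, try_number):
       return (dag_id, task_id, execution_date, try_number)
   
   execution_date = datetime(2020, 9, 30, 11, 45)
   
   # First run: the sensor pokes, reschedules itself and exits; the executor
   # later reports "success" for try_number 1.
   finished_first_run = event_key("child_dag", "parent_dag_completed", execution_date, 1)
   
   # Second run: the scheduler queues the task again, still on try_number 1,
   # because a reschedule is not a retry.
   queued_second_run = event_key("child_dag", "parent_dag_completed", execution_date, 1)
   
   # The stale event is indistinguishable from the new attempt, so the check in
   # _process_executor_events (try_number matches and state is QUEUED) fires.
   assert finished_first_run == queued_second_run
   ```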

