Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/11/16 03:52:45 UTC

[GitHub] [airflow] freedom1989 commented on issue #10790: Copy of [AIRFLOW-5071] JIRA: Thousands of Executor reports task instance X finished (success) although the task says its queued. Was the task killed externally?

freedom1989 commented on issue #10790:
URL: https://github.com/apache/airflow/issues/10790#issuecomment-727719402


   The cause is clear, as @rafalkozik mentioned: the scheduler schedules the task a second time (puts it in the queue) and only afterwards processes the executor event from the task's first try. This happens when the scheduling loop time is longer than the sensor task's reschedule interval.
   Either reducing the scheduler loop time (DAG processing time, etc.) or increasing the sensor task's reschedule interval works around it; a sketch of the latter follows.
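
   As a rough illustration of the second workaround, here is a minimal sketch (assuming Airflow 1.10.x-style imports; the sensor class, DAG id, and interval values are illustrative, not taken from this issue) of a sensor running in reschedule mode with a poke_interval chosen to stay well above the scheduler loop time:

    ```
    # Minimal sketch, assuming Airflow 1.10.x-style imports; the class name,
    # DAG id, and values below are illustrative only.
    from datetime import datetime

    from airflow import DAG
    from airflow.sensors.base_sensor_operator import BaseSensorOperator


    class WaitForFlag(BaseSensorOperator):
        """Hypothetical sensor; poke() would check whatever condition is awaited."""

        def poke(self, context):
            return False  # placeholder condition


    with DAG(dag_id="sensor_reschedule_example",
             start_date=datetime(2020, 11, 1),
             schedule_interval=None) as dag:
        wait = WaitForFlag(
            task_id="wait",
            mode="reschedule",   # frees the worker slot between pokes
            poke_interval=300,   # keep this well above the scheduler loop time
        )
    ```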
   
   The bug could also be fixed by giving the rescheduled task instance a different try number, but that would produce a lot of log files.
   
   ```
        # Excerpt from Airflow's SchedulerJob._process_executor_events
        # (intervening code elided with "# ..."):
        def _process_executor_events(self, simple_dag_bag, session=None):
          
          # ...
          
                   if ti.try_number == try_number and ti.state == State.QUEUED:  # <-- try number for a sensor task is always the same
                       msg = ("Executor reports task instance {} finished ({}) "
                              "although the task says its {}. Was the task "
                              "killed externally?".format(ti, state, ti.state))
                       Stats.incr('scheduler.tasks.killed_externally')
                       self.log.error(msg)
                       try:
                           simple_dag = simple_dag_bag.get_dag(dag_id)
                           dagbag = models.DagBag(simple_dag.full_filepath)
                           dag = dagbag.get_dag(dag_id)
                           ti.task = dag.get_task(task_id)
                           ti.handle_failure(msg)
                       except Exception:
                           self.log.error("Cannot load the dag bag to handle failure for %s"
                                          ". Setting task to FAILED without callbacks or "
                                          "retries. Do you have enough resources?", ti)
                           ti.state = State.FAILED
                           session.merge(ti)
                           session.commit()
   ```
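
   To make the race concrete, here is a toy sketch in plain Python (no Airflow imports; all names and values are illustrative) of how the executor event from the sensor's first poke meets the already re-queued task instance carrying the same try number and trips the check above:

    ```
    QUEUED = "queued"

    def would_flag_killed_externally(ti_state, ti_try_number, event_try_number):
        # Mirrors the condition in the excerpt above.
        return ti_try_number == event_try_number and ti_state == QUEUED

    # The executor reports that the sensor's first poke (try 1) finished (success)...
    event_try_number = 1
    # ...but the scheduler has already re-queued the sensor, still as try 1,
    # because a rescheduled sensor reuses its try number.
    ti_state, ti_try_number = QUEUED, 1

    # The guard misfires: the task is failed with "Was the task killed
    # externally?" although nothing was killed.
    assert would_flag_killed_externally(ti_state, ti_try_number, event_try_number)
    ```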

