Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/07/26 16:03:58 UTC

[GitHub] [airflow] peay commented on issue #16573: State of this instance has been externally set to up_for_retry. Terminating instance.

peay commented on issue #16573:
URL: https://github.com/apache/airflow/issues/16573#issuecomment-886830337


   @eladkal not sure if it is the same issue, but I am seeing the same symptom on 2.1.1, and can reproduce easily by letting my DAGs run for an hour or two.
   
   This affects tasks randomly in my DAGs, usually after they have been running for a few minutes. I have a single Celery worker pod, which was healthy every time this has occurred so far.
   
   Here's a sample from a task log, where I have activated SQLAlchemy query logging.
   
   ```
   # Task has been running for a few minutes OK
   [2021-07-23 11:15:51,821] {base.py:769} INFO - COMMIT
   [2021-07-23 11:15:51,838] {base.py:727} INFO - BEGIN (implicit)
   
   # This apparently correctly says task instance state is running, since we don't stop here.
   [2021-07-23 11:15:51,839] {base.py:1234} INFO - SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id 
   FROM task_instance 
   WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.task_id = %(task_id_1)s AND task_instance.execution_date = %(execution_date_1)s 
    LIMIT %(param_1)s
   [2021-07-23 11:15:51,839] {base.py:1239} INFO - {'dag_id_1': 'dag-name-1', 'task_id_1': 'task-name-1', 'execution_date_1': DateTime(2021, 7, 13, 19, 0, 0, tzinfo=Timezone('UTC')), 'param_1': 1}
   [2021-07-23 11:15:51,842] {base.py:769} INFO - COMMIT
   
   # Task-specific logs
   [2021-07-23 11:15:53,022] {yarn.py:94} INFO - application_1624488826511_18729: RUNNING
   [2021-07-23 11:15:58,030] {yarn.py:94} INFO - application_1624488826511_18729: RUNNING
   [2021-07-23 11:16:03,037] {yarn.py:94} INFO - application_1624488826511_18729: RUNNING
   [2021-07-23 11:16:08,044] {yarn.py:94} INFO - application_1624488826511_18729: RUNNING
   [2021-07-23 11:16:13,054] {yarn.py:94} INFO - application_1624488826511_18729: RUNNING
   [2021-07-23 11:16:18,062] {yarn.py:94} INFO - application_1624488826511_18729: RUNNING
   
   [2021-07-23 11:16:21,863] {base.py:727} INFO - BEGIN (implicit)
   
   # Still OK
   [2021-07-23 11:16:21,863] {base.py:1234} INFO - SELECT job.id AS job_id, job.dag_id AS job_dag_id, job.state AS job_state, job.job_type AS job_job_type, job.start_date AS job_start_date, job.end_date AS job_end_date, job.latest_heartbeat AS job_latest_heartbeat, job.executor_class AS job_executor_class, job.hostname AS job_hostname, job.unixname AS job_unixname 
   FROM job 
   WHERE job.id = %(param_1)s AND job.job_type IN (%(job_type_1)s)
   [2021-07-23 11:16:21,864] {base.py:1239} INFO - {'param_1': 773, 'job_type_1': 'LocalTaskJob'}
   [2021-07-23 11:16:21,866] {base.py:1234} INFO - UPDATE job SET latest_heartbeat=%(latest_heartbeat)s WHERE job.id = %(job_id)s
   [2021-07-23 11:16:21,866] {base.py:1239} INFO - {'latest_heartbeat': datetime.datetime(2021, 7, 23, 11, 15, 51, 821303, tzinfo=Timezone('UTC')), 'job_id': 773}
   [2021-07-23 11:16:21,867] {base.py:769} INFO - COMMIT
   [2021-07-23 11:16:21,879] {base.py:727} INFO - BEGIN (implicit)
   [2021-07-23 11:16:21,880] {base.py:1234} INFO - SELECT job.id AS job_id, job.dag_id AS job_dag_id, job.state AS job_state, job.job_type AS job_job_type, job.start_date AS job_start_date, job.end_date AS job_end_date, job.latest_heartbeat AS job_latest_heartbeat, job.executor_class AS job_executor_class, job.hostname AS job_hostname, job.unixname AS job_unixname 
   FROM job 
   WHERE job.id = %(param_1)s AND job.job_type IN (%(job_type_1)s)
   [2021-07-23 11:16:21,880] {base.py:1239} INFO - {'param_1': 773, 'job_type_1': 'LocalTaskJob'}
   [2021-07-23 11:16:21,882] {base.py:769} INFO - COMMIT
   [2021-07-23 11:16:21,894] {base.py:727} INFO - BEGIN (implicit)
   
   # This time, 30s later, it seems this says the task instance is `up_for_retry`, but why?
   # I believe this query is https://github.com/apache/airflow/blob/2.1.1/airflow/jobs/local_task_job.py#L181
   [2021-07-23 11:16:21,895] {base.py:1234} INFO - SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id 
   FROM task_instance 
   WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.task_id = %(task_id_1)s AND task_instance.execution_date = %(execution_date_1)s 
    LIMIT %(param_1)s
   [2021-07-23 11:16:21,895] {base.py:1239} INFO - {'dag_id_1': 'dag-name-1', 'task_id_1': 'task-name-1', 'execution_date_1': DateTime(2021, 7, 13, 19, 0, 0, tzinfo=Timezone('UTC')), 'param_1': 1}
   [2021-07-23 11:16:21,898] {base.py:769} INFO - COMMIT
   
   # Boom
   [2021-07-23 11:16:21,899] {local_task_job.py:199} WARNING - State of this instance has been externally set to up_for_retry. Terminating instance.
   [2021-07-23 11:16:21,900] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 9303
   [2021-07-23 11:16:21,900] {taskinstance.py:1284} ERROR - Received SIGTERM. Terminating subprocesses.
   ```
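
The sequence in the log above can be modeled roughly as follows. This is a simplified sketch of the check the log suggests, not Airflow's actual code: `TaskInstanceRow` and `heartbeat_check` are hypothetical names, and the real logic lives around the `local_task_job.py` line linked in the log comments. It only illustrates the assumed rule that each heartbeat re-reads the task instance row and the task is terminated if the state is no longer `running` while the process is still alive.

```python
from dataclasses import dataclass
from typing import Optional

RUNNING = "running"


@dataclass
class TaskInstanceRow:
    """Hypothetical stand-in for the row returned by the SELECT on task_instance."""

    state: str


def heartbeat_check(row: TaskInstanceRow, return_code: Optional[int]) -> Optional[str]:
    """Return the warning to log if the task should be terminated, else None.

    `return_code` is None while the task process is still running.
    """
    if row.state == RUNNING:
        # Expected case: the database still agrees that the task is running.
        return None
    if return_code is None:
        # The process is alive but the database says otherwise, so the
        # state must have been changed by something else.
        return (
            f"State of this instance has been externally set to {row.state}. "
            "Terminating instance."
        )
    # The process already exited; nothing to terminate.
    return None
```

Under this model, the first `SELECT` in the log corresponds to a row with state `running`, and the second, 30 seconds later, to a row with state `up_for_retry`.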
   
   I have also reviewed the scheduler, web server and worker logs with SQLAlchemy query logging enabled, to try to determine where the state is being altered, and found nothing. There is no `UPDATE` that sets the state to `up_for_retry`, whether for this DAG/task or for any other task instance: as far as I could see, no `UPDATE` ever has a `state` query parameter set to `up_for_retry`. I can provide those logs if needed.
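
For anyone who wants to repeat that search, here is a minimal sketch of how it could be automated. It assumes the log layout shown in the sample above (a statement line followed by a separate parameter line) and is not part of Airflow: `find_state_updates` and `LOG_LINE` are hypothetical names. It is also a heuristic, since it only checks whether the state string appears anywhere in the statement or its parameters.

```python
import re
from typing import Iterable, List, Tuple

# Matches the message part of a log line such as
# "[2021-07-23 11:16:21,866] {base.py:1234} INFO - UPDATE job SET ..."
LOG_LINE = re.compile(r"^\s*\[[^\]]+\] \{[^}]+\} [A-Z]+ - (?P<msg>.*)$")


def find_state_updates(
    lines: Iterable[str], state: str = "up_for_retry"
) -> List[Tuple[str, str]]:
    """Return (statement, parameters) pairs for UPDATEs that mention `state`."""
    hits: List[Tuple[str, str]] = []
    pending_update = None
    for raw in lines:
        match = LOG_LINE.match(raw)
        if match is None:
            # Continuation of a multi-line statement (FROM / WHERE / LIMIT).
            continue
        msg = match.group("msg")
        if msg.startswith("UPDATE"):
            pending_update = msg
        elif pending_update is not None and msg[:1] in ("{", "(", "["):
            # Parameter line that belongs to the pending UPDATE.
            if state in msg or state in pending_update:
                hits.append((pending_update, msg))
            pending_update = None
        else:
            pending_update = None
    return hits
```

Calling `find_state_updates(open(path))` on each component's log file (with `path` being wherever those logs are stored) returns an empty list when no such `UPDATE` was logged.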
   
   At this stage, I am rather puzzled as to what is going on here. Are there more logs/checks I could enable to understand what's going on?
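
One check that might help: at `DEBUG` level, SQLAlchemy's `sqlalchemy.engine` logger also prints result rows, so the state returned by each `SELECT` would be visible instead of inferred. A minimal sketch, assuming query logging is controlled through standard Python logging; `enable_sql_query_logging` is a hypothetical helper, and where to call it inside an Airflow deployment is left open.

```python
import logging


def enable_sql_query_logging(level: int = logging.INFO) -> logging.Logger:
    """Set the verbosity of SQLAlchemy's engine logger.

    INFO logs statements and their parameters; DEBUG additionally logs
    result rows.
    """
    logger = logging.getLogger("sqlalchemy.engine")
    logger.setLevel(level)
    return logger


# Log result rows as well as statements and parameters.
enable_sql_query_logging(logging.DEBUG)
```

This only changes the logger level; the messages still need a configured handler to end up in the task log.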

