Posted to commits@airflow.apache.org by "Ram (JIRA)" <ji...@apache.org> on 2017/09/08 18:11:01 UTC

[jira] [Commented] (AIRFLOW-928) Same {task,execution_date} run multiple times in worker when using CeleryExecutor

    [ https://issues.apache.org/jira/browse/AIRFLOW-928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16159037#comment-16159037 ] 

Ram commented on AIRFLOW-928:
-----------------------------

I am also facing the same issue. The task is starting several times within a short interval. Please find the log below.

[2017-09-08 10:02:08,122] {models.py:167} INFO - Filling up the DagBag from /home/etluser/airflow/dags/AUXLiveCore.py
[2017-09-08 10:02:09,053] {base_task_runner.py:112} INFO - Running: ['bash', '-c', u'airflow run AUXLiveCore L_STG_DIM_SPONSOR 2017-09-07T10:00:00 --job_id 5275 --raw -sd DAGS_FOLDER/AUXLiveCore.py']
[2017-09-08 10:02:14,911] {base_task_runner.py:95} INFO - Subtask: [2017-09-08 10:02:14,911] {__init__.py:57} INFO - Using executor CeleryExecutor
[2017-09-08 10:02:15,526] {base_task_runner.py:95} INFO - Subtask: [2017-09-08 10:02:15,525] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2017-09-08 10:02:15,866] {base_task_runner.py:95} INFO - Subtask: [2017-09-08 10:02:15,866] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
[2017-09-08 10:02:17,779] {base_task_runner.py:95} INFO - Subtask: [2017-09-08 10:02:17,779] {models.py:167} INFO - Filling up the DagBag from /home/etluser/airflow/dags/AUXLiveCore.py
[2017-09-08 10:02:18,782] {base_task_runner.py:95} INFO - Subtask: [2017-09-08 10:02:18,782] {models.py:1126} INFO - Dependencies all met for <TaskInstance: AUXLiveCore.L_STG_DIM_SPONSOR 2017-09-07 10:00:00 [None]>
[2017-09-08 10:02:18,842] {base_task_runner.py:95} INFO - Subtask: [2017-09-08 10:02:18,842] {models.py:1126} INFO - Dependencies all met for <TaskInstance: AUXLiveCore.L_STG_DIM_SPONSOR 2017-09-07 10:00:00 [None]>
[2017-09-08 10:02:18,843] {base_task_runner.py:95} INFO - Subtask: [2017-09-08 10:02:18,842] {models.py:1318} INFO - 
[2017-09-08 10:02:18,843] {base_task_runner.py:95} INFO - Subtask: --------------------------------------------------------------------------------
[2017-09-08 10:02:18,843] {base_task_runner.py:95} INFO - Subtask: Starting attempt 1 of 1
[2017-09-08 10:02:18,843] {base_task_runner.py:95} INFO - Subtask: --------------------------------------------------------------------------------
[2017-09-08 10:02:18,843] {base_task_runner.py:95} INFO - Subtask: 
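
To confirm from the shell that the same {task,execution_date} really is being started more than once (the same check the processes.list attachment on the issue quoted below does with ps -ef | grep "airflow run"), a minimal sketch is below. It only parses ps output; the --raw filter and the argument positions are assumptions based on the command line shown in the log above, so adjust them for your setup.

    import subprocess
    from collections import Counter

    # List the full command line of every running process.
    out = subprocess.check_output(['ps', '-eo', 'args']).decode('utf-8', 'replace')

    counts = Counter()
    for line in out.splitlines():
        # Count only the raw subtask processes ("airflow run ... --raw"),
        # one per started attempt, so the supervising wrapper process is
        # not counted as a duplicate.
        if 'airflow run ' not in line or '--raw' not in line:
            continue
        args = line.split('airflow run ', 1)[1].split()
        if len(args) >= 3:
            counts[tuple(args[:3])] += 1  # (dag_id, task_id, execution_date)

    for (dag_id, task_id, execution_date), n in counts.items():
        if n > 1:
            print('%s %s %s is running %d times' % (dag_id, task_id, execution_date, n))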


> Same {task,execution_date} run multiple times in worker when using CeleryExecutor
> ---------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-928
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-928
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: celery
>    Affects Versions: Airflow 1.7.1.3
>         Environment: Docker
>            Reporter: Uri Shamay
>         Attachments: airflow.log, dag_runs.png, dummy_dag.py, processes.list, rabbitmq.queue, scheduler.log, worker_2.log, worker.log
>
>
> Hi,
> When using Airflow with the CeleryExecutor (I tested both RabbitMQ and Redis as brokers), I see that while the workers are down, the scheduler keeps **appending** another message for the same {task,execution_date} key to the broker on every scheduling pass. This means that if the workers are down or cannot reach the broker for a few hours, the broker ends up holding thousands of copies of the same execution.
> In my scenario I have just one dummy DAG running with a dag_concurrency of 4,
> so I expected the broker to hold only 4 messages; the scheduler should not keep queuing message after message for the same {task,execution_date}.
> What actually happens is that when the workers start consuming, they receive thousands of messages for just 4 tasks, and when they try to write the task_instances to the database there are integrity errors because that {task,execution_date} already exists.
> Note that in my test, after letting Airflow schedule work for this single DAG without workers for a few hours, I connected to the broker with a custom client and retrieved the messages - there were thousands of copies of the same {dag,execution_date}.
> Even if only one instance actually runs per key despite the thousands of polled messages, it is still bad behavior: it would be better to produce a single message per key and, if some timeout occurs (such as the visibility timeout), to set the key rather than append to it.
> What happens is that when the workers are down for a long time and many jobs are scheduled every minute, the workers come back to thousands of copies of the same jobs => the worker runs the same DAGs many times => many wasted Python runners => all Celery worker threads/processes get tied up => every other job starves until it works out that only one instance of each duplicate is needed.
> Attached files:
> 1. airflow.log - the task log; you can see several process instances of the same {task,execution_date} writing to the same log file.
> 2. worker.log - the worker log; you can see the worker trying to run the same {task,execution_date} multiple times, plus the database integrity errors saying those tasks already exist for those dates.
> 3. scheduler.log - shows the scheduler deciding to send the same {job,execution_date} again and again, indefinitely.
> 4. dummy_dag.py - the dummy DAG used for the test.
> 5. rabbitmq.queue - shows that after 5 minutes the broker queue contains 40 messages for the same 4 {job,execution_date}.
> 6. dag_runs.png - shows that only 4 jobs need to run, while there are far more messages in the queue.
> 7. processes.list - shows that after starting a worker, ps -ef | grep "airflow run" lists the same {job,execution_date} multiple times.
> 8. worker_2.log - shows that when the worker starts, the same {job,execution_date} keys appear multiple times.
> Thanks.
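
The report above mentions retrieving the queued messages with a custom broker client. For the simpler question of how many messages are actually sitting in the queue (the rabbitmq.queue attachment shows 40 messages for only 4 {job,execution_date}), a minimal sketch using pika is below; the broker host and the queue name "default" are assumptions, so adjust them to match the [celery] section of your airflow.cfg.

    import pika

    # Connect to the RabbitMQ broker (host is an assumption; use your own).
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # passive=True only inspects the existing queue; it will not create or modify it.
    declare_ok = channel.queue_declare(queue='default', passive=True)
    print('messages waiting in the broker queue: %d'
          % declare_ok.method.message_count)

    connection.close()

With a single dummy DAG and a dag_concurrency of 4, the count should ideally stay at 4; a number that keeps growing while the workers are down reproduces the behavior described above.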



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)