Posted to commits@airflow.apache.org by "Artiom (JIRA)" <ji...@apache.org> on 2018/05/15 15:51:00 UTC

[jira] [Updated] (AIRFLOW-2468) Clearing tasks results in zombie processes and tasks queuing

     [ https://issues.apache.org/jira/browse/AIRFLOW-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Artiom updated AIRFLOW-2468:
----------------------------
    Affects Version/s:     (was: Airflow 1.8)
                       1.9.0
          Description: 
Hi everyone,

I was in the process of upgrading Airflow to 1.9 when I spotted some strange behavior. I currently have a default Airflow 1.9 installation with the LocalExecutor and a Postgres metadata database, managed via systemd.
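
For reference, a minimal sketch of the relevant {{airflow.cfg}} entries (the connection string below is a placeholder, not my real one):

{code}
[core]
executor = LocalExecutor
# placeholder Postgres connection for the metadata database
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
{code}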

To make this error reproducible, I created two DAGs that are identical except for their dag_id.

 
{code:python}
from airflow.operators.bash_operator import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())
args = {
    'owner': 'airflow',
    'start_date': seven_days_ago,
}

dag = DAG(
    dag_id='example_clearing_job', default_args=args,
    schedule_interval='0 0 * * *')

first_task = BashOperator(
    task_id='first_task', bash_command='sleep 60', dag=dag)

second_task = BashOperator(
    task_id='second_task',
    bash_command='sleep 180',
    dag=dag)
second_task.set_upstream(first_task)

run_this = BashOperator(
    task_id='run_this', bash_command='echo 1', dag=dag)
run_this.set_upstream(second_task)

run_that = BashOperator(
    task_id='run_that', bash_command='ls -l', dag=dag)
run_that.set_upstream(second_task)
{code}
and the second DAG, identical apart from its dag_id:

{code:python}
from airflow.operators.bash_operator import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())
args = {
    'owner': 'airflow',
    'start_date': seven_days_ago,
}

dag = DAG(
    dag_id='example_clearing_job_2', default_args=args,
    schedule_interval='0 0 * * *')

first_task = BashOperator(
    task_id='first_task', bash_command='sleep 60', dag=dag)

second_task = BashOperator(
    task_id='second_task',
    bash_command='sleep 180',
    dag=dag)
second_task.set_upstream(first_task)

run_this = BashOperator(
    task_id='run_this', bash_command='echo 1', dag=dag)
run_this.set_upstream(second_task)

run_that = BashOperator(
    task_id='run_that', bash_command='ls -l', dag=dag)
run_that.set_upstream(second_task)
{code}
 

Suppose I have only these two DAGs in my Airflow instance, with some finished runs in each. If I clear all tasks in the first DAG and then do exactly the same in the second DAG, I start noticing weird things:
 * The first task in the DAG that I cleared first kicks off.
 * The first task in the DAG that I cleared second does not start until the first task of the first DAG finishes (i.e. until its {{sleep 60}} completes).
 * In the output of {{ps -ax | grep airflow}} I see a lot of zombie processes (a small counting sketch follows below):
    !zombies.PNG!
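
A minimal sketch of counting the zombies programmatically, assuming the {{psutil}} package is installed (it is not part of Airflow itself):

{code:python}
# Count defunct (zombie) processes on the machine running the scheduler.
# Requires: pip install psutil
import psutil

zombies = [
    p for p in psutil.process_iter(attrs=['pid', 'name', 'status'])
    if p.info['status'] == psutil.STATUS_ZOMBIE
]

print("zombie processes: %d" % len(zombies))
for p in zombies:
    print(p.info['pid'], p.info['name'])
{code}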

I found a couple of things that help with this issue.

First, I noticed that if I pause (disable) the second DAG before clearing its tasks, no zombies are created and it works fine alongside the first DAG.
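
A minimal sketch of this workaround, assuming the {{airflow}} CLI from 1.9 is on the PATH (the dag_id is the example one from above):

{code:python}
# Workaround sketch: pause the second DAG before clearing it, then unpause it afterwards.
# Run in the same environment (virtualenv / user) as the scheduler.
import subprocess

DAG_ID = 'example_clearing_job_2'

subprocess.check_call(['airflow', 'pause', DAG_ID])
# -c / --no_confirm skips the interactive confirmation prompt
subprocess.check_call(['airflow', 'clear', '-c', DAG_ID])
# re-enable the DAG once the first DAG's cleared tasks are running again
subprocess.check_call(['airflow', 'unpause', DAG_ID])
{code}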

Second, I noticed a slight improvement when I increased the {{max_threads}} setting in the scheduler section of the config.
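
For example, in {{airflow.cfg}} (the value below is only an illustration; the 1.9 default is 2):

{code}
[scheduler]
# raising this reduced, but did not eliminate, the queuing described above
max_threads = 4
{code}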

The zombie processes eventually disappear once the first job in the first DAG finishes, but until that happens no other task gets scheduled.

Is there an explanation for this strange behavior, or am I doing something incorrectly? Which parts of Airflow are involved here, and are there any plans to fix this?

 

Thanks,

Artiom 

 

  was: (same description as above, but referring to an Airflow 1.8 setup instead of 1.9)

> Clearing tasks results in zombie processes and tasks queuing
> ------------------------------------------------------------
>
>                 Key: AIRFLOW-2468
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2468
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: core
>    Affects Versions: Airflow 1.7.1.3, 1.9.0
>            Reporter: Artiom
>            Priority: Major
>              Labels: performance
>         Attachments: zombies.PNG
>
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)