Posted to dev@airflow.apache.org by Nadeem Ahmed Nazeer <na...@neon-lab.com> on 2016/07/13 08:43:11 UTC

airflow scheduler slowness as tasks increase

Hi,

We are using Airflow to establish a data pipeline that runs tasks on an
ephemeral Amazon EMR cluster. The oldest data we have is from 2014-05-26,
which we have set as the start date, with a schedule interval of 1 day.

We have an S3 copy task, a MapReduce task and a bunch of Hive and Impala
load tasks in our DAG, all run via PythonOperator. Our expectation is for
Airflow to run each of these tasks for each day from the start date till
the current date.

Just for numbers, approximately 800 DAG runs were created from the start
date till the current date (2016-07-13). All is well at the start of the
execution, but as it executes more and more tasks, the scheduling of tasks
starts slowing down. It looks like the scheduler is spending a lot of time
checking states and doing other housekeeping tasks.
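
As a rough check on that figure, the number of daily intervals between the
start date and the date of this post can be computed directly. This is only
a back-of-the-envelope sketch; it counts calendar days and assumes exactly
one run per day, ignoring how the scheduler itself decides when a run is due.

```python
from datetime import date

start_date = date(2014, 5, 26)  # start date of the DAG, from the post
today = date(2016, 7, 13)       # date of the post

# One run per daily interval between the two dates.
daily_runs = (today - start_date).days
print(daily_runs)  # 779
```

That is in line with the "approximately 800" quoted above.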

One scheduler loop is taking almost 240 to 300 seconds due to the huge
number of tasks. It has been running my DAGs for over 24 hours now with
little progress. I am starting the scheduler process so that it restarts
after every 5 runs, which is the default (airflow scheduler -n 5).

I did play around with different parallelism and config parameters without
much help. I am looking for some assistance in making the scheduler schedule
the tasks quickly and effectively. Please help.

Configs :
parallelism = 32
dag_concurrency = 16
max_active_runs_per_dag = 99999
celeryd_concurrency = 16
scheduler_heartbeat_sec = 5

Thanks,
Nadeem

Re: airflow scheduler slowness as tasks increase

Posted by siddharth anand <sa...@apache.org>.
and https://github.com/apache/incubator-airflow/pull/1735

Re: airflow scheduler slowness as tasks increase

Posted by siddharth anand <sa...@apache.org>.
Nice idea....

https://issues.apache.org/jira/browse/AIRFLOW-431

Re: airflow scheduler slowness as tasks increase

Posted by Jeremiah Lowin <jl...@apache.org>.
Sure, just modify this code:

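from airflow import settings
from airflow.models import Pool

# Open a SQLAlchemy session against the Airflow metadata database.
session = settings.Session()

# Look for an existing pool with this name.
pool = (
    session.query(Pool)
    .filter(Pool.pool == 'my_pool')
    .first())

# Create the pool only if it does not exist yet.
if not pool:
    session.add(
        Pool(
            pool='my_pool',
            slots=8,
            description='this is my pool'
        )
    )
    session.commit()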



Re: airflow scheduler slowness as tasks increase

Posted by Nadeem Ahmed Nazeer <na...@neon-lab.com>.
Could we create a pool programmatically instead of manually creating it from
the UI? I want to create this pool from the Chef script when Airflow starts up.

Thanks,
Nadeem

Re: airflow scheduler slowness as tasks increase

Posted by Lance Norskog <la...@gmail.com>.
Nazeer- "If I don't use num_runs, scheduler would just stop after running
some number of tasks and I can't figure out why."
This is a known bug.

One way to help the scheduling is to create a Pool. A Pool is a queue
gatekeeper that allows at most N tasks to run concurrently. If you set the
Pool size to, say, 5-10 and make all tasks join that pool, then at most that
many tasks will run at once. The point of Pools is to regulate access to
contested resources, and here all of your external services (S3, Hadoop) are
contested resources. Without a limit, you may have 30 S3 jobs running at once
and 50 M/R jobs trying to run. You will find this all runs more smoothly
when you cap the number of active tasks with a Pool.
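
To make the gatekeeper idea concrete, here is a minimal sketch in plain
Python. It does not use Airflow at all: a semaphore stands in for the Pool,
and run_with_pool, the slot count and the sleep are all made up for
illustration. In Airflow itself a task joins a pool through the pool
argument on its operator.

```python
import threading
import time


def run_with_pool(n_tasks, slots):
    """Run n_tasks dummy tasks, letting at most `slots` run at once.

    Returns the highest number of tasks seen running concurrently.
    """
    pool = threading.BoundedSemaphore(slots)  # stand-in for an Airflow Pool
    lock = threading.Lock()
    state = {"running": 0, "peak": 0}

    def task():
        with pool:  # block here until a slot is free
            with lock:
                state["running"] += 1
                state["peak"] = max(state["peak"], state["running"])
            time.sleep(0.01)  # pretend to do work, e.g. a Hive load
            with lock:
                state["running"] -= 1

    threads = [threading.Thread(target=task) for _ in range(n_tasks)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["peak"]


peak = run_with_pool(n_tasks=50, slots=8)
print(peak)  # never more than 8, however many tasks are queued
```

All 50 tasks are submitted immediately, but no more than 8 ever hold a slot
at the same time; the rest simply wait their turn.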

Another technique is that either a DAG or a task (I can't remember which)
can wait until previous days finish. This is another way to regulate the
flow of tasks.
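
This is most likely the task-level depends_on_past argument, which holds a
task back until the same task has succeeded for the previous schedule. The
sketch below does not call Airflow; it only models that rule in plain
Python, and can_run and the states dict are hypothetical names.

```python
from datetime import date, timedelta


def can_run(execution_date, start_date, states):
    """Model of the rule: a day may run only if the previous day succeeded."""
    if execution_date == start_date:
        return True  # the first run has no predecessor to wait for
    previous = execution_date - timedelta(days=1)
    return states.get(previous) == "success"


start = date(2014, 5, 26)
states = {start: "success"}  # only the first day has finished so far

print(can_run(start + timedelta(days=1), start, states))  # True
print(can_run(start + timedelta(days=2), start, states))  # False
```

With this rule a backfill of hundreds of days advances one day at a time
per task instead of becoming runnable all at once.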

After all, you would not do this in the shell:

for x in *.hql   # say, 500 hive scripts
do
   hive -f "$x" &
done

This is exactly what Airflow is doing with out-of-control tasks.

Lance

-- 
Lance Norskog
lance.norskog@gmail.com
Redwood City, CA

Re: airflow scheduler slowness as tasks increase

Posted by Nadeem Ahmed Nazeer <na...@neon-lab.com>.
Thanks for the response, Bolke. Looking forward to having this scheduler
slowness fixed in a future Airflow release. I am currently on version
1.7.0; I will upgrade to 1.7.1.3 and also try your suggestions.

I am using the CeleryExecutor. If I don't use num_runs, the scheduler just
stops after running some number of tasks, and I can't figure out why. It
only starts running again after I restart the service manually. The fix
for that was to add this parameter. I found the num_runs parameter used by
default in the upstart script for the scheduler, and also read in the
manual that it should be used (
https://cwiki.apache.org/confluence/display/AIRFLOW/Common+Pitfalls).

Thanks,
Nadeem

On Wed, Jul 13, 2016 at 8:51 AM, Bolke de Bruin <bd...@gmail.com> wrote:

> Nadeem,
>
> Unfortunately this slowness is currently a deficiency in the scheduler. It
> will be addressed in the future, but obviously we are not there yet. To
> make it more manageable you could set an end_date on the DAG and create
> multiple DAGs from it, keeping the logic the same but giving each a
> different dag_id and start_date / end_date. If you are on 1.7.1.3 you will
> then benefit from multiprocessing (max_threads for the scheduler). In
> addition, you then add the load by hand. Not ideal, but it will work.
>
> Also, depending on how quickly your tasks finish, you could limit the
> heartbeat so the scheduler does not run redundantly while it is unable to
> fire off new tasks.
>
> In addition, why are you using num_runs? I definitely do not recommend
> using it with the LocalExecutor, and if you are on 1.7.1.3 I would not use
> it with Celery either.
>
> I hope this helps!
>
> Bolke

Re: airflow scheduler slowness as tasks increase

Posted by Bolke de Bruin <bd...@gmail.com>.
Nadeem,

Unfortunately this slowness is currently a deficiency in the scheduler. It will be addressed
in the future, but obviously we are not there yet. To make it more manageable you could
set an end_date on the DAG and create multiple DAGs from it, keeping the logic the same but
giving each a different dag_id and start_date / end_date. If you are on 1.7.1.3 you will then
benefit from multiprocessing (max_threads for the scheduler). In addition, you then add the
load by hand. Not ideal, but it will work.
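
A minimal sketch of how the date range could be carved up, assuming a
90-day window and a hypothetical base dag_id of "emr_pipeline" (neither
comes from the original setup). It only computes the dag_id / start_date /
end_date triples; building the actual DAG objects from them is left to the
DAG file.

```python
from datetime import datetime, timedelta


def split_date_range(start, end, window_days):
    """Split [start, end] into consecutive, non-overlapping inclusive windows."""
    windows = []
    cursor = start
    while cursor <= end:
        # Each window covers window_days days, except possibly the last one.
        window_end = min(cursor + timedelta(days=window_days - 1), end)
        windows.append((cursor, window_end))
        cursor = window_end + timedelta(days=1)
    return windows


def backfill_dag_specs(base_dag_id, start, end, window_days):
    """Return one spec per window, each with its own dag_id and date bounds."""
    specs = []
    for window_start, window_end in split_date_range(start, end, window_days):
        dag_id = "{}_{}_{}".format(
            base_dag_id,
            window_start.strftime("%Y%m%d"),
            window_end.strftime("%Y%m%d"),
        )
        specs.append(
            {"dag_id": dag_id, "start_date": window_start, "end_date": window_end}
        )
    return specs


# Dates taken from the thread; the window size and base dag_id are assumptions.
specs = backfill_dag_specs(
    "emr_pipeline", datetime(2014, 5, 26), datetime(2016, 7, 13), 90
)
for spec in specs:
    print(spec["dag_id"], spec["start_date"].date(), spec["end_date"].date())
```

Each spec would then be passed to a DAG constructor with the same task
definitions, and the resulting DAG objects exposed at module level so the
scheduler picks them up. Enabling them a few at a time is one way to add
the load by hand.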

Also, depending on how quickly your tasks finish, you could limit the heartbeat so the scheduler
does not run redundantly while it is unable to fire off new tasks.

In addition, why are you using num_runs? I definitely do not recommend using it with the
LocalExecutor, and if you are on 1.7.1.3 I would not use it with Celery either.

I hope this helps!

Bolke

> On 13 Jul 2016, at 10:43, Nadeem Ahmed Nazeer <na...@neon-lab.com> wrote:
> 
> Hi,
> 
> We are using airflow to establish a data pipeline that runs tasks on
> ephemeral amazon emr cluster. The oldest data we have is from 2014-05-26
> which we have set as the start date with a scheduler interval of 1 day for
> airflow.
> 
> We have an s3 copy task, a map reduce task and a bunch of hive and impala
> load tasks in our DAG all run via PythonOperator. Our expectation is for
> airflow to run each of these tasks for each day from the start date till
> current date.
> 
> For scale, approximately 800 DAG runs were created from the start date
> to the current date (2016-07-13). All is well at the
> start of the execution but as it executes more and more tasks, the
> scheduling of tasks starts slowing down. It looks like the scheduler is
> spending a lot of time checking states and on other housekeeping tasks.
> 
> One scheduler loop is taking almost 240 to 300 seconds due to the huge
> number of tasks. It has been running my dags for over 24 hours now with
> little progress. I am starting the scheduler process with restart for every
> 5 runs which is the default (airflow scheduler -n 5).
> 
> I did play around with different parallelism and config parameters without
> much help. I am looking for some assistance on making scheduler quickly and
> effectively schedule the tasks. Please help.
> 
> Configs :
> parallelism = 32
> dag_concurrency = 16
> max_active_runs_per_dag = 99999
> celeryd_concurrency = 16
> scheduler_heartbeat_sec = 5
> 
> Thanks,
> Nadeem