Posted to dev@airflow.apache.org by Kyle Hamlin <ha...@gmail.com> on 2018/03/21 17:34:57 UTC

Submitting 1000+ tasks to airflow programmatically

Hello,

I'm currently using Airflow for some ETL tasks where I submit a Spark job
to a cluster and poll until it completes. This workflow is nice because it
is typically a single DAG. I'm now starting to do more machine learning
tasks and need to build a model per client, and I have 1000+ clients. My
Spark cluster is capable of handling this workload, but it doesn't seem
scalable to write 1000+ DAGs to fit a model for each client. I want each
client to have its own task instance so it can be retried if it fails
without having to run all 1000+ tasks over again. How do I handle this
type of workflow in Airflow?

Re: Submitting 1000+ tasks to airflow programmatically

Posted by Ace Haidrey <ac...@gmail.com>.
Hi,
This is a neat use case - glad you’re using Airflow for it!
Out of curiosity, why don’t you create a single DAG, call it company_models_dag, and register each task with that DAG? So for each company you have a Spark task that builds a model. You can do this programmatically, looping over each company (you probably have a list of them). This way, if one company’s model fails you can just rerun that task; you don’t have to rerun the entire DAG - you pick up where it left off, i.e. the failed task and its downstream tasks.
One thing to consider is that you’ll want to limit concurrency (unless your Spark cluster really can run 1000 tasks at once).
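A minimal sketch of what that could look like (get_client_ids(), the
application path, and the operator arguments here are placeholders, and the
SparkSubmitOperator import path depends on your Airflow version):

from datetime import datetime

from airflow import DAG
# 1.x contrib location; adjust for your Airflow version.
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator


def get_client_ids():
    # Placeholder: return the list of client ids from wherever you keep them.
    return ["client_a", "client_b", "client_c"]


dag = DAG(
    dag_id="company_models_dag",
    start_date=datetime(2018, 3, 1),
    schedule_interval="@daily",
    concurrency=16,      # cap how many model-fit tasks run at once
    max_active_runs=1,
)

for client_id in get_client_ids():
    SparkSubmitOperator(
        task_id="fit_model_{}".format(client_id),
        application="/path/to/fit_model.py",
        application_args=["--client-id", client_id],
        conn_id="spark_default",
        retries=3,       # a failed client can be retried on its own
        dag=dag,
    )

With per-task retries, a single client's failure can be cleared and rerun
on its own while the rest of the run keeps its state.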

Sent from my iPhone

> On Mar 21, 2018, at 10:34 AM, Kyle Hamlin <ha...@gmail.com> wrote:
> 
> Hello,
> 
> I'm currently using Airflow for some ETL tasks where I submit a spark job
> to a cluster and poll till it is complete. This workflow is nice because it
> is typically a single Dag. I'm now starting to do more machine learning
> tasks and need to build a model per client which is 1000+ clients. My
> spark cluster is capable of handling this workload, however, it doesn't
> seem scalable to write 1000+ dags to fit models for each client. I want
> each client to have its own task instance so it can be retried if it
> fails without having to run all 1000+ tasks over again. How do I handle
> this type of workflow in Airflow?

Re: Submitting 1000+ tasks to airflow programmatically

Posted by Maxime Beauchemin <ma...@gmail.com>.
On the open PR I described how DagFetcher might imply a new DAG manifest
(replacing the current DAG_FOLDER auto-parsing & discovery) that describes
a list of dag_ids and related DAG URIs.

That DAG manifest could be a static list OR something dynamic if you pass
it a callable. To enable the dynamic-DAGs pattern, perhaps that manifest
supports not only URIs but also DAG factories: functions that return one
or many DAG objects. This might be a tad too dynamic, though; it can be
good to keep the balance and have something more predictable.
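Purely to illustrate the idea (none of this is an existing Airflow API;
the manifest shape and all names below are made up), such a manifest might
look something like:

from datetime import datetime

from airflow import DAG


def build_client_dag(client_id):
    # Placeholder factory for one client's DAG.
    return DAG(dag_id="model_{}".format(client_id),
               start_date=datetime(2018, 3, 1),
               schedule_interval="@daily")


def client_model_dags():
    # A dynamic entry: a callable returning one or many DAG objects.
    client_ids = ["client_a", "client_b"]  # would come from an API in practice
    return [build_client_dag(cid) for cid in client_ids]


DAG_MANIFEST = {
    # Static entries map a dag_id to the URI a DagFetcher would pull from.
    "daily_etl": "s3://my-dags/daily_etl.py",
    # Dynamic entry: a DAG factory instead of a URI.
    "client_models": client_model_dags,
}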

Max

On Thu, Mar 22, 2018 at 1:24 PM, James Meickle <jm...@quantopian.com>
wrote:

> I'm very excited about the possibility of implementing a DAGFetcher (per
> prior thread about this) that is aware of dynamic data sources, and can
> handle abstracting/caching/deploying them itself, rather than having each
> Airflow process run the query for each DAG refresh.
>
> On Thu, Mar 22, 2018 at 2:12 PM, Taylor Edmiston <te...@gmail.com>
> wrote:
>
> > I'm interested in hearing further discussion too, and if others have
> tried
> > something similar to our approach.  Several companies on this list have
> > mentioned various approaches to dynamic DAGs, and I think everyone needs
> > them eventually.  Maybe it's an opportunity for additional docs regarding
> > use cases like this and to document best practices from the community.
> >
> > *Taylor Edmiston*
> > TEdmiston.com <https://www.tedmiston.com/> | Blog
> > <http://blog.tedmiston.com>
> > Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn
> > <https://www.linkedin.com/in/tedmiston/> | AngelList
> > <https://angel.co/taylor>
> >
> >
> > On Thu, Mar 22, 2018 at 12:43 PM, Kyle Hamlin <ha...@gmail.com>
> wrote:
> >
> > > @Chris @Taylor
> > > Thank you guy very much for your explanations! Your strategy makes a
> lot
> > of
> > > sense to me. Generating a dag for each client I'm going to have a ton
> of
> > > dags on the front page but at least that is searchable haha. I'm going
> to
> > > give this implementation a shot and I'll try to report back with the
> > > outcome.
> > >
> > > Can anyone comment on future work to support data science workflows
> like
> > > these, or is Airflow fundamentally the wrong tool?
> > >
> > > On Thu, Mar 22, 2018 at 12:07 PM Taylor Edmiston <te...@gmail.com>
> > > wrote:
> > >
> > > > We're not using SubDagOperator.  Our approach is using 1 DAG file to
> > > > generate a separate DAG class instance for each similar config, which
> > > gets
> > > > hoisted into global namespace.  In simplified pseudo-Python, it looks
> > > like:
> > > >
> > > > # sources --> {'configs': [{...}, {...}], 'expire': '<timestamp>'}
> > > > cache = Variable.get('sources', default_var={},
> deserialize_json=True)
> > > > sources = fetch_configs() if is_empty(cache) or is_expired(cache)
> else
> > > > cache['configs']
> > > > for source in sources:
> > > >   dag = DAG(...)
> > > >   globals()[source._id] = dag
> > > >   # ...create tasks and set dependencies for each DAG (some config
> > pulled
> > > > from source object for each)...
> > > >
> > > > We added the cache part for the same reason you pointed out, because
> > the
> > > > DAG processing loop was hitting the API a lot.  Btw, you can also
> turn
> > > down
> > > > how much the processing loop runs with scheduler_heartbeat_sec under
> > the
> > > > scheduler group in config.
> > > >
> > > > We also considered the route Chris mentioned of updating cache via a
> > > > separate DAG but weren't crazy about having a DAG scheduled once per
> > > > minute.
> > > >
> > > > *Taylor Edmiston*
> > > > TEdmiston.com <https://www.tedmiston.com/> | Blog
> > > > <http://blog.tedmiston.com>
> > > > Stack Overflow CV <https://stackoverflow.com/story/taylor> |
> LinkedIn
> > > > <https://www.linkedin.com/in/tedmiston/> | AngelList
> > > > <https://angel.co/taylor>
> > > >
> > > >
> > > > On Thu, Mar 22, 2018 at 9:17 AM, David Capwell <dc...@gmail.com>
> > > wrote:
> > > >
> > > > > For us we compile down to Python rather than do the logic in
> Python,
> > > that
> > > > > makes it so the load doesn't do real work.
> > > > >
> > > > > We have our own DSL that is just a simplified compiler; parse,
> > analyze,
> > > > > optimize, code gen.  In code gen we just generate the Python code.
> > Our
> > > > > build then packages it up and have airflow fetch it (very hacky
> fetch
> > > > right
> > > > > now)
> > > > >
> > > > > This does make it so loading is simple and fast, but means you
> can't
> > > use
> > > > > the Python api directly
> > > > >
> > > > > On Thu, Mar 22, 2018, 12:43 AM Andrew Maguire <
> andrewm4894@gmail.com
> > >
> > > > > wrote:
> > > > >
> > > > > > I've had similar issues with large dags being slow to render on
> ui
> > > and
> > > > > > crashing chrome.
> > > > > >
> > > > > > I got around it by changing the default tree view from 25 to just
> > 5.
> > > > > >
> > > > > > Involves a couple changes to source files though, would be great
> if
> > > > some
> > > > > of
> > > > > > the ui defaults could go into airflow.cfg.
> > > > > >
> > > > > > https://stackoverflow.com/a/48665734/1919374
> > > > > >
> > > > > > On Thu, 22 Mar 2018, 01:26 Chris Fei, <cf...@gmail.com> wrote:
> > > > > >
> > > > > > > @Kyle, I do something similar and have run into the problems
> > you've
> > > > > > > mentioned. In my case, I access data from S3 and then generate
> > > > separate
> > > > > > > DAGs (of different structures) based on the data that's pulled.
> > > I've
> > > > > > > also found that the UI for accessing a single large DAG is slow
> > so
> > > I
> > > > > > > prefer to keep many separate DAGs. What I'd try is to define a
> > DAG
> > > > > > > that's responsible for accessing your API and caching the
> client
> > > IDs
> > > > > > > somewhere locally, maybe just to a file on disk or as an
> Airflow
> > > > > > > Variable. You can run this DAG on whatever schedule is
> > appropriate
> > > > for
> > > > > > > you. From there, build a function that creates a DAG and then
> for
> > > > each
> > > > > > > client ID, register a DAG built by that function to the global
> > > > context.
> > > > > > > Like this:
> > > > > > > def create_client_dag(client_id):
> > > > > > >     # build dag here
> > > > > > >
> > > > > > > def get_client_ids_locally():
> > > > > > >     # access the data that was pulled from the API
> > > > > > >
> > > > > > > client_ids = get_client_ids_locally()
> > > > > > > for client in client_ids:
> > > > > > >     dag = create_client_dag(client)
> > > > > > >     globals()[dag.dag_id] = dag
> > > > > > >
> > > > > > > This approach also handles removing client IDs somewhat
> > gracefully.
> > > > > DAGs
> > > > > > > for removed clients will still appear in the UI (you can build
> a
> > > > > > > maintenance DAG to clean that up), but they'll be disabled and
> > > their
> > > > > > > tasks won't be scheduled.
> > > > > > > On Wed, Mar 21, 2018, at 7:32 PM, Kyle Hamlin wrote:
> > > > > > > > Thanks for all the responses let me try to address the main
> > > themes.
> > > > > > > >
> > > > > > > > @Ace @Nicholas @Taylor
> > > > > > > > I originally started with a loop over my list of client ids
> and
> > > > > > > > created a> SparkSubmitOperator for each client. The pseudo
> code
> > > > would
> > > > > > > look
> > > > > > > > something> like this:
> > > > > > > >
> > > > > > > > dag = DAG(...)
> > > > > > > >
> > > > > > > > client_ids = get_client_ids()
> > > > > > > > for client_id in client_ids:
> > > > > > > >     SparkSubmitOperator(
> > > > > > > >         ...
> > > > > > > >         dag=dag
> > > > > > > >     )
> > > > > > > >
> > > > > > > > I found that this approach kind of clunky for a few reasons.
> > > > > > > > First, the> get_cleint_ids() function was hitting our API
> every
> > > > time
> > > > > > the
> > > > > > > dag
> > > > > > > > was read> by the scheduler which seemed excessive (every 30
> > > seconds
> > > > > or
> > > > > > > > so?). Second,> it seemed like when a single task failure made
> > > > marked
> > > > > > the
> > > > > > > whole
> > > > > > > > dag as a> failure, but I guess retrying till the task worked
> > > could
> > > > > > solve
> > > > > > > > this? Third,> the UI gets really clunky and slow, basically
> > > > unusable
> > > > > > > when it
> > > > > > > > tries to> render the graph view for that many tasks. Finally,
> > > > Airflow
> > > > > > > > doesn't seem> very happy when client_ids are removed i.e. the
> > > > > > > get_client_ids()
> > > > > > > > no longer> returns a specific client_id, it really seems to
> > want
> > > a
> > > > > > > static dag.
> > > > > > > >
> > > > > > > > Do I really have to poll and API or database every 30 seconds
> > for
> > > > > this>
> > > > > > > dynamic client_id data?
> > > > > > > >
> > > > > > > > @Ace
> > > > > > > > I have been limiting concurrency so as to not blast the
> cluster
> > > > > > > >
> > > > > > > > @Nicholas
> > > > > > > > Thank you for the noise suggestion I will definitely
> implement
> > > > > > > > that if I> continue with the same methodology
> > > > > > > >
> > > > > > > > @Taylor
> > > > > > > > Are you using a SubDagOperator? Or is your process similar to
> > the
> > > > > > > > pseudo code I wrote above?
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Mar 21, 2018 at 2:49 PM Taylor Edmiston
> > > > > > > > <te...@gmail.com> wrote:>
> > > > > > > >> We also use a similar approach to generate dynamic DAGs
> based
> > on
> > > > > > > >> a common>> template DAG file.  We pull in the list of config
> > > > > objects,
> > > > > > > one
> > > > > > > >> per DAG,>> from an internal API lightly wrapping the
> database,
> > > > then
> > > > > we
> > > > > > > >> cache that>> response in a Airflow Variable that gets
> updated
> > > > once a
> > > > > > > minute.  The>> dynamic DAGs are generated from that variable.
> > > > > > > >>
> > > > > > > >> *Taylor Edmiston*
> > > > > > > >> TEdmiston.com <https://www.tedmiston.com/> | Blog
> > > > > > > >> <http://blog.tedmiston.com>
> > > > > > > >> Stack Overflow CV <https://stackoverflow.com/story/taylor>
> |
> > > > > > > LinkedIn>> <https://www.linkedin.com/in/tedmiston/> |
> AngelList
> > > > > > > >> <https://angel.co/taylor>
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> On Wed, Mar 21, 2018 at 1:54 PM, Nicolas Kijak <
> > > > > > > >> nicolas.kijak@weightwatchers.com> wrote:
> > > > > > > >>
> > > > > > > >>> Kyle,
> > > > > > > >>>
> > > > > > > >>> We have a similar approach but on a much, much smaller
> scale.
> > > We
> > > > > > > >>> now have>>> <100 “things to process” but expect it to grow
> to
> > > > under
> > > > > > > ~200.  Each>> “thing
> > > > > > > >>> to process” has the same workflow so we have a single DAG
> > > > > > > >>> definition that>>> does about 20 tasks per, then we loop
> over
> > > the
> > > > > > list
> > > > > > > of items and
> > > > > > > >>> produce>> a
> > > > > > > >>> dag object for each one adding it to the global definition.
> > > > > > > >>>
> > > > > > > >>> One of the things we quickly ran into was crushing the
> > > scheduler
> > > > > > as>>>
> > > > > > > everything was running with the same start time.  To get around
> > > > > > > >>> this we>> add
> > > > > > > >>> noise to the start time minute and seconds. Simply index %
> > 60.
> > > > > > > This>>> spreads out the load so that the scheduler isn’t trying
> > to
> > > > run
> > > > > > > >>> everything>>> at the exact same moment.  I would suggest if
> > you
> > > > do
> > > > > go
> > > > > > > this
> > > > > > > >>> route, to>> also
> > > > > > > >>> stagger your hours if you can because of how many you plan
> to
> > > > run.
> > > > > > > >> Perhaps
> > > > > > > >>> your DAGs are smaller and aren’t as CPU intensive as ours.
> > > > > > > >>>
> > > > > > > >>> On 3/21/18, 1:35 PM, "Kyle Hamlin" <ha...@gmail.com>
> > > wrote:
> > > > > > > >>>
> > > > > > > >>>   Hello,
> > > > > > > >>>
> > > > > > > >>>   I'm currently using Airflow for some ETL tasks where I
> > > submit a
> > > > > > > >>>   spark>>> job
> > > > > > > >>>   to a cluster and poll till it is complete. This workflow
> is
> > > > nice
> > > > > > > >>> because it
> > > > > > > >>>   is typically a single Dag. I'm now starting to do more
> > > machine
> > > > > > > >> learning
> > > > > > > >>>   tasks and need to build a model per client which is 1000+
> > > > > > > >>>   clients. My>>>   spark cluster is capable of handling
> this
> > > > > > workload,
> > > > > > > however, it
> > > > > > > >> doesn't
> > > > > > > >>>   seem scalable to write 1000+ dags to fit models for each
> > > > client.
> > > > > > I>>
> > > > > > > want
> > > > > > > >>>   each client to have its own task instance so it can be
> > > retried
> > > > > > > >>>   if it>>>   fails without having to run all 1000+ tasks
> over
> > > > > again.
> > > > > > > How do I
> > > > > > > >> handle
> > > > > > > >>>   this type of workflow in Airflow?
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Submitting 1000+ tasks to airflow programmatically

Posted by James Meickle <jm...@quantopian.com>.
I'm very excited about the possibility of implementing a DAGFetcher (per
prior thread about this) that is aware of dynamic data sources, and can
handle abstracting/caching/deploying them itself, rather than having each
Airflow process run the query for each DAG refresh.

On Thu, Mar 22, 2018 at 2:12 PM, Taylor Edmiston <te...@gmail.com>
wrote:

> I'm interested in hearing further discussion too, and if others have tried
> something similar to our approach.  Several companies on this list have
> mentioned various approaches to dynamic DAGs, and I think everyone needs
> them eventually.  Maybe it's an opportunity for additional docs regarding
> use cases like this and to document best practices from the community.
>
> *Taylor Edmiston*
> TEdmiston.com <https://www.tedmiston.com/> | Blog
> <http://blog.tedmiston.com>
> Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn
> <https://www.linkedin.com/in/tedmiston/> | AngelList
> <https://angel.co/taylor>
>
>
> On Thu, Mar 22, 2018 at 12:43 PM, Kyle Hamlin <ha...@gmail.com> wrote:
>
> > @Chris @Taylor
> > Thank you guy very much for your explanations! Your strategy makes a lot
> of
> > sense to me. Generating a dag for each client I'm going to have a ton of
> > dags on the front page but at least that is searchable haha. I'm going to
> > give this implementation a shot and I'll try to report back with the
> > outcome.
> >
> > Can anyone comment on future work to support data science workflows like
> > these, or is Airflow fundamentally the wrong tool?
> >
> > On Thu, Mar 22, 2018 at 12:07 PM Taylor Edmiston <te...@gmail.com>
> > wrote:
> >
> > > We're not using SubDagOperator.  Our approach is using 1 DAG file to
> > > generate a separate DAG class instance for each similar config, which
> > gets
> > > hoisted into global namespace.  In simplified pseudo-Python, it looks
> > like:
> > >
> > > # sources --> {'configs': [{...}, {...}], 'expire': '<timestamp>'}
> > > cache = Variable.get('sources', default_var={}, deserialize_json=True)
> > > sources = fetch_configs() if is_empty(cache) or is_expired(cache) else
> > > cache['configs']
> > > for source in sources:
> > >   dag = DAG(...)
> > >   globals()[source._id] = dag
> > >   # ...create tasks and set dependencies for each DAG (some config
> pulled
> > > from source object for each)...
> > >
> > > We added the cache part for the same reason you pointed out, because
> the
> > > DAG processing loop was hitting the API a lot.  Btw, you can also turn
> > down
> > > how much the processing loop runs with scheduler_heartbeat_sec under
> the
> > > scheduler group in config.
> > >
> > > We also considered the route Chris mentioned of updating cache via a
> > > separate DAG but weren't crazy about having a DAG scheduled once per
> > > minute.
> > >
> > > *Taylor Edmiston*
> > > TEdmiston.com <https://www.tedmiston.com/> | Blog
> > > <http://blog.tedmiston.com>
> > > Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn
> > > <https://www.linkedin.com/in/tedmiston/> | AngelList
> > > <https://angel.co/taylor>
> > >
> > >
> > > On Thu, Mar 22, 2018 at 9:17 AM, David Capwell <dc...@gmail.com>
> > wrote:
> > >
> > > > For us we compile down to Python rather than do the logic in Python,
> > that
> > > > makes it so the load doesn't do real work.
> > > >
> > > > We have our own DSL that is just a simplified compiler; parse,
> analyze,
> > > > optimize, code gen.  In code gen we just generate the Python code.
> Our
> > > > build then packages it up and have airflow fetch it (very hacky fetch
> > > right
> > > > now)
> > > >
> > > > This does make it so loading is simple and fast, but means you can't
> > use
> > > > the Python api directly
> > > >
> > > > On Thu, Mar 22, 2018, 12:43 AM Andrew Maguire <andrewm4894@gmail.com
> >
> > > > wrote:
> > > >
> > > > > I've had similar issues with large dags being slow to render on ui
> > and
> > > > > crashing chrome.
> > > > >
> > > > > I got around it by changing the default tree view from 25 to just
> 5.
> > > > >
> > > > > Involves a couple changes to source files though, would be great if
> > > some
> > > > of
> > > > > the ui defaults could go into airflow.cfg.
> > > > >
> > > > > https://stackoverflow.com/a/48665734/1919374
> > > > >
> > > > > On Thu, 22 Mar 2018, 01:26 Chris Fei, <cf...@gmail.com> wrote:
> > > > >
> > > > > > @Kyle, I do something similar and have run into the problems
> you've
> > > > > > mentioned. In my case, I access data from S3 and then generate
> > > separate
> > > > > > DAGs (of different structures) based on the data that's pulled.
> > I've
> > > > > > also found that the UI for accessing a single large DAG is slow
> so
> > I
> > > > > > prefer to keep many separate DAGs. What I'd try is to define a
> DAG
> > > > > > that's responsible for accessing your API and caching the client
> > IDs
> > > > > > somewhere locally, maybe just to a file on disk or as an Airflow
> > > > > > Variable. You can run this DAG on whatever schedule is
> appropriate
> > > for
> > > > > > you. From there, build a function that creates a DAG and then for
> > > each
> > > > > > client ID, register a DAG built by that function to the global
> > > context.
> > > > > > Like this:
> > > > > > def create_client_dag(client_id):
> > > > > >     # build dag here
> > > > > >
> > > > > > def get_client_ids_locally():
> > > > > >     # access the data that was pulled from the API
> > > > > >
> > > > > > client_ids = get_client_ids_locally()
> > > > > > for client in client_ids:
> > > > > >     dag = create_client_dag(client)
> > > > > >     globals()[dag.dag_id] = dag
> > > > > >
> > > > > > This approach also handles removing client IDs somewhat
> gracefully.
> > > > DAGs
> > > > > > for removed clients will still appear in the UI (you can build a
> > > > > > maintenance DAG to clean that up), but they'll be disabled and
> > their
> > > > > > tasks won't be scheduled.
> > > > > > On Wed, Mar 21, 2018, at 7:32 PM, Kyle Hamlin wrote:
> > > > > > > Thanks for all the responses let me try to address the main
> > themes.
> > > > > > >
> > > > > > > @Ace @Nicholas @Taylor
> > > > > > > I originally started with a loop over my list of client ids and
> > > > > > > created a> SparkSubmitOperator for each client. The pseudo code
> > > would
> > > > > > look
> > > > > > > something> like this:
> > > > > > >
> > > > > > > dag = DAG(...)
> > > > > > >
> > > > > > > client_ids = get_client_ids()
> > > > > > > for client_id in client_ids:
> > > > > > >     SparkSubmitOperator(
> > > > > > >         ...
> > > > > > >         dag=dag
> > > > > > >     )
> > > > > > >
> > > > > > > I found that this approach kind of clunky for a few reasons.
> > > > > > > First, the> get_cleint_ids() function was hitting our API every
> > > time
> > > > > the
> > > > > > dag
> > > > > > > was read> by the scheduler which seemed excessive (every 30
> > seconds
> > > > or
> > > > > > > so?). Second,> it seemed like when a single task failure made
> > > marked
> > > > > the
> > > > > > whole
> > > > > > > dag as a> failure, but I guess retrying till the task worked
> > could
> > > > > solve
> > > > > > > this? Third,> the UI gets really clunky and slow, basically
> > > unusable
> > > > > > when it
> > > > > > > tries to> render the graph view for that many tasks. Finally,
> > > Airflow
> > > > > > > doesn't seem> very happy when client_ids are removed i.e. the
> > > > > > get_client_ids()
> > > > > > > no longer> returns a specific client_id, it really seems to
> want
> > a
> > > > > > static dag.
> > > > > > >
> > > > > > > Do I really have to poll and API or database every 30 seconds
> for
> > > > this>
> > > > > > dynamic client_id data?
> > > > > > >
> > > > > > > @Ace
> > > > > > > I have been limiting concurrency so as to not blast the cluster
> > > > > > >
> > > > > > > @Nicholas
> > > > > > > Thank you for the noise suggestion I will definitely implement
> > > > > > > that if I> continue with the same methodology
> > > > > > >
> > > > > > > @Taylor
> > > > > > > Are you using a SubDagOperator? Or is your process similar to
> the
> > > > > > > pseudo code I wrote above?
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Mar 21, 2018 at 2:49 PM Taylor Edmiston
> > > > > > > <te...@gmail.com> wrote:>
> > > > > > >> We also use a similar approach to generate dynamic DAGs based
> on
> > > > > > >> a common>> template DAG file.  We pull in the list of config
> > > > objects,
> > > > > > one
> > > > > > >> per DAG,>> from an internal API lightly wrapping the database,
> > > then
> > > > we
> > > > > > >> cache that>> response in a Airflow Variable that gets updated
> > > once a
> > > > > > minute.  The>> dynamic DAGs are generated from that variable.
> > > > > > >>
> > > > > > >> *Taylor Edmiston*
> > > > > > >> TEdmiston.com <https://www.tedmiston.com/> | Blog
> > > > > > >> <http://blog.tedmiston.com>
> > > > > > >> Stack Overflow CV <https://stackoverflow.com/story/taylor> |
> > > > > > LinkedIn>> <https://www.linkedin.com/in/tedmiston/> | AngelList
> > > > > > >> <https://angel.co/taylor>
> > > > > > >>
> > > > > > >>
> > > > > > >> On Wed, Mar 21, 2018 at 1:54 PM, Nicolas Kijak <
> > > > > > >> nicolas.kijak@weightwatchers.com> wrote:
> > > > > > >>
> > > > > > >>> Kyle,
> > > > > > >>>
> > > > > > >>> We have a similar approach but on a much, much smaller scale.
> > We
> > > > > > >>> now have>>> <100 “things to process” but expect it to grow to
> > > under
> > > > > > ~200.  Each>> “thing
> > > > > > >>> to process” has the same workflow so we have a single DAG
> > > > > > >>> definition that>>> does about 20 tasks per, then we loop over
> > the
> > > > > list
> > > > > > of items and
> > > > > > >>> produce>> a
> > > > > > >>> dag object for each one adding it to the global definition.
> > > > > > >>>
> > > > > > >>> One of the things we quickly ran into was crushing the
> > scheduler
> > > > > as>>>
> > > > > > everything was running with the same start time.  To get around
> > > > > > >>> this we>> add
> > > > > > >>> noise to the start time minute and seconds. Simply index %
> 60.
> > > > > > This>>> spreads out the load so that the scheduler isn’t trying
> to
> > > run
> > > > > > >>> everything>>> at the exact same moment.  I would suggest if
> you
> > > do
> > > > go
> > > > > > this
> > > > > > >>> route, to>> also
> > > > > > >>> stagger your hours if you can because of how many you plan to
> > > run.
> > > > > > >> Perhaps
> > > > > > >>> your DAGs are smaller and aren’t as CPU intensive as ours.
> > > > > > >>>
> > > > > > >>> On 3/21/18, 1:35 PM, "Kyle Hamlin" <ha...@gmail.com>
> > wrote:
> > > > > > >>>
> > > > > > >>>   Hello,
> > > > > > >>>
> > > > > > >>>   I'm currently using Airflow for some ETL tasks where I
> > submit a
> > > > > > >>>   spark>>> job
> > > > > > >>>   to a cluster and poll till it is complete. This workflow is
> > > nice
> > > > > > >>> because it
> > > > > > >>>   is typically a single Dag. I'm now starting to do more
> > machine
> > > > > > >> learning
> > > > > > >>>   tasks and need to build a model per client which is 1000+
> > > > > > >>>   clients. My>>>   spark cluster is capable of handling this
> > > > > workload,
> > > > > > however, it
> > > > > > >> doesn't
> > > > > > >>>   seem scalable to write 1000+ dags to fit models for each
> > > client.
> > > > > I>>
> > > > > > want
> > > > > > >>>   each client to have its own task instance so it can be
> > retried
> > > > > > >>>   if it>>>   fails without having to run all 1000+ tasks over
> > > > again.
> > > > > > How do I
> > > > > > >> handle
> > > > > > >>>   this type of workflow in Airflow?
> > > > > > >>>
> > > > > > >>>
> > > > > > >>>
> > > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Submitting 1000+ tasks to airflow programmatically

Posted by Taylor Edmiston <te...@gmail.com>.
I'm interested in hearing further discussion too, and if others have tried
something similar to our approach.  Several companies on this list have
mentioned various approaches to dynamic DAGs, and I think everyone needs
them eventually.  Maybe it's an opportunity to add docs covering use cases
like this and to capture best practices from the community.

*Taylor Edmiston*
TEdmiston.com <https://www.tedmiston.com/> | Blog
<http://blog.tedmiston.com>
Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn
<https://www.linkedin.com/in/tedmiston/> | AngelList
<https://angel.co/taylor>


On Thu, Mar 22, 2018 at 12:43 PM, Kyle Hamlin <ha...@gmail.com> wrote:

> @Chris @Taylor
> Thank you guy very much for your explanations! Your strategy makes a lot of
> sense to me. Generating a dag for each client I'm going to have a ton of
> dags on the front page but at least that is searchable haha. I'm going to
> give this implementation a shot and I'll try to report back with the
> outcome.
>
> Can anyone comment on future work to support data science workflows like
> these, or is Airflow fundamentally the wrong tool?
>
> On Thu, Mar 22, 2018 at 12:07 PM Taylor Edmiston <te...@gmail.com>
> wrote:
>
> > We're not using SubDagOperator.  Our approach is using 1 DAG file to
> > generate a separate DAG class instance for each similar config, which
> gets
> > hoisted into global namespace.  In simplified pseudo-Python, it looks
> like:
> >
> > # sources --> {'configs': [{...}, {...}], 'expire': '<timestamp>'}
> > cache = Variable.get('sources', default_var={}, deserialize_json=True)
> > sources = fetch_configs() if is_empty(cache) or is_expired(cache) else
> > cache['configs']
> > for source in sources:
> >   dag = DAG(...)
> >   globals()[source._id] = dag
> >   # ...create tasks and set dependencies for each DAG (some config pulled
> > from source object for each)...
> >
> > We added the cache part for the same reason you pointed out, because the
> > DAG processing loop was hitting the API a lot.  Btw, you can also turn
> down
> > how much the processing loop runs with scheduler_heartbeat_sec under the
> > scheduler group in config.
> >
> > We also considered the route Chris mentioned of updating cache via a
> > separate DAG but weren't crazy about having a DAG scheduled once per
> > minute.
> >
> > *Taylor Edmiston*
> > TEdmiston.com <https://www.tedmiston.com/> | Blog
> > <http://blog.tedmiston.com>
> > Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn
> > <https://www.linkedin.com/in/tedmiston/> | AngelList
> > <https://angel.co/taylor>
> >
> >
> > On Thu, Mar 22, 2018 at 9:17 AM, David Capwell <dc...@gmail.com>
> wrote:
> >
> > > For us we compile down to Python rather than do the logic in Python,
> that
> > > makes it so the load doesn't do real work.
> > >
> > > We have our own DSL that is just a simplified compiler; parse, analyze,
> > > optimize, code gen.  In code gen we just generate the Python code.  Our
> > > build then packages it up and have airflow fetch it (very hacky fetch
> > right
> > > now)
> > >
> > > This does make it so loading is simple and fast, but means you can't
> use
> > > the Python api directly
> > >
> > > On Thu, Mar 22, 2018, 12:43 AM Andrew Maguire <an...@gmail.com>
> > > wrote:
> > >
> > > > I've had similar issues with large dags being slow to render on ui
> and
> > > > crashing chrome.
> > > >
> > > > I got around it by changing the default tree view from 25 to just 5.
> > > >
> > > > Involves a couple changes to source files though, would be great if
> > some
> > > of
> > > > the ui defaults could go into airflow.cfg.
> > > >
> > > > https://stackoverflow.com/a/48665734/1919374
> > > >
> > > > On Thu, 22 Mar 2018, 01:26 Chris Fei, <cf...@gmail.com> wrote:
> > > >
> > > > > @Kyle, I do something similar and have run into the problems you've
> > > > > mentioned. In my case, I access data from S3 and then generate
> > separate
> > > > > DAGs (of different structures) based on the data that's pulled.
> I've
> > > > > also found that the UI for accessing a single large DAG is slow so
> I
> > > > > prefer to keep many separate DAGs. What I'd try is to define a DAG
> > > > > that's responsible for accessing your API and caching the client
> IDs
> > > > > somewhere locally, maybe just to a file on disk or as an Airflow
> > > > > Variable. You can run this DAG on whatever schedule is appropriate
> > for
> > > > > you. From there, build a function that creates a DAG and then for
> > each
> > > > > client ID, register a DAG built by that function to the global
> > context.
> > > > > Like this:
> > > > > def create_client_dag(client_id):
> > > > >     # build dag here
> > > > >
> > > > > def get_client_ids_locally():
> > > > >     # access the data that was pulled from the API
> > > > >
> > > > > client_ids = get_client_ids_locally()
> > > > > for client in client_ids:
> > > > >     dag = create_client_dag(client)
> > > > >     globals()[dag.dag_id] = dag
> > > > >
> > > > > This approach also handles removing client IDs somewhat gracefully.
> > > DAGs
> > > > > for removed clients will still appear in the UI (you can build a
> > > > > maintenance DAG to clean that up), but they'll be disabled and
> their
> > > > > tasks won't be scheduled.
> > > > > On Wed, Mar 21, 2018, at 7:32 PM, Kyle Hamlin wrote:
> > > > > > Thanks for all the responses let me try to address the main
> themes.
> > > > > >
> > > > > > @Ace @Nicholas @Taylor
> > > > > > I originally started with a loop over my list of client ids and
> > > > > > created a> SparkSubmitOperator for each client. The pseudo code
> > would
> > > > > look
> > > > > > something> like this:
> > > > > >
> > > > > > dag = DAG(...)
> > > > > >
> > > > > > client_ids = get_client_ids()
> > > > > > for client_id in client_ids:
> > > > > >     SparkSubmitOperator(
> > > > > >         ...
> > > > > >         dag=dag
> > > > > >     )
> > > > > >
> > > > > > I found that this approach kind of clunky for a few reasons.
> > > > > > First, the> get_cleint_ids() function was hitting our API every
> > time
> > > > the
> > > > > dag
> > > > > > was read> by the scheduler which seemed excessive (every 30
> seconds
> > > or
> > > > > > so?). Second,> it seemed like when a single task failure made
> > marked
> > > > the
> > > > > whole
> > > > > > dag as a> failure, but I guess retrying till the task worked
> could
> > > > solve
> > > > > > this? Third,> the UI gets really clunky and slow, basically
> > unusable
> > > > > when it
> > > > > > tries to> render the graph view for that many tasks. Finally,
> > Airflow
> > > > > > doesn't seem> very happy when client_ids are removed i.e. the
> > > > > get_client_ids()
> > > > > > no longer> returns a specific client_id, it really seems to want
> a
> > > > > static dag.
> > > > > >
> > > > > > Do I really have to poll and API or database every 30 seconds for
> > > this>
> > > > > dynamic client_id data?
> > > > > >
> > > > > > @Ace
> > > > > > I have been limiting concurrency so as to not blast the cluster
> > > > > >
> > > > > > @Nicholas
> > > > > > Thank you for the noise suggestion I will definitely implement
> > > > > > that if I> continue with the same methodology
> > > > > >
> > > > > > @Taylor
> > > > > > Are you using a SubDagOperator? Or is your process similar to the
> > > > > > pseudo code I wrote above?
> > > > > >
> > > > > >
> > > > > > On Wed, Mar 21, 2018 at 2:49 PM Taylor Edmiston
> > > > > > <te...@gmail.com> wrote:>
> > > > > >> We also use a similar approach to generate dynamic DAGs based on
> > > > > >> a common>> template DAG file.  We pull in the list of config
> > > objects,
> > > > > one
> > > > > >> per DAG,>> from an internal API lightly wrapping the database,
> > then
> > > we
> > > > > >> cache that>> response in a Airflow Variable that gets updated
> > once a
> > > > > minute.  The>> dynamic DAGs are generated from that variable.
> > > > > >>
> > > > > >> *Taylor Edmiston*
> > > > > >> TEdmiston.com <https://www.tedmiston.com/> | Blog
> > > > > >> <http://blog.tedmiston.com>
> > > > > >> Stack Overflow CV <https://stackoverflow.com/story/taylor> |
> > > > > LinkedIn>> <https://www.linkedin.com/in/tedmiston/> | AngelList
> > > > > >> <https://angel.co/taylor>
> > > > > >>
> > > > > >>
> > > > > >> On Wed, Mar 21, 2018 at 1:54 PM, Nicolas Kijak <
> > > > > >> nicolas.kijak@weightwatchers.com> wrote:
> > > > > >>
> > > > > >>> Kyle,
> > > > > >>>
> > > > > >>> We have a similar approach but on a much, much smaller scale.
> We
> > > > > >>> now have>>> <100 “things to process” but expect it to grow to
> > under
> > > > > ~200.  Each>> “thing
> > > > > >>> to process” has the same workflow so we have a single DAG
> > > > > >>> definition that>>> does about 20 tasks per, then we loop over
> the
> > > > list
> > > > > of items and
> > > > > >>> produce>> a
> > > > > >>> dag object for each one adding it to the global definition.
> > > > > >>>
> > > > > >>> One of the things we quickly ran into was crushing the
> scheduler
> > > > as>>>
> > > > > everything was running with the same start time.  To get around
> > > > > >>> this we>> add
> > > > > >>> noise to the start time minute and seconds. Simply index % 60.
> > > > > This>>> spreads out the load so that the scheduler isn’t trying to
> > run
> > > > > >>> everything>>> at the exact same moment.  I would suggest if you
> > do
> > > go
> > > > > this
> > > > > >>> route, to>> also
> > > > > >>> stagger your hours if you can because of how many you plan to
> > run.
> > > > > >> Perhaps
> > > > > >>> your DAGs are smaller and aren’t as CPU intensive as ours.
> > > > > >>>
> > > > > >>> On 3/21/18, 1:35 PM, "Kyle Hamlin" <ha...@gmail.com>
> wrote:
> > > > > >>>
> > > > > >>>   Hello,
> > > > > >>>
> > > > > >>>   I'm currently using Airflow for some ETL tasks where I
> submit a
> > > > > >>>   spark>>> job
> > > > > >>>   to a cluster and poll till it is complete. This workflow is
> > nice
> > > > > >>> because it
> > > > > >>>   is typically a single Dag. I'm now starting to do more
> machine
> > > > > >> learning
> > > > > >>>   tasks and need to build a model per client which is 1000+
> > > > > >>>   clients. My>>>   spark cluster is capable of handling this
> > > > workload,
> > > > > however, it
> > > > > >> doesn't
> > > > > >>>   seem scalable to write 1000+ dags to fit models for each
> > client.
> > > > I>>
> > > > > want
> > > > > >>>   each client to have its own task instance so it can be
> retried
> > > > > >>>   if it>>>   fails without having to run all 1000+ tasks over
> > > again.
> > > > > How do I
> > > > > >> handle
> > > > > >>>   this type of workflow in Airflow?
> > > > > >>>
> > > > > >>>
> > > > > >>>
> > > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>

Re: Submitting 1000+ tasks to airflow programmatically

Posted by Kyle Hamlin <ha...@gmail.com>.
@Chris @Taylor
Thank you guys very much for your explanations! Your strategy makes a lot of
sense to me. Generating a DAG for each client means I'm going to have a ton
of DAGs on the front page, but at least that list is searchable, haha. I'm
going to give this implementation a shot and I'll try to report back with
the outcome.

Can anyone comment on future work to support data science workflows like
these, or is Airflow fundamentally the wrong tool?

On Thu, Mar 22, 2018 at 12:07 PM Taylor Edmiston <te...@gmail.com>
wrote:

> We're not using SubDagOperator.  Our approach is using 1 DAG file to
> generate a separate DAG class instance for each similar config, which gets
> hoisted into global namespace.  In simplified pseudo-Python, it looks like:
>
> # sources --> {'configs': [{...}, {...}], 'expire': '<timestamp>'}
> cache = Variable.get('sources', default_var={}, deserialize_json=True)
> sources = fetch_configs() if is_empty(cache) or is_expired(cache) else
> cache['configs']
> for source in sources:
>   dag = DAG(...)
>   globals()[source._id] = dag
>   # ...create tasks and set dependencies for each DAG (some config pulled
> from source object for each)...
>
> We added the cache part for the same reason you pointed out, because the
> DAG processing loop was hitting the API a lot.  Btw, you can also turn down
> how much the processing loop runs with scheduler_heartbeat_sec under the
> scheduler group in config.
>
> We also considered the route Chris mentioned of updating cache via a
> separate DAG but weren't crazy about having a DAG scheduled once per
> minute.
>
> *Taylor Edmiston*
> TEdmiston.com <https://www.tedmiston.com/> | Blog
> <http://blog.tedmiston.com>
> Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn
> <https://www.linkedin.com/in/tedmiston/> | AngelList
> <https://angel.co/taylor>
>
>
> On Thu, Mar 22, 2018 at 9:17 AM, David Capwell <dc...@gmail.com> wrote:
>
> > For us we compile down to Python rather than do the logic in Python, that
> > makes it so the load doesn't do real work.
> >
> > We have our own DSL that is just a simplified compiler; parse, analyze,
> > optimize, code gen.  In code gen we just generate the Python code.  Our
> > build then packages it up and have airflow fetch it (very hacky fetch
> right
> > now)
> >
> > This does make it so loading is simple and fast, but means you can't use
> > the Python api directly
> >
> > On Thu, Mar 22, 2018, 12:43 AM Andrew Maguire <an...@gmail.com>
> > wrote:
> >
> > > I've had similar issues with large dags being slow to render on ui and
> > > crashing chrome.
> > >
> > > I got around it by changing the default tree view from 25 to just 5.
> > >
> > > Involves a couple changes to source files though, would be great if
> some
> > of
> > > the ui defaults could go into airflow.cfg.
> > >
> > > https://stackoverflow.com/a/48665734/1919374
> > >
> > > On Thu, 22 Mar 2018, 01:26 Chris Fei, <cf...@gmail.com> wrote:
> > >
> > > > @Kyle, I do something similar and have run into the problems you've
> > > > mentioned. In my case, I access data from S3 and then generate
> separate
> > > > DAGs (of different structures) based on the data that's pulled. I've
> > > > also found that the UI for accessing a single large DAG is slow so I
> > > > prefer to keep many separate DAGs. What I'd try is to define a DAG
> > > > that's responsible for accessing your API and caching the client IDs
> > > > somewhere locally, maybe just to a file on disk or as an Airflow
> > > > Variable. You can run this DAG on whatever schedule is appropriate
> for
> > > > you. From there, build a function that creates a DAG and then for
> each
> > > > client ID, register a DAG built by that function to the global
> context.
> > > > Like this:
> > > > def create_client_dag(client_id):
> > > >     # build dag here
> > > >
> > > > def get_client_ids_locally():
> > > >     # access the data that was pulled from the API
> > > >
> > > > client_ids = get_client_ids_locally()
> > > > for client in client_ids:
> > > >     dag = create_client_dag(client)
> > > >     globals()[dag.dag_id] = dag
> > > >
> > > > This approach also handles removing client IDs somewhat gracefully.
> > DAGs
> > > > for removed clients will still appear in the UI (you can build a
> > > > maintenance DAG to clean that up), but they'll be disabled and their
> > > > tasks won't be scheduled.
> > > > On Wed, Mar 21, 2018, at 7:32 PM, Kyle Hamlin wrote:
> > > > > Thanks for all the responses let me try to address the main themes.
> > > > >
> > > > > @Ace @Nicholas @Taylor
> > > > > I originally started with a loop over my list of client ids and
> > > > > created a> SparkSubmitOperator for each client. The pseudo code
> would
> > > > look
> > > > > something> like this:
> > > > >
> > > > > dag = DAG(...)
> > > > >
> > > > > client_ids = get_client_ids()
> > > > > for client_id in client_ids:
> > > > >     SparkSubmitOperator(
> > > > >         ...
> > > > >         dag=dag
> > > > >     )
> > > > >
> > > > > I found that this approach kind of clunky for a few reasons.
> > > > > First, the> get_cleint_ids() function was hitting our API every
> time
> > > the
> > > > dag
> > > > > was read> by the scheduler which seemed excessive (every 30 seconds
> > or
> > > > > so?). Second,> it seemed like when a single task failure made
> marked
> > > the
> > > > whole
> > > > > dag as a> failure, but I guess retrying till the task worked could
> > > solve
> > > > > this? Third,> the UI gets really clunky and slow, basically
> unusable
> > > > when it
> > > > > tries to> render the graph view for that many tasks. Finally,
> Airflow
> > > > > doesn't seem> very happy when client_ids are removed i.e. the
> > > > get_client_ids()
> > > > > no longer> returns a specific client_id, it really seems to want a
> > > > static dag.
> > > > >
> > > > > Do I really have to poll and API or database every 30 seconds for
> > this>
> > > > dynamic client_id data?
> > > > >
> > > > > @Ace
> > > > > I have been limiting concurrency so as to not blast the cluster
> > > > >
> > > > > @Nicholas
> > > > > Thank you for the noise suggestion I will definitely implement
> > > > > that if I> continue with the same methodology
> > > > >
> > > > > @Taylor
> > > > > Are you using a SubDagOperator? Or is your process similar to the
> > > > > pseudo code I wrote above?
> > > > >
> > > > >
> > > > > On Wed, Mar 21, 2018 at 2:49 PM Taylor Edmiston
> > > > > <te...@gmail.com> wrote:>
> > > > >> We also use a similar approach to generate dynamic DAGs based on
> > > > >> a common>> template DAG file.  We pull in the list of config
> > objects,
> > > > one
> > > > >> per DAG,>> from an internal API lightly wrapping the database,
> then
> > we
> > > > >> cache that>> response in a Airflow Variable that gets updated
> once a
> > > > minute.  The>> dynamic DAGs are generated from that variable.
> > > > >>
> > > > >> *Taylor Edmiston*
> > > > >> TEdmiston.com <https://www.tedmiston.com/> | Blog
> > > > >> <http://blog.tedmiston.com>
> > > > >> Stack Overflow CV <https://stackoverflow.com/story/taylor> |
> > > > LinkedIn>> <https://www.linkedin.com/in/tedmiston/> | AngelList
> > > > >> <https://angel.co/taylor>
> > > > >>
> > > > >>
> > > > >> On Wed, Mar 21, 2018 at 1:54 PM, Nicolas Kijak <
> > > > >> nicolas.kijak@weightwatchers.com> wrote:
> > > > >>
> > > > >>> Kyle,
> > > > >>>
> > > > >>> We have a similar approach but on a much, much smaller scale. We
> > > > >>> now have>>> <100 “things to process” but expect it to grow to
> under
> > > > ~200.  Each>> “thing
> > > > >>> to process” has the same workflow so we have a single DAG
> > > > >>> definition that>>> does about 20 tasks per, then we loop over the
> > > list
> > > > of items and
> > > > >>> produce>> a
> > > > >>> dag object for each one adding it to the global definition.
> > > > >>>
> > > > >>> One of the things we quickly ran into was crushing the scheduler
> > > as>>>
> > > > everything was running with the same start time.  To get around
> > > > >>> this we>> add
> > > > >>> noise to the start time minute and seconds. Simply index % 60.
> > > > This>>> spreads out the load so that the scheduler isn’t trying to
> run
> > > > >>> everything>>> at the exact same moment.  I would suggest if you
> do
> > go
> > > > this
> > > > >>> route, to>> also
> > > > >>> stagger your hours if you can because of how many you plan to
> run.
> > > > >> Perhaps
> > > > >>> your DAGs are smaller and aren’t as CPU intensive as ours.
> > > > >>>
> > > > >>> On 3/21/18, 1:35 PM, "Kyle Hamlin" <ha...@gmail.com> wrote:
> > > > >>>
> > > > >>>   Hello,
> > > > >>>
> > > > >>>   I'm currently using Airflow for some ETL tasks where I submit a
> > > > >>>   spark>>> job
> > > > >>>   to a cluster and poll till it is complete. This workflow is
> nice
> > > > >>> because it
> > > > >>>   is typically a single Dag. I'm now starting to do more machine
> > > > >> learning
> > > > >>>   tasks and need to build a model per client which is 1000+
> > > > >>>   clients. My>>>   spark cluster is capable of handling this
> > > workload,
> > > > however, it
> > > > >> doesn't
> > > > >>>   seem scalable to write 1000+ dags to fit models for each
> client.
> > > I>>
> > > > want
> > > > >>>   each client to have its own task instance so it can be retried
> > > > >>>   if it>>>   fails without having to run all 1000+ tasks over
> > again.
> > > > How do I
> > > > >> handle
> > > > >>>   this type of workflow in Airflow?
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>
> > > >
> > > >
> > >
> >
>

Re: Submitting 1000+ tasks to airflow programmatically

Posted by Taylor Edmiston <te...@gmail.com>.
We're not using SubDagOperator.  Our approach uses one DAG file to
generate a separate DAG instance for each similar config, which gets
hoisted into the global namespace.  In simplified pseudo-Python, it looks
like:

# sources --> {'configs': [{...}, {...}], 'expire': '<timestamp>'}
cache = Variable.get('sources', default_var={}, deserialize_json=True)
sources = (fetch_configs()
           if is_empty(cache) or is_expired(cache)
           else cache['configs'])
for source in sources:
    dag = DAG(...)
    globals()[source._id] = dag
    # ...create tasks and set dependencies for each DAG (some config
    # pulled from source object for each)...

We added the cache part for the same reason you pointed out, because the
DAG processing loop was hitting the API a lot.  Btw, you can also turn
down how often the processing loop runs with scheduler_heartbeat_sec under
the scheduler group in the config.

We also considered the route Chris mentioned of updating cache via a
separate DAG but weren't crazy about having a DAG scheduled once per minute.
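A slightly fuller, runnable sketch of the same pattern (the helper bodies,
the stand-in DummyOperator task, and treating each config as a dict with an
'_id' key are assumptions; only the Variable caching and the hoist into
globals() come from the pseudo code above):

import time
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator


def is_empty(cache):
    return not cache.get("configs")


def is_expired(cache):
    return float(cache.get("expire", 0)) < time.time()


def fetch_configs():
    # Placeholder for the internal API call returning config objects.
    return [{"_id": "source_a"}, {"_id": "source_b"}]


cache = Variable.get("sources", default_var={}, deserialize_json=True)
configs = (fetch_configs()
           if is_empty(cache) or is_expired(cache)
           else cache["configs"])

for source in configs:
    dag = DAG(
        dag_id=source["_id"],
        start_date=datetime(2018, 3, 1),
        schedule_interval="@daily",
    )
    # ...create tasks and set dependencies per DAG; stand-in task here...
    DummyOperator(task_id="placeholder", dag=dag)
    # Hoist into module globals so the scheduler discovers each DAG.
    globals()[dag.dag_id] = dag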

*Taylor Edmiston*
TEdmiston.com <https://www.tedmiston.com/> | Blog
<http://blog.tedmiston.com>
Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn
<https://www.linkedin.com/in/tedmiston/> | AngelList
<https://angel.co/taylor>


On Thu, Mar 22, 2018 at 9:17 AM, David Capwell <dc...@gmail.com> wrote:

> For us we compile down to Python rather than do the logic in Python, that
> makes it so the load doesn't do real work.
>
> We have our own DSL that is just a simplified compiler; parse, analyze,
> optimize, code gen.  In code gen we just generate the Python code.  Our
> build then packages it up and have airflow fetch it (very hacky fetch right
> now)
>
> This does make it so loading is simple and fast, but means you can't use
> the Python api directly
>
> On Thu, Mar 22, 2018, 12:43 AM Andrew Maguire <an...@gmail.com>
> wrote:
>
> > I've had similar issues with large dags being slow to render on ui and
> > crashing chrome.
> >
> > I got around it by changing the default tree view from 25 to just 5.
> >
> > Involves a couple changes to source files though, would be great if some
> of
> > the ui defaults could go into airflow.cfg.
> >
> > https://stackoverflow.com/a/48665734/1919374
> >
> > On Thu, 22 Mar 2018, 01:26 Chris Fei, <cf...@gmail.com> wrote:
> >
> > > @Kyle, I do something similar and have run into the problems you've
> > > mentioned. In my case, I access data from S3 and then generate separate
> > > DAGs (of different structures) based on the data that's pulled. I've
> > > also found that the UI for accessing a single large DAG is slow so I
> > > prefer to keep many separate DAGs. What I'd try is to define a DAG
> > > that's responsible for accessing your API and caching the client IDs
> > > somewhere locally, maybe just to a file on disk or as an Airflow
> > > Variable. You can run this DAG on whatever schedule is appropriate for
> > > you. From there, build a function that creates a DAG and then for each
> > > client ID, register a DAG built by that function to the global context.
> > > Like this:
> > > def create_client_dag(client_id):
> > >     # build dag here
> > >
> > > def get_client_ids_locally():
> > >     # access the data that was pulled from the API
> > >
> > > client_ids = get_client_ids_locally()
> > > for client in client_ids:
> > >     dag = create_client_dag(client)
> > >     globals()[dag.dag_id] = dag
> > >
> > > This approach also handles removing client IDs somewhat gracefully.
> DAGs
> > > for removed clients will still appear in the UI (you can build a
> > > maintenance DAG to clean that up), but they'll be disabled and their
> > > tasks won't be scheduled.
> > > On Wed, Mar 21, 2018, at 7:32 PM, Kyle Hamlin wrote:
> > > > Thanks for all the responses let me try to address the main themes.
> > > >
> > > > @Ace @Nicholas @Taylor
> > > > I originally started with a loop over my list of client ids and
> > > > created a> SparkSubmitOperator for each client. The pseudo code would
> > > look
> > > > something> like this:
> > > >
> > > > dag = DAG(...)
> > > >
> > > > client_ids = get_client_ids()
> > > > for client_id in client_ids:
> > > >     SparkSubmitOperator(
> > > >         ...
> > > >         dag=dag
> > > >     )
> > > >
> > > > I found that this approach kind of clunky for a few reasons.
> > > > First, the> get_cleint_ids() function was hitting our API every time
> > the
> > > dag
> > > > was read> by the scheduler which seemed excessive (every 30 seconds
> or
> > > > so?). Second,> it seemed like when a single task failure made marked
> > the
> > > whole
> > > > dag as a> failure, but I guess retrying till the task worked could
> > solve
> > > > this? Third,> the UI gets really clunky and slow, basically unusable
> > > when it
> > > > tries to> render the graph view for that many tasks. Finally, Airflow
> > > > doesn't seem> very happy when client_ids are removed i.e. the
> > > get_client_ids()
> > > > no longer> returns a specific client_id, it really seems to want a
> > > static dag.
> > > >
> > > > Do I really have to poll and API or database every 30 seconds for
> this>
> > > dynamic client_id data?
> > > >
> > > > @Ace
> > > > I have been limiting concurrency so as to not blast the cluster
> > > >
> > > > @Nicholas
> > > > Thank you for the noise suggestion I will definitely implement
> > > > that if I> continue with the same methodology
> > > >
> > > > @Taylor
> > > > Are you using a SubDagOperator? Or is your process similar to the
> > > > pseudo code I wrote above?
> > > >
> > > >
> > > > On Wed, Mar 21, 2018 at 2:49 PM Taylor Edmiston
> > > > <te...@gmail.com> wrote:>
> > > >> We also use a similar approach to generate dynamic DAGs based on
> > > >> a common>> template DAG file.  We pull in the list of config
> objects,
> > > one
> > > >> per DAG,>> from an internal API lightly wrapping the database, then
> we
> > > >> cache that>> response in a Airflow Variable that gets updated once a
> > > minute.  The>> dynamic DAGs are generated from that variable.
> > > >>
> > > >> *Taylor Edmiston*
> > > >> TEdmiston.com <https://www.tedmiston.com/> | Blog
> > > >> <http://blog.tedmiston.com>
> > > >> Stack Overflow CV <https://stackoverflow.com/story/taylor> |
> > > LinkedIn>> <https://www.linkedin.com/in/tedmiston/> | AngelList
> > > >> <https://angel.co/taylor>
> > > >>
> > > >>
> > > >> On Wed, Mar 21, 2018 at 1:54 PM, Nicolas Kijak <
> > > >> nicolas.kijak@weightwatchers.com> wrote:
> > > >>
> > > >>> Kyle,
> > > >>>
> > > >>> We have a similar approach but on a much, much smaller scale. We
> > > >>> now have>>> <100 “things to process” but expect it to grow to under
> > > ~200.  Each>> “thing
> > > >>> to process” has the same workflow so we have a single DAG
> > > >>> definition that>>> does about 20 tasks per, then we loop over the
> > list
> > > of items and
> > > >>> produce>> a
> > > >>> dag object for each one adding it to the global definition.
> > > >>>
> > > >>> One of the things we quickly ran into was crushing the scheduler
> > as>>>
> > > everything was running with the same start time.  To get around
> > > >>> this we>> add
> > > >>> noise to the start time minute and seconds. Simply index % 60.
> > > This>>> spreads out the load so that the scheduler isn’t trying to run
> > > >>> everything>>> at the exact same moment.  I would suggest if you do
> go
> > > this
> > > >>> route, to>> also
> > > >>> stagger your hours if you can because of how many you plan to run.
> > > >> Perhaps
> > > >>> your DAGs are smaller and aren’t as CPU intensive as ours.
> > > >>>
> > > >>> On 3/21/18, 1:35 PM, "Kyle Hamlin" <ha...@gmail.com> wrote:
> > > >>>
> > > >>>   Hello,
> > > >>>
> > > >>>   I'm currently using Airflow for some ETL tasks where I submit a
> > > >>>   spark>>> job
> > > >>>   to a cluster and poll till it is complete. This workflow is nice
> > > >>> because it
> > > >>>   is typically a single Dag. I'm now starting to do more machine
> > > >> learning
> > > >>>   tasks and need to build a model per client which is 1000+
> > > >>>   clients. My>>>   spark cluster is capable of handling this
> > workload,
> > > however, it
> > > >> doesn't
> > > >>>   seem scalable to write 1000+ dags to fit models for each client.
> > I>>
> > > want
> > > >>>   each client to have its own task instance so it can be retried
> > > >>>   if it>>>   fails without having to run all 1000+ tasks over
> again.
> > > How do I
> > > >> handle
> > > >>>   this type of workflow in Airflow?
> > > >>>
> > > >>>
> > > >>>
> > > >>
> > >
> > >
> >
>

Re: Submitting 1000+ tasks to airflow programmatically

Posted by David Capwell <dc...@gmail.com>.
For us we compile down to Python rather than do the logic in Python, that
makes it so the load doesn't do real work.

We have our own DSL that is just a simplified compiler; parse, analyze,
optimize, code gen.  In code gen we just generate the Python code.  Our
build then packages it up and has Airflow fetch it (a very hacky fetch right
now).

This does make it so loading is simple and fast, but means you can't use
the Python API directly.
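
As a rough illustration of the compile-to-Python idea (this is not the DSL
described above -- the config format, paths, and template below are invented
for the example), a code-gen step can emit one static DAG file per job so that
the file Airflow parses contains no loops or API calls:

import os

# Hypothetical job definitions; in practice these come out of the DSL.
JOBS = {
    "client_1234": {"schedule": "0 3 * * *", "model": "churn"},
    "client_5678": {"schedule": "0 4 * * *", "model": "ltv"},
}

TEMPLATE = '''\
from datetime import datetime
from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

dag = DAG("{dag_id}", schedule_interval="{schedule}",
          start_date=datetime(2018, 3, 1))

SparkSubmitOperator(task_id="fit_model", application="/jobs/fit_model.py",
                    application_args=["--model", "{model}"], dag=dag)
'''

def codegen(jobs, out_dir="generated_dags"):
    # Write one plain .py file per job; a build step then ships these to Airflow.
    os.makedirs(out_dir, exist_ok=True)
    for dag_id, cfg in jobs.items():
        with open(os.path.join(out_dir, dag_id + ".py"), "w") as f:
            f.write(TEMPLATE.format(dag_id=dag_id, **cfg))

codegen(JOBS)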

On Thu, Mar 22, 2018, 12:43 AM Andrew Maguire <an...@gmail.com> wrote:

> I've had similar issues with large dags being slow to render on ui and
> crashing chrome.
>
> I got around it by changing the default tree view from 25 to just 5.
>
> Involves a couple changes to source files though, would be great if some of
> the ui defaults could go into airflow.cfg.
>
> https://stackoverflow.com/a/48665734/1919374
>
> On Thu, 22 Mar 2018, 01:26 Chris Fei, <cf...@gmail.com> wrote:
>
> > @Kyle, I do something similar and have run into the problems you've
> > mentioned. In my case, I access data from S3 and then generate separate
> > DAGs (of different structures) based on the data that's pulled. I've
> > also found that the UI for accessing a single large DAG is slow so I
> > prefer to keep many separate DAGs. What I'd try is to define a DAG
> > that's responsible for accessing your API and caching the client IDs
> > somewhere locally, maybe just to a file on disk or as an Airflow
> > Variable. You can run this DAG on whatever schedule is appropriate for
> > you. From there, build a function that creates a DAG and then for each
> > client ID, register a DAG built by that function to the global context.
> > Like this:
> > def create_client_dag(client_id):
> >     # build dag here
> >
> > def get_client_ids_locally():
> >     # access the data that was pulled from the API
> >
> > client_ids = get_client_ids_locally()
> > for client in client_ids:
> >     dag = create_client_dag(client)
> >     globals()[dag.dag_id] = dag
> >
> > This approach also handles removing client IDs somewhat gracefully. DAGs
> > for removed clients will still appear in the UI (you can build a
> > maintenance DAG to clean that up), but they'll be disabled and their
> > tasks won't be scheduled.
> > On Wed, Mar 21, 2018, at 7:32 PM, Kyle Hamlin wrote:
> > > Thanks for all the responses let me try to address the main themes.
> > >
> > > @Ace @Nicholas @Taylor
> > > I originally started with a loop over my list of client ids and
> > > created a SparkSubmitOperator for each client. The pseudo code would look
> > > something like this:
> > >
> > > dag = DAG(...)
> > >
> > > client_ids = get_client_ids()
> > > for client_id in client_ids:
> > >     SparkSubmitOperator(
> > >         ...
> > >         dag=dag
> > >     )
> > >
> > > I found that this approach kind of clunky for a few reasons.
> > > First, the get_cleint_ids() function was hitting our API every time the dag
> > > was read by the scheduler which seemed excessive (every 30 seconds or
> > > so?). Second, it seemed like when a single task failure made marked the whole
> > > dag as a failure, but I guess retrying till the task worked could solve
> > > this? Third, the UI gets really clunky and slow, basically unusable when it
> > > tries to render the graph view for that many tasks. Finally, Airflow
> > > doesn't seem very happy when client_ids are removed i.e. the get_client_ids()
> > > no longer returns a specific client_id, it really seems to want a static dag.
> > >
> > > Do I really have to poll and API or database every 30 seconds for this
> > > dynamic client_id data?
> > >
> > > @Ace
> > > I have been limiting concurrency so as to not blast the cluster
> > >
> > > @Nicholas
> > > Thank you for the noise suggestion I will definitely implement
> > > that if I continue with the same methodology
> > >
> > > @Taylor
> > > Are you using a SubDagOperator? Or is your process similar to the
> > > pseudo code I wrote above?
> > >
> > >
> > > On Wed, Mar 21, 2018 at 2:49 PM Taylor Edmiston
> > > <te...@gmail.com> wrote:
> > >> We also use a similar approach to generate dynamic DAGs based on
> > >> a common template DAG file.  We pull in the list of config objects, one
> > >> per DAG, from an internal API lightly wrapping the database, then we
> > >> cache that response in a Airflow Variable that gets updated once a
> > >> minute.  The dynamic DAGs are generated from that variable.
> > >>
> > >> *Taylor Edmiston*
> > >> TEdmiston.com <https://www.tedmiston.com/> | Blog
> > >> <http://blog.tedmiston.com>
> > >> Stack Overflow CV <https://stackoverflow.com/story/taylor> |
> > >> LinkedIn <https://www.linkedin.com/in/tedmiston/> | AngelList
> > >> <https://angel.co/taylor>
> > >>
> > >>
> > >> On Wed, Mar 21, 2018 at 1:54 PM, Nicolas Kijak <
> > >> nicolas.kijak@weightwatchers.com> wrote:
> > >>
> > >>> Kyle,
> > >>>
> > >>> We have a similar approach but on a much, much smaller scale. We
> > >>> now have <100 “things to process” but expect it to grow to under
> > >>> ~200.  Each “thing
> > >>> to process” has the same workflow so we have a single DAG
> > >>> definition that does about 20 tasks per, then we loop over the
> > >>> list of items and
> > >>> produce a
> > >>> dag object for each one adding it to the global definition.
> > >>>
> > >>> One of the things we quickly ran into was crushing the scheduler as
> > >>> everything was running with the same start time.  To get around
> > >>> this we add
> > >>> noise to the start time minute and seconds. Simply index % 60.  This
> > >>> spreads out the load so that the scheduler isn’t trying to run
> > >>> everything at the exact same moment.  I would suggest if you do go
> > >>> this
> > >>> route, to also
> > >>> stagger your hours if you can because of how many you plan to run.
> > >>> Perhaps
> > >>> your DAGs are smaller and aren’t as CPU intensive as ours.
> > >>>
> > >>> On 3/21/18, 1:35 PM, "Kyle Hamlin" <ha...@gmail.com> wrote:
> > >>>
> > >>>   Hello,
> > >>>
> > >>>   I'm currently using Airflow for some ETL tasks where I submit a spark job
> > >>>   to a cluster and poll till it is complete. This workflow is nice because it
> > >>>   is typically a single Dag. I'm now starting to do more machine learning
> > >>>   tasks and need to build a model per client which is 1000+ clients. My
> > >>>   spark cluster is capable of handling this workload, however, it doesn't
> > >>>   seem scalable to write 1000+ dags to fit models for each client. I want
> > >>>   each client to have its own task instance so it can be retried if it
> > >>>   fails without having to run all 1000+ tasks over again. How do I handle
> > >>>   this type of workflow in Airflow?
> > >>>
> > >>>
> > >>>
> > >>
> >
> >
>

Re: Submitting 1000+ tasks to airflow programatically

Posted by Andrew Maguire <an...@gmail.com>.
I've had similar issues with large dags being slow to render in the UI and
crashing Chrome.

I got around it by changing the default tree view from 25 to just 5.

It involves a couple of changes to source files though; it would be great if
some of the UI defaults could go into airflow.cfg.

https://stackoverflow.com/a/48665734/1919374
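
For anyone looking for the gist of that answer: it boils down to lowering the
hard-coded default number of DAG runs the tree view renders.  Roughly -- the
exact file and line vary by Airflow version, so treat this as a sketch of
where to look rather than a drop-in patch -- in airflow/www/views.py:

num_runs = request.args.get('num_runs')
num_runs = int(num_runs) if num_runs else 5  # stock default is 25

If I remember right, newer releases expose this as a proper setting
(default_dag_run_display_number under [webserver] in airflow.cfg), so check
your version before patching source.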

On Thu, 22 Mar 2018, 01:26 Chris Fei, <cf...@gmail.com> wrote:

> @Kyle, I do something similar and have run into the problems you've
> mentioned. In my case, I access data from S3 and then generate separate
> DAGs (of different structures) based on the data that's pulled. I've
> also found that the UI for accessing a single large DAG is slow so I
> prefer to keep many separate DAGs. What I'd try is to define a DAG
> that's responsible for accessing your API and caching the client IDs
> somewhere locally, maybe just to a file on disk or as an Airflow
> Variable. You can run this DAG on whatever schedule is appropriate for
> you. From there, build a function that creates a DAG and then for each
> client ID, register a DAG built by that function to the global context.
> Like this:
> def create_client_dag(client_id):
>     # build dag here
>
> def get_client_ids_locally():
>     # access the data that was pulled from the API
>
> client_ids = get_client_ids_locally()
> for client in client_ids:
>     dag = create_client_dag(client)
>     globals()[dag.dag_id] = dag
>
> This approach also handles removing client IDs somewhat gracefully. DAGs
> for removed clients will still appear in the UI (you can build a
> maintenance DAG to clean that up), but they'll be disabled and their
> tasks won't be scheduled.
> On Wed, Mar 21, 2018, at 7:32 PM, Kyle Hamlin wrote:
> > Thanks for all the responses let me try to address the main themes.
> >
> > @Ace @Nicholas @Taylor
> > I originally started with a loop over my list of client ids and
> > created a SparkSubmitOperator for each client. The pseudo code would look
> > something like this:
> >
> > dag = DAG(...)
> >
> > client_ids = get_client_ids()
> > for client_id in client_ids:
> >     SparkSubmitOperator(
> >         ...
> >         dag=dag
> >     )
> >
> > I found that this approach kind of clunky for a few reasons.
> > First, the get_cleint_ids() function was hitting our API every time the dag
> > was read by the scheduler which seemed excessive (every 30 seconds or
> > so?). Second, it seemed like when a single task failure made marked the whole
> > dag as a failure, but I guess retrying till the task worked could solve
> > this? Third, the UI gets really clunky and slow, basically unusable when it
> > tries to render the graph view for that many tasks. Finally, Airflow
> > doesn't seem very happy when client_ids are removed i.e. the get_client_ids()
> > no longer returns a specific client_id, it really seems to want a static dag.
> >
> > Do I really have to poll and API or database every 30 seconds for this
> > dynamic client_id data?
> >
> > @Ace
> > I have been limiting concurrency so as to not blast the cluster
> >
> > @Nicholas
> > Thank you for the noise suggestion I will definitely implement
> > that if I continue with the same methodology
> >
> > @Taylor
> > Are you using a SubDagOperator? Or is your process similar to the
> > pseudo code I wrote above?
> >
> >
> > On Wed, Mar 21, 2018 at 2:49 PM Taylor Edmiston
> > <te...@gmail.com> wrote:
> >> We also use a similar approach to generate dynamic DAGs based on
> >> a common template DAG file.  We pull in the list of config objects, one
> >> per DAG, from an internal API lightly wrapping the database, then we
> >> cache that response in a Airflow Variable that gets updated once a
> >> minute.  The dynamic DAGs are generated from that variable.
> >>
> >> *Taylor Edmiston*
> >> TEdmiston.com <https://www.tedmiston.com/> | Blog
> >> <http://blog.tedmiston.com>
> >> Stack Overflow CV <https://stackoverflow.com/story/taylor> |
> >> LinkedIn <https://www.linkedin.com/in/tedmiston/> | AngelList
> >> <https://angel.co/taylor>
> >>
> >>
> >> On Wed, Mar 21, 2018 at 1:54 PM, Nicolas Kijak <
> >> nicolas.kijak@weightwatchers.com> wrote:
> >>
> >>> Kyle,
> >>>
> >>> We have a similar approach but on a much, much smaller scale. We
> >>> now have <100 “things to process” but expect it to grow to under
> >>> ~200.  Each “thing
> >>> to process” has the same workflow so we have a single DAG
> >>> definition that does about 20 tasks per, then we loop over the list
> >>> of items and
> >>> produce a
> >>> dag object for each one adding it to the global definition.
> >>>
> >>> One of the things we quickly ran into was crushing the scheduler as
> >>> everything was running with the same start time.  To get around
> >>> this we add
> >>> noise to the start time minute and seconds. Simply index % 60.  This
> >>> spreads out the load so that the scheduler isn’t trying to run
> >>> everything at the exact same moment.  I would suggest if you do go
> >>> this
> >>> route, to also
> >>> stagger your hours if you can because of how many you plan to run.
> >>> Perhaps
> >>> your DAGs are smaller and aren’t as CPU intensive as ours.
> >>>
> >>> On 3/21/18, 1:35 PM, "Kyle Hamlin" <ha...@gmail.com> wrote:
> >>>
> >>>   Hello,
> >>>
> >>>   I'm currently using Airflow for some ETL tasks where I submit a spark job
> >>>   to a cluster and poll till it is complete. This workflow is nice because it
> >>>   is typically a single Dag. I'm now starting to do more machine learning
> >>>   tasks and need to build a model per client which is 1000+ clients. My
> >>>   spark cluster is capable of handling this workload, however, it doesn't
> >>>   seem scalable to write 1000+ dags to fit models for each client. I want
> >>>   each client to have its own task instance so it can be retried if it
> >>>   fails without having to run all 1000+ tasks over again. How do I handle
> >>>   this type of workflow in Airflow?
> >>>
> >>>
> >>>
> >>
>
>

Re: Submitting 1000+ tasks to airflow programatically

Posted by Chris Fei <cf...@gmail.com>.
@Kyle, I do something similar and have run into the problems you've
mentioned. In my case, I access data from S3 and then generate separate
DAGs (of different structures) based on the data that's pulled. I've
also found that the UI for accessing a single large DAG is slow so I
prefer to keep many separate DAGs. What I'd try is to define a DAG
that's responsible for accessing your API and caching the client IDs
somewhere locally, maybe just to a file on disk or as an Airflow
Variable. You can run this DAG on whatever schedule is appropriate for
you. From there, build a function that creates a DAG and then for each
client ID, register a DAG built by that function to the global context.
Like this:
def create_client_dag(client_id):
    # build dag here

def get_client_ids_locally():
    # access the data that was pulled from the API

client_ids = get_client_ids_locally()
for client in client_ids:
    dag = create_client_dag(client)
    globals()[dag.dag_id] = dag

This approach also handles removing client IDs somewhat gracefully. DAGs
for removed clients will still appear in the UI (you can build a
maintenance DAG to clean that up), but they'll be disabled and their
tasks won't be scheduled.
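
A minimal sketch of the caching half (get_client_ids_from_api() is a
placeholder for whatever wraps your API; here the IDs land in an Airflow
Variable that the snippet above reads back):

import json
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator


def cache_client_ids():
    # Hit the API once per scheduled run and cache the result.
    client_ids = get_client_ids_from_api()
    Variable.set("client_ids", json.dumps(client_ids))


cache_dag = DAG(
    "cache_client_ids",
    start_date=datetime(2018, 3, 1),
    schedule_interval="@hourly",  # whatever refresh rate suits you
)

PythonOperator(
    task_id="refresh_client_ids",
    python_callable=cache_client_ids,
    dag=cache_dag,
)


def get_client_ids_locally():
    # The consumer side then just reads the cached Variable.
    return json.loads(Variable.get("client_ids", default_var="[]"))
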
On Wed, Mar 21, 2018, at 7:32 PM, Kyle Hamlin wrote:
> Thanks for all the responses let me try to address the main themes.
> 
> @Ace @Nicholas @Taylor
> I originally started with a loop over my list of client ids and
> created a SparkSubmitOperator for each client. The pseudo code would look
> something like this:
> 
> dag = DAG(...)
> 
> client_ids = get_client_ids()
> for client_id in client_ids:
>     SparkSubmitOperator(
>         ...
>         dag=dag
>     )
> 
> I found that this approach kind of clunky for a few reasons.
> First, the get_cleint_ids() function was hitting our API every time the dag
> was read by the scheduler which seemed excessive (every 30 seconds or
> so?). Second, it seemed like when a single task failure made marked the whole
> dag as a failure, but I guess retrying till the task worked could solve
> this? Third, the UI gets really clunky and slow, basically unusable when it
> tries to render the graph view for that many tasks. Finally, Airflow
> doesn't seem very happy when client_ids are removed i.e. the get_client_ids()
> no longer returns a specific client_id, it really seems to want a static dag.
> 
> Do I really have to poll and API or database every 30 seconds for this
> dynamic client_id data?
> 
> @Ace
> I have been limiting concurrency so as to not blast the cluster
> 
> @Nicholas
> Thank you for the noise suggestion I will definitely implement
> that if I continue with the same methodology
> 
> @Taylor
> Are you using a SubDagOperator? Or is your process similar to the
> pseudo code I wrote above?
> 
> 
> On Wed, Mar 21, 2018 at 2:49 PM Taylor Edmiston
> <te...@gmail.com> wrote:
>> We also use a similar approach to generate dynamic DAGs based on
>> a common template DAG file.  We pull in the list of config objects, one
>> per DAG, from an internal API lightly wrapping the database, then we
>> cache that response in a Airflow Variable that gets updated once a
>> minute.  The dynamic DAGs are generated from that variable.
>> 
>> *Taylor Edmiston*
>> TEdmiston.com <https://www.tedmiston.com/> | Blog
>> <http://blog.tedmiston.com>
>> Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn
>> <https://www.linkedin.com/in/tedmiston/> | AngelList
>> <https://angel.co/taylor>
>> 
>> 
>> On Wed, Mar 21, 2018 at 1:54 PM, Nicolas Kijak <
>> nicolas.kijak@weightwatchers.com> wrote:
>> 
>>> Kyle,
>>> 
>>> We have a similar approach but on a much, much smaller scale. We
>>> now have <100 “things to process” but expect it to grow to under
>>> ~200.  Each “thing
>>> to process” has the same workflow so we have a single DAG
>>> definition that does about 20 tasks per, then we loop over the list
>>> of items and
>>> produce a
>>> dag object for each one adding it to the global definition.
>>> 
>>> One of the things we quickly ran into was crushing the scheduler as
>>> everything was running with the same start time.  To get around
>>> this we add
>>> noise to the start time minute and seconds. Simply index % 60.  This
>>> spreads out the load so that the scheduler isn’t trying to run
>>> everything at the exact same moment.  I would suggest if you do go
>>> this
>>> route, to also
>>> stagger your hours if you can because of how many you plan to run.
>>> Perhaps
>>> your DAGs are smaller and aren’t as CPU intensive as ours.
>>> 
>>> On 3/21/18, 1:35 PM, "Kyle Hamlin" <ha...@gmail.com> wrote:
>>> 
>>>   Hello,
>>> 
>>>   I'm currently using Airflow for some ETL tasks where I submit a spark job
>>>   to a cluster and poll till it is complete. This workflow is nice because it
>>>   is typically a single Dag. I'm now starting to do more machine learning
>>>   tasks and need to build a model per client which is 1000+ clients. My
>>>   spark cluster is capable of handling this workload, however, it doesn't
>>>   seem scalable to write 1000+ dags to fit models for each client. I want
>>>   each client to have its own task instance so it can be retried if it
>>>   fails without having to run all 1000+ tasks over again. How do I handle
>>>   this type of workflow in Airflow?
>>> 
>>> 
>>> 
>> 


Re: Submitting 1000+ tasks to airflow programatically

Posted by Kyle Hamlin <ha...@gmail.com>.
Thanks for all the responses let me try to address the main themes.

@Ace @Nicholas @Taylor
I originally started with a loop over my list of client ids and created a
SparkSubmitOperator for each client. The pseudo code would look something
like this:

from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

dag = DAG(...)

client_ids = get_client_ids()
for client_id in client_ids:
    SparkSubmitOperator(
        ...
        dag=dag
    )

I found this approach kind of clunky for a few reasons. First, the
get_client_ids() function was hitting our API every time the dag was read
by the scheduler, which seemed excessive (every 30 seconds or so?). Second,
it seemed like a single task failure marked the whole dag as a
failure, but I guess retrying till the task worked could solve this? Third,
the UI gets really clunky and slow, basically unusable when it tries to
render the graph view for that many tasks. Finally, Airflow doesn't seem
very happy when client_ids are removed, i.e. when get_client_ids() no longer
returns a specific client_id; it really seems to want a static dag.

Do I really have to poll an API or database every 30 seconds for this
dynamic client_id data?
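
(Side note: depending on the Airflow version, the scheduler's re-parse
frequency is itself tunable -- the min_file_process_interval setting under
[scheduler] in airflow.cfg, if your version has it -- but caching the IDs
outside the DAG file, as others suggest elsewhere in this thread, sidesteps
the problem entirely.)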

@Ace
I have been limiting concurrency so as to not blast the cluster

@Nicholas
Thank you for the noise suggestion I will definitely implement that if I
continue with the same methodology

@Taylor
Are you using a SubDagOperator? Or is your process similar to the
pseudo code I wrote above?


On Wed, Mar 21, 2018 at 2:49 PM Taylor Edmiston <te...@gmail.com> wrote:

> We also use a similar approach to generate dynamic DAGs based on a common
> template DAG file.  We pull in the list of config objects, one per DAG,
> from an internal API lightly wrapping the database, then we cache that
> response in a Airflow Variable that gets updated once a minute.  The
> dynamic DAGs are generated from that variable.
>
> *Taylor Edmiston*
> TEdmiston.com <https://www.tedmiston.com/> | Blog
> <http://blog.tedmiston.com>
> Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn
> <https://www.linkedin.com/in/tedmiston/> | AngelList
> <https://angel.co/taylor>
>
>
> On Wed, Mar 21, 2018 at 1:54 PM, Nicolas Kijak <
> nicolas.kijak@weightwatchers.com> wrote:
>
> > Kyle,
> >
> > We have a similar approach but on a much, much smaller scale. We now have
> > <100 “things to process” but expect it to grow to under ~200.  Each
> “thing
> > to process” has the same workflow so we have a single DAG definition that
> > does about 20 tasks per, then we loop over the list of items and produce
> a
> > dag object for each one adding it to the global definition.
> >
> > One of the things we quickly ran into was crushing the scheduler as
> > everything was running with the same start time.  To get around this we
> add
> > noise to the start time minute and seconds. Simply index % 60.  This
> > spreads out the load so that the scheduler isn’t trying to run everything
> > at the exact same moment.  I would suggest if you do go this route, to
> also
> > stagger your hours if you can because of how many you plan to run.
> Perhaps
> > your DAGs are smaller and aren’t as CPU intensive as ours.
> >
> > On 3/21/18, 1:35 PM, "Kyle Hamlin" <ha...@gmail.com> wrote:
> >
> >     Hello,
> >
> >     I'm currently using Airflow for some ETL tasks where I submit a spark
> > job
> >     to a cluster and poll till it is complete. This workflow is nice
> > because it
> >     is typically a single Dag. I'm now starting to do more machine
> learning
> >     tasks and need to build a model per client which is 1000+ clients. My
> >     spark cluster is capable of handling this workload, however, it
> doesn't
> >     seem scalable to write 1000+ dags to fit models for each client. I
> want
> >     each client to have its own task instance so it can be retried if it
> >     fails without having to run all 1000+ tasks over again. How do I
> handle
> >     this type of workflow in Airflow?
> >
> >
> >
>

Re: Submitting 1000+ tasks to airflow programatically

Posted by Taylor Edmiston <te...@gmail.com>.
We also use a similar approach to generate dynamic DAGs based on a common
template DAG file.  We pull in the list of config objects, one per DAG,
from an internal API lightly wrapping the database, then we cache that
response in an Airflow Variable that gets updated once a minute.  The
dynamic DAGs are generated from that variable.
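
A stripped-down sketch of that pattern (the Variable name, config fields, and
create_dag() factory are invented for illustration; the real template carries
the full task definitions):

import json
from datetime import datetime

from airflow import DAG
from airflow.models import Variable


def create_dag(conf):
    # Template DAG factory: one DAG per config object.
    dag = DAG(
        conf["dag_id"],
        schedule_interval=conf.get("schedule", "@daily"),
        start_date=datetime(2018, 3, 1),
    )
    # ... add the per-DAG tasks here ...
    return dag


# Cached copy of the API response, refreshed out of band about once a minute.
configs = json.loads(Variable.get("dag_configs", default_var="[]"))
for conf in configs:
    globals()[conf["dag_id"]] = create_dag(conf)

The point being that the DAG file itself only reads the cached Variable; the
slow API call happens somewhere else.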

*Taylor Edmiston*
TEdmiston.com <https://www.tedmiston.com/> | Blog
<http://blog.tedmiston.com>
Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn
<https://www.linkedin.com/in/tedmiston/> | AngelList
<https://angel.co/taylor>


On Wed, Mar 21, 2018 at 1:54 PM, Nicolas Kijak <
nicolas.kijak@weightwatchers.com> wrote:

> Kyle,
>
> We have a similar approach but on a much, much smaller scale. We now have
> <100 “things to process” but expect it to grow to under ~200.  Each “thing
> to process” has the same workflow so we have a single DAG definition that
> does about 20 tasks per, then we loop over the list of items and produce a
> dag object for each one adding it to the global definition.
>
> One of the things we quickly ran into was crushing the scheduler as
> everything was running with the same start time.  To get around this we add
> noise to the start time minute and seconds. Simply index % 60.  This
> spreads out the load so that the scheduler isn’t trying to run everything
> at the exact same moment.  I would suggest if you do go this route, to also
> stagger your hours if you can because of how many you plan to run.  Perhaps
> your DAGs are smaller and aren’t as CPU intensive as ours.
>
> On 3/21/18, 1:35 PM, "Kyle Hamlin" <ha...@gmail.com> wrote:
>
>     Hello,
>
>     I'm currently using Airflow for some ETL tasks where I submit a spark
> job
>     to a cluster and poll till it is complete. This workflow is nice
> because it
>     is typically a single Dag. I'm now starting to do more machine learning
>     tasks and need to build a model per client which is 1000+ clients. My
>     spark cluster is capable of handling this workload, however, it doesn't
>     seem scalable to write 1000+ dags to fit models for each client. I want
>     each client to have its own task instance so it can be retried if it
>     fails without having to run all 1000+ tasks over again. How do I handle
>     this type of workflow in Airflow?
>
>
>

Re: Submitting 1000+ tasks to airflow programatically

Posted by Nicolas Kijak <ni...@weightwatchers.com>.
Kyle,

We have a similar approach but on a much, much smaller scale. We now have <100 “things to process” but expect it to grow to under ~200.  Each “thing to process” has the same workflow so we have a single DAG definition that does about 20 tasks per, then we loop over the list of items and produce a dag object for each one adding it to the global definition.

One of the things we quickly ran into was crushing the scheduler as everything was running with the same start time.  To get around this we add noise to the start time minute and seconds. Simply index % 60.  This spreads out the load so that the scheduler isn’t trying to run everything at the exact same moment.  I would suggest if you do go this route, to also stagger your hours if you can because of how many you plan to run.  Perhaps your DAGs are smaller and aren’t as CPU intensive as ours.
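
A rough sketch of that staggering trick (the item list and DAG names below are
placeholders; the real DAGs carry the ~20 tasks each):

from datetime import datetime, timedelta

from airflow import DAG

items = ["item_%03d" % i for i in range(200)]  # the "things to process"

for index, item in enumerate(items):
    # Offset minutes/seconds by index % 60 (and hours too if the list is big)
    # so the scheduler isn't asked to start everything on the same second.
    start_date = datetime(2018, 3, 1) + timedelta(
        hours=index % 24, minutes=index % 60, seconds=index % 60)
    dag = DAG(
        "process_%s" % item,
        start_date=start_date,
        schedule_interval=timedelta(days=1),
    )
    globals()[dag.dag_id] = dag
    # ... attach this item's ~20 tasks to `dag` here ...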

On 3/21/18, 1:35 PM, "Kyle Hamlin" <ha...@gmail.com> wrote:

    Hello,
    
    I'm currently using Airflow for some ETL tasks where I submit a spark job
    to a cluster and poll till it is complete. This workflow is nice because it
    is typically a single Dag. I'm now starting to do more machine learning
    tasks and need to build a model per client which is 1000+ clients. My
    spark cluster is capable of handling this workload, however, it doesn't
    seem scalable to write 1000+ dags to fit models for each client. I want
    each client to have its own task instance so it can be retried if it
    fails without having to run all 1000+ tasks over again. How do I handle
    this type of workflow in Airflow?