Posted to commits@airflow.apache.org by "Ash Berlin-Taylor (Jira)" <ji...@apache.org> on 2020/12/12 14:06:00 UTC

[jira] [Commented] (AIRFLOW-6920) AIRFLOW Feature Parity with LUIGI & CONTROLM

    [ https://issues.apache.org/jira/browse/AIRFLOW-6920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17248358#comment-17248358 ] 

Ash Berlin-Taylor commented on AIRFLOW-6920:
--------------------------------------------

What executor are you using?

I suggest trying _at least_ the LocalExecutor. The default is the SequentialExecutor, which is _SLOW_.
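
For example, assuming a metadata DB that supports it (LocalExecutor needs e.g. Postgres or MySQL rather than SQLite), set executor = LocalExecutor under [core] in airflow.cfg, or export it before starting the scheduler:

export AIRFLOW__CORE__EXECUTOR=LocalExecutor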

> AIRFLOW Feature Parity with LUIGI & CONTROLM 
> ---------------------------------------------
>
>                 Key: AIRFLOW-6920
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6920
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: tests
>    Affects Versions: 1.10.7
>            Reporter: t oo
>            Priority: Major
>
> *LUIGI* vs *AIRFLOW*
>  
> 200 sequential tasks (so no parallelism):
>  
> +LUIGI:+
>  mkdir -p test_output8
>  pip install luigi
>  #no need to start a web server, scheduler, or meta DB
>  #*8.3 sec* total time for all 200
>  time python3 -m luigi --module cloop --local-scheduler ManyMany
>  
> +AIRFLOW v2.0.0rc1:+
>  #*513 sec* total time for all 200: 0.1-0.2 s per task, but a 1-2 s gap between tasks
>  #the intention was for the tasks in the DAG to run strictly sequentially, i.e. task 3 must wait for task 2, which must wait for task 1, etc.; chain() did not appear to work as intended (see the note after looper2.py below), so default_pool=1 was used instead
>  airflow db init
>  nohup airflow webserver -p 8080 &
>  nohup airflow scheduler &
>  airflow dags trigger looper3
>  #look at dagrun start-endtime
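>  #the run timings can be listed with the v2 CLI, e.g.:
>  airflow dags list-runs -d looper3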
>  
> +AIRFLOW v1.10.7:+
>  #*1032 sec* total time for all 200: 0.16 s per task, but a 5 s gap between tasks
>  #same intention as above (strictly sequential tasks); chain() did not appear to work as intended, so default_pool=1 was used instead
>  airflow initdb
>  nohup airflow webserver -p 8080 &
>  nohup airflow scheduler &
>  airflow trigger_dag looper2
>  #look at dagrun start-endtime
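>  #the run timings can be listed with the 1.10 CLI, e.g.:
>  airflow list_dag_runs looper2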
>  
> cloop.py
> {code:python}
> import os
> 
> import luigi
> 
> # To run (as timed above):
> #   mkdir -p test_output8
> #   python3 -m luigi --module cloop --local-scheduler ManyMany
> 
> class Sleep(luigi.Task):
>     fname = luigi.Parameter()
> 
>     def requires(self):
>         # marker_N depends on marker_{N-1}, which forces a sequential chain
>         ii = int(self.fname.split('_')[1])
>         if ii > 1:
>             return Sleep(fname='marker_{}'.format(ii - 1))
>         else:
>             return []  # first marker has no upstream dependency
> 
>     def full_path(self):
>         return os.path.join(os.path.dirname(os.path.realpath(__file__)),
>                             'test_output8', self.fname)
> 
>     def run(self):
>         # touch the marker file that output() points at
>         with open(self.full_path(), 'w') as f:
>             print('', file=f)
> 
>     def output(self):
>         return luigi.LocalTarget(self.full_path())
> 
> 
> class Many(luigi.WrapperTask):
>     n = luigi.IntParameter()
> 
>     def requires(self):
>         for i in range(self.n):
>             yield Sleep(fname='marker_{}'.format(i))
> 
> 
> class ManyMany(luigi.WrapperTask):
>     n = luigi.IntParameter(default=200)
> 
>     def requires(self):
>         for i in range(self.n):
>             yield Many(n=self.n)
> {code}
> looper3.py
> {code:python}
> import airflow
> from airflow.models import DAG
> from airflow.operators.python_operator import PythonOperator
> from airflow.utils.helpers import chain
> 
> args = {
>     'owner': 'airflow',
>     'retries': 300,
>     'start_date': airflow.utils.dates.days_ago(2)
> }
> 
> dag = DAG(
>     dag_id='looper3', default_args=args,
>     schedule_interval=None)
> 
> def print_context(ds, **kwargs):
>     print(1)
> 
> # chain() takes tasks as positional arguments, so unpack the list with *
> chain(*[PythonOperator(python_callable=print_context, task_id='op' + str(i),
>                        dag=dag, pool='default_pool')
>         for i in range(1, 201)])
> 
> if __name__ == "__main__":
>     dag.cli()
> {code}
> looper2.py
> {code:python}
> import airflow
> from airflow.models import DAG
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.utils.helpers import chain
> 
> args = {
>     'owner': 'airflow',
>     'retries': 3,
>     'start_date': airflow.utils.dates.days_ago(2)
> }
> 
> dag = DAG(
>     dag_id='looper2', default_args=args,
>     schedule_interval=None)
> 
> # chain() takes tasks as positional arguments, so unpack the list with *
> chain(*[DummyOperator(task_id='op' + str(i), dag=dag) for i in range(1, 201)])
> 
> if __name__ == "__main__":
>     dag.cli()
> {code}
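> A note on chain(): airflow.utils.helpers.chain takes the tasks as separate positional arguments, so passing a single list creates no dependencies at all; hence the * unpacking in the scripts above, and likely why chain() originally appeared not to work without default_pool=1. A minimal runnable sketch (the dag_id is hypothetical):
> {code:python}
> import airflow
> from airflow.models import DAG
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.utils.helpers import chain
> 
> dag = DAG(dag_id='chain_demo',
>           default_args={'owner': 'airflow',
>                         'start_date': airflow.utils.dates.days_ago(2)},
>           schedule_interval=None)
> 
> tasks = [DummyOperator(task_id='op' + str(i), dag=dag) for i in range(1, 4)]
> 
> chain(tasks)   # one positional argument (the list): no adjacent pairs, nothing wired up
> chain(*tasks)  # unpacked: sets op1 >> op2 >> op3 as intended
> {code}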
> I saw a similar test in
>  [https://github.com/apache/airflow/pull/5096], but it did not appear to be sequential or to use the scheduler.
> Possible test scenarios:
>  1. 1 DAG with 200 tasks running sequentially
>  2. 1 DAG with 200 tasks running all in parallel (200 slots)
>  3. 1 DAG with 200 tasks running all in parallel (48 slots)
>  4. 200 DAGs each with 1 task
>  Then repeat the above, changing 200 to 2000, 20, etc.
> Qs: 
>  1. Any plans for an 'in-memory' scheduler like Luigi's?
>  2. Anyone open to a Luigi Operator?
>  3. Any speedups to make the existing scheduler faster, noting that the tasks here are sequential?
> ControlM comparison:
>  Is it envisioned that Airflow becomes a replacement for [https://www.bmcsoftware.uk/it-solutions/control-m.html] ?
>  execution_date seems similar to Order Date, a DAG seems similar to a job, and tasks in a DAG seem similar to commands called by a job, but some items appear to be missing:
>  1. integration with public holiday calendars
>  2. ability to specify a schedule like 11am on the '2nd weekday of the month', the 'last 5 days of the month', or the 'last business day of the month' (see the sketch after this list)
>  3. ability to visualise dependencies between DAGs (there does not seem to be a high-level way to say: at 11am schedule DAGc after DAGa and DAGb; then at 3pm schedule DAGd after DAGc, but only if DAGc was successful)
>  4. ability to select one or more DAGs in the UI, change their state to killed/success (force OK), etc., and have it instantly affect task instances (i.e. stopping them)
>  5. ability to set whole DAGs to 'dummy' on certain days of the week, e.g. DAGb (runs 7 days a week) must run after DAGa for each exec date, but DAGa should only do real work Mon-Fri; on Sat/Sun the entire DAG is a 'dummy' that exists just to satisfy the 'IN condition' of DAGb
>  6. ability to change the number of tasks within a DAG for a different exec date without breaking the scheduler/meta DB
>  7. ability to 'order up' any day in the past/future (for all or some DAGs), keep it on 'hold', visualise which DAGs 'would' be scheduled, see DAG dependencies, and then choose to run all or some of the DAGs (or just delete them) while maintaining the dependencies between them, optionally 'forcing OK' some to skip dependencies
>  8. ability to feed conf (i.e. arguments) to a DAG from the UI, or to change the host the DAG runs on
>  9. ability to rerun an entire 'exec date' while keeping an audit trail in the DB of the timings of the first run of that exec date, plus allow a different conf on the second run
>  10. faster execution:
>  a) if I want 15 different DAG ids of 300 tasks each that all run exactly the same tasks (just with different conf arguments), the DagBag has to parse 4500 tasks instead of recognising a single set of 300 that differ only by conf
>  b) a 'push' flow of tasks within a DAG, rather than gaps between tasks
>  c) a scheduler that does not get overloaded with 100k tasks
>  11. DAG-run timeout (without a max-runs constraint)
>  12. enforcing 'depends on prior exec date' for a DAG whose schedule may be only weekly or only certain days of the week
>  13. multiple pools (i.e. quantitative resources) within a single DAG
>  14. ability to edit schedules via the UI
>  15. audit trail of changes to a DAG (not to tasks, but to things like the schedule or the run-as user)
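> As a workaround sketch for items 2 and 5 on current Airflow (the DAG id, schedule, and task names here are hypothetical), the DAG can be scheduled daily with a guard task in front that short-circuits the run on non-matching days, so everything downstream is skipped rather than failed:
> {code:python}
> from datetime import datetime
> 
> import airflow
> from airflow.models import DAG
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.operators.python_operator import ShortCircuitOperator
> 
> dag = DAG(
>     dag_id='weekday_guard_example',
>     default_args={'owner': 'airflow',
>                   'start_date': airflow.utils.dates.days_ago(2)},
>     schedule_interval='0 11 * * *')  # every day at 11am; the guard decides whether to proceed
> 
> def is_weekday(ds, **kwargs):
>     # ds is the execution date as 'YYYY-MM-DD'; weekday() < 5 means Mon-Fri
>     return datetime.strptime(ds, '%Y-%m-%d').weekday() < 5
> 
> guard = ShortCircuitOperator(task_id='weekday_only',
>                              python_callable=is_weekday,
>                              provide_context=True,  # needed on 1.10.x to receive ds
>                              dag=dag)
> 
> work = DummyOperator(task_id='do_stuff', dag=dag)
> guard >> work  # on Sat/Sun, 'do_stuff' is skipped rather than failed
> {code}
> The same pattern can encode 'last business day of the month' or holiday-calendar checks inside the callable, though it is a workaround rather than first-class schedule support.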
> At the moment:
>  ControlM = enterprise features, stability, and speed, but no Python definitions of tasks
>  Luigi = speed and Python definitions of tasks, but no scheduling
>  Airflow = community momentum and Python definitions of tasks, but not fast, and lacking some ControlM features


