Posted to commits@airflow.apache.org by "t oo (Jira)" <ji...@apache.org> on 2020/02/25 23:16:00 UTC

[jira] [Created] (AIRFLOW-6920) AIRFLOW Feature Parity with LUIGI & CONTROLM

t oo created AIRFLOW-6920:
-----------------------------

             Summary: AIRFLOW Feature Parity with LUIGI & CONTROLM 
                 Key: AIRFLOW-6920
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6920
             Project: Apache Airflow
          Issue Type: Improvement
          Components: tests
    Affects Versions: 1.10.7
            Reporter: t oo


*LUIGI* vs *AIRFLOW*

 

200 sequential tasks (so no parallelism):

 

+LUIGI:+
mkdir -p test_output8
pip install luigi
# no need to start a web server, scheduler or metadata DB
# *8.3 secs* total time for all 200 tasks
time python3 -m luigi --module cloop --local-scheduler ManyMany

 

+AIRFLOW:+
# *1032 sec* total time for all 200 tasks: about 0.16 s per task, but a gap of about 5 s between tasks
# The intention was for the tasks in the DAG to be completely sequential, i.e. task 3 must wait for task 2, which must wait for task 1, etc., but chain() did not seem to work as intended, so I limited default_pool to 1 slot instead
airflow initdb
nohup airflow webserver -p 8080 &
nohup airflow scheduler &
airflow trigger_dag looper2
# look at the DAG run's start and end times
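
A quick back-of-the-envelope check of the figures above, using only the numbers quoted in this report (200 tasks, 1032 s for Airflow, 8.3 s for Luigi, roughly 0.16 s of run time per Airflow task). The variable names are just for illustration:

```python
# Figures quoted in the report above; nothing here is newly measured.
TASKS = 200
AIRFLOW_TOTAL_S = 1032.0
LUIGI_TOTAL_S = 8.3
AIRFLOW_RUN_S = 0.16  # approximate run time of a single Airflow task

airflow_per_task = AIRFLOW_TOTAL_S / TASKS       # wall-clock seconds per task
airflow_gap = airflow_per_task - AIRFLOW_RUN_S   # idle time between tasks
luigi_per_task = LUIGI_TOTAL_S / TASKS
slowdown = AIRFLOW_TOTAL_S / LUIGI_TOTAL_S

print(f"Airflow: {airflow_per_task:.2f} s per task, of which {airflow_gap:.2f} s is gap")
print(f"Luigi:   {luigi_per_task:.4f} s per task")
print(f"Airflow total is about {slowdown:.0f}x the Luigi total")
```

On these numbers, nearly all of the Airflow wall-clock time is the gap between tasks rather than task execution.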

 

cloop.py
{code:python}
import os
#import time

import luigi

# To run (from the directory containing cloop.py):
# python3 -m luigi --module cloop --local-scheduler ManyMany

class Sleep(luigi.Task):
    #resources = {'foo': 10}

    fname = luigi.Parameter()

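    def requires(self):
        # marker_N depends on marker_(N-1), so the Sleep tasks form a chain.
        index = int(self.fname.split('_')[1])
        if index > 1:
            return Sleep(fname='marker_{}'.format(index - 1))
        # marker_0 and marker_1 have no upstream task.
        return []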

    def full_path(self):
        return os.path.join(os.path.dirname(os.path.realpath(__file__)), 'test_output8', self.fname)

    def run(self):
        #time.sleep(1)
        with open(self.full_path(), 'w') as f:
            print('', file=f)

    def output(self):
        return luigi.LocalTarget(self.full_path())


class Many(luigi.WrapperTask):
    n = luigi.IntParameter()

    def requires(self):
        for i in range(self.n):
            yield Sleep(fname='marker_{}'.format(i))


class ManyMany(luigi.WrapperTask):
    n = luigi.IntParameter(default=200)

    def requires(self):
        for i in range(self.n):
            yield Many(n=self.n)
{code}
looper2.py
{code:python}
import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.helpers import chain

args = {
    'owner': 'airflow',
    'retries': 3,
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='looper2', default_args=args,
    schedule_interval=None)

chain([DummyOperator(task_id='op' + str(i), dag=dag) for i in range(1, 201)])

if __name__ == "__main__":
    dag.cli()
{code}
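
One possible reason chain() appeared to do nothing in looper2.py: as far as I can tell, the helper in airflow.utils.helpers links each positional argument to the next one, and the 1.10 documentation unpacks a list with `*` when calling it. looper2.py passes the whole list as a single argument, so there would be no adjacent pair to link. The sketch below is plain Python and does not import Airflow. `link_adjacent` is a hypothetical stand-in that only mimics that pairwise behaviour, so treat it as an illustration under that assumption rather than as Airflow's implementation:

```python
def link_adjacent(*tasks):
    """Hypothetical stand-in for chain(): pair each positional argument with the next."""
    return list(zip(tasks[:-1], tasks[1:]))


task_ids = ['op' + str(i) for i in range(1, 201)]

# A single list argument: only one positional argument, so there are no pairs.
edges_from_list = link_adjacent(task_ids)

# Unpacked list: 200 positional arguments, so 199 upstream/downstream pairs.
edges_unpacked = link_adjacent(*task_ids)

print(len(edges_from_list), len(edges_unpacked))  # prints: 0 199
```

If that assumption holds, calling chain(*[...]) in looper2.py should make the 200 tasks strictly sequential without relying on a 1-slot pool.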

I saw a similar test in https://github.com/apache/airflow/pull/5096, but it did not seem to run the tasks sequentially or to use the scheduler.



Possible test scenarios:
1. 1 DAG with 200 tasks running sequentially
2. 1 DAG with 200 tasks running all in parallel (200 slots)
3. 1 DAG with 200 tasks running all in parallel (48 slots)
4. 200 DAGs each with 1 task
Then repeat the above, changing 200 to 2000, 20, etc.
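
The scenario list above could be written down as a small test matrix. A minimal sketch in plain Python, assuming that "changing 200" also scales the slot count in scenario 2 while the 48 slots in scenario 3 stay fixed. The `Scenario` class and `build_scenarios` function are made up for this example and are not part of Airflow:

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Scenario:
    name: str
    dags: int
    tasks_per_dag: int
    sequential: bool
    slots: Optional[int]  # None where the list above does not name a slot count


def build_scenarios(n: int) -> List[Scenario]:
    """Return the four scenarios from the list above for a given size n."""
    return [
        Scenario('1 DAG, sequential', 1, n, True, None),
        Scenario('1 DAG, parallel, n slots', 1, n, False, n),
        Scenario('1 DAG, parallel, 48 slots', 1, n, False, 48),
        Scenario('n DAGs, 1 task each', n, 1, False, None),
    ]


# Sizes mentioned in the list above.
matrix = [s for n in (20, 200, 2000) for s in build_scenarios(n)]

for s in matrix:
    print(s.name, s.dags * s.tasks_per_dag, s.slots)
```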


Questions:
1. Are there any plans for an 'in-memory' scheduler like Luigi's?
2. Is anyone open to a Luigi Operator?
3. Are there any speedups planned to make the existing scheduler faster? Note that the tasks here are sequential (so the run should take about as long as 200 DAGs of 1 task each).


ControlM comparison:
Is it envisioned that Airflow becomes a replacement for https://www.bmcsoftware.uk/it-solutions/control-m.html ?
execution_date seems similar to Order Date, a DAG seems similar to a job, and tasks in a DAG seem similar to commands called by a job, but some of the items I see missing are:
  1. integration of public holiday calendars
  2. ability to specify a schedule like 11am on the '2nd weekday of the month', the 'last 5 days of the month' or the 'last business day of the month'
  3. ability to visualise dependencies between DAGs (there does not seem to be a high-level way to say: at 11am schedule DAGc after DAGa and DAGb, then at 3pm schedule DAGd after DAGc only if DAGc was successful)
  4. ability to select one or many DAGs in a UI and change their state to killed/success (force ok), etc., and have it instantly affect task instances (i.e. stop them)
  5. ability to set whole DAGs to 'dummy' on certain days of the week, e.g. DAGb (which runs 7 days a week and does real work) must run after DAGa for each execution date; DAGa should do real work Mon-Fri, but on Sat/Sun it should do nothing, i.e. the entire DAG is a 'dummy' that only satisfies the 'IN condition' of DAGb
  6. ability to change the number of tasks within a DAG for a different execution date without breaking the scheduler/metadata DB
  7. ability to 'order up' any day in the past/future (for all or some DAGs) and keep it on 'hold', visualise which DAGs 'would' be scheduled, see DAG dependencies, and choose to run all or some of the DAGs (or do nothing and delete them) while maintaining dependencies between them and optionally 'forcing ok' some to skip dependencies
  8. ability to feed conf (i.e. arguments) into a DAG from a UI, or to change the host the DAG runs on
  9. ability to rerun an entire execution date while keeping an audit trail in the DB of the timings of the 1st run of that execution date, and to allow a different conf on the 2nd run
  10. faster execution:
    a) it seems that if I want 15 different DAG ids of 300 tasks each, all running exactly the same tasks (just with different conf arguments), the dagbag has to parse 4500 tasks instead of recognising a single set of 300 that differ only by conf
    b) a 'push' flow of tasks within a DAG, rather than gaps between tasks
    c) a scheduler that does not get overloaded by 100k tasks
  11. dagrun timeout (without maxruns constraint)
  12. enforcing a dependency on the prior execution date of a DAG whose schedule may be only weekly or only on certain days of the week
  13. multiple pools (i.e. quantitative resources) on a single DAG
  14. ability to edit schedules via the UI
  15. audit trail of changes to a DAG (not tasks, but things like schedule and run-as user)
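
Items 1 and 2 in the list above can be illustrated outside Airflow with the standard library. A minimal sketch, assuming '2nd weekday' means the second Mon-Fri working day and that holidays are supplied as a plain set of dates. The function names and the `holidays` parameter are made up for this example and are not Airflow or ControlM APIs:

```python
import calendar
from datetime import date
from typing import FrozenSet, List


def business_days(year: int, month: int,
                  holidays: FrozenSet[date] = frozenset()) -> List[date]:
    """Mon-Fri dates of the month, minus any supplied holidays."""
    last_day = calendar.monthrange(year, month)[1]
    days = (date(year, month, d) for d in range(1, last_day + 1))
    return [d for d in days if d.weekday() < 5 and d not in holidays]


def nth_business_day(year: int, month: int, n: int,
                     holidays: FrozenSet[date] = frozenset()) -> date:
    """n is 1-based; a negative n counts from the end of the month."""
    if n == 0:
        raise ValueError("n must be non-zero")
    days = business_days(year, month, holidays)
    return days[n - 1] if n > 0 else days[n]


def last_n_days(year: int, month: int, n: int) -> List[date]:
    """The last n calendar days of the month."""
    last_day = calendar.monthrange(year, month)[1]
    return [date(year, month, d) for d in range(last_day - n + 1, last_day + 1)]


# February 2020, the month this issue was filed.
print(nth_business_day(2020, 2, 2))    # 2nd weekday of the month
print(nth_business_day(2020, 2, -1))   # last business day of the month
print(last_n_days(2020, 2, 5))         # last 5 days of the month
```

A scheduler would still have to evaluate such rules for each execution date; the sketch only shows the date arithmetic.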

At the moment:
ControlM = enterprise features, stability and speed, but no Python definitions of tasks
Luigi = speed and Python definitions of tasks, but no scheduling
Airflow = community momentum and Python definitions of tasks, but not fast and lacking some ControlM features








--
This message was sent by Atlassian Jira
(v8.3.4#803005)