Posted to dev@airflow.apache.org by Dmitriy Krasnikov <dk...@hotmail.com> on 2016/12/15 21:43:22 UTC

Can I create a DAG from inside another DAG?

I created a job that lets me trigger other jobs (I needed something until the REST API comes along, maybe even later). It's an interesting hack: dynamic tasks created on DAG load, all of them TriggerDagRunOperators. In a nutshell, the file polls a database on DAG load and creates as many TriggerDagRun tasks as needed, one per entry (DAG name plus JSON data to push). It works, with some scheduling quirks (of course), but the issue I am having is that the DAG load and the scheduled run (when the trigger tasks actually execute) are not in sync. So I cannot mark entries as "in use" at load time, otherwise the load closest to the run will not find any entries. I tried another approach: generating a second DAG with these tasks during the scheduled execution of the first DAG, but that DAG never gets loaded. Why? I know the narrative is quite confusing, so here are the code files.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from turbine_plugin.meta.turbine_trigger import Trigger
from collections import defaultdict


def get_triggers_by_dag():
    """Read all pending trigger entries and group them by target DAG id."""
    trigger = Trigger()
    all_triggers = trigger.get_all_dags()
    triggers_by_dag = defaultdict(list)
    for t in all_triggers:
        triggers_by_dag[t.get('dag')].append(t)
    return triggers_by_dag


def set_trigger_back(context):
    """Failure callback: reset the entry's status so a later run can retry it."""
    trigger = Trigger()
    trigger.update(trigger_id=context['params']['id'], status=0)


def delete_trigger(context):
    """Success callback: remove the consumed entry from the database."""
    trigger = Trigger()
    trigger.delete(trigger_id=context['params']['id'])


def useless_callable(context, dag_run_obj):
    """This function is implemented because developers like shims, I guess."""
    # The payload set here is handed to the triggered run as its conf.
    dag_run_obj.payload = {'value': context['params']['value']}
    return dag_run_obj


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 12, 1),
    'retries': 0,
    'retry_delay': timedelta(seconds=15)
}

dag = DAG('trigger_dag',
          schedule_interval=timedelta(minutes=1),
          default_args=default_args)

# trying everything to make it work
start = DummyOperator(task_id='dummy', owner='airflow', dag=dag)

triggers = get_triggers_by_dag()
for dag_id, dag_triggers in triggers.items():
    # Chain each target DAG's triggers sequentially off the dummy start task.
    last_task = None
    for i, t in enumerate(dag_triggers):
        task = TriggerDagRunOperator(
            task_id='{dag}_{seq}_task'.format(dag=dag_id, seq=i),
            trigger_dag_id=dag_id,
            python_callable=useless_callable,
            params=t,
            dag=dag
        )
        task.on_failure_callback = set_trigger_back
        task.on_success_callback = delete_trigger
        task.set_upstream(last_task if last_task else start)
        last_task = task
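
For reference, the payload set in useless_callable reaches the triggered run as its conf: TriggerDagRunOperator creates the new DagRun with conf set to dag_run_obj.payload. A minimal sketch of the consuming side (the DAG id and task here are hypothetical stand-ins for one of the triggered DAGs):

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def consume_payload(ds, **kwargs):
    # The payload pushed by the trigger DAG arrives as this run's conf.
    value = kwargs['dag_run'].conf.get('value')
    print('received payload value: {value}'.format(value=value))


dag = DAG('some_target_dag',  # hypothetical; must match a 'dag' entry in the trigger table
          schedule_interval=None,  # run only when triggered externally
          default_args={'owner': 'airflow', 'start_date': datetime(2016, 12, 1)})

consume = PythonOperator(task_id='consume_payload',
                         python_callable=consume_payload,
                         provide_context=True,
                         dag=dag)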


And here is the second one:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.python_operator import PythonOperator
from turbine_plugin.meta.turbine_trigger import Trigger
from collections import defaultdict


def get_triggers_by_dag():
    trigger = Trigger()
    all_triggers = trigger.get_all_dags()
    triggers_by_dag = defaultdict(list)
    for t in all_triggers:
        triggers_by_dag[t.get('dag')].append(t)
    return triggers_by_dag


def set_trigger_back(context):
    trigger = Trigger()
    trigger.update(trigger_id=context['params']['id'], status=0)


def delete_trigger(context):
    trigger = Trigger()
    trigger.delete(trigger_id=context['params']['id'])


def useless_callable(context, dag_run_obj):
    """This function is implemented because developers like shims, I guess."""
    dag_run_obj.payload = {'value': context['params']['value']}
    return dag_run_obj


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 12, 15, 21, 0),
    'retries': 0,
    'retry_delay': timedelta(seconds=5)
}


def generate_new_dag(ds, **kwargs):
    triggers = get_triggers_by_dag()
    if triggers:
        # Note: this DAG object only exists inside the worker process running
        # this task; nothing writes it anywhere the scheduler parses.
        dag_new = DAG('semaphor_{time}'.format(time=datetime.now().strftime('%Y%m%d%H%M')),
                      schedule_interval='@once',
                      default_args=default_args)
        for dag_id, dag_triggers in triggers.items():
            last_task = None
            for i, t in enumerate(dag_triggers):
                task = TriggerDagRunOperator(
                    task_id='{dag}_{seq}_task'.format(dag=dag_id, seq=i),
                    trigger_dag_id=dag_id,
                    python_callable=useless_callable,
                    params=t,
                    dag=dag_new
                )
                task.on_failure_callback = set_trigger_back
                task.on_success_callback = delete_trigger
                if last_task:
                    task.set_upstream(last_task)
                last_task = task

dag = DAG('dag_generator',
          schedule_interval=timedelta(minutes=2),
          default_args=default_args)

start = PythonOperator(task_id='dag_generator',
                       python_callable=generate_new_dag,
                       owner='airflow',
                       provide_context=True,
                       params={},
                       dag=dag)
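
For context on why the second approach fails: the scheduler discovers DAGs by re-parsing the .py files in its DAGs folder and registering any DAG objects it finds at module level. A DAG constructed inside a running task, like dag_new above, exists only in that worker process and is never written anywhere the scheduler looks, so it never gets loaded. One way to keep the generator idea is to have the task emit an actual module into the DAGs folder, so the next scheduler pass picks it up. A rough sketch, assuming a writable DAGs folder (the template, path handling, and generated placeholder task are illustrative, not a drop-in replacement for the trigger logic above):

import os

DAGS_FOLDER = os.path.join(
    os.environ.get('AIRFLOW_HOME', os.path.expanduser('~/airflow')), 'dags')

DAG_TEMPLATE = """\
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('{dag_id}', schedule_interval='@once',
          default_args={{'owner': 'airflow',
                         'start_date': datetime(2016, 12, 1)}})

DummyOperator(task_id='placeholder', dag=dag)
"""


def write_generated_dag(dag_id):
    # Writing a module into the DAGs folder is what makes a new DAG visible:
    # the scheduler parses these files and registers the module-level DAG
    # objects it finds on its next pass.
    path = os.path.join(DAGS_FOLDER, '{name}.py'.format(name=dag_id))
    with open(path, 'w') as f:
        f.write(DAG_TEMPLATE.format(dag_id=dag_id))
    return path

The same module-level rule is why the common pattern of generating many DAGs from a single file works: a loop that builds DAG objects and assigns each one into globals() still leaves them at module level when the file is parsed.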