Posted to commits@airflow.apache.org by "John Longo (JIRA)" <ji...@apache.org> on 2018/09/20 17:06:00 UTC
[jira] [Created] (AIRFLOW-3097) Capability for nested SubDags
John Longo created AIRFLOW-3097:
-----------------------------------
Summary: Capability for nested SubDags
Key: AIRFLOW-3097
URL: https://issues.apache.org/jira/browse/AIRFLOW-3097
Project: Apache Airflow
Issue Type: New Feature
Components: subdag
Affects Versions: 1.8.0
Reporter: John Longo
Unless I'm doing something incorrectly, it appears that SubDags cannot be nested, which would be a very helpful feature. I've created a simple pipeline to demonstrate the failure case below:
test_dag.py
{code:python}
import datetime
from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

from test_subdag1 import TestSubDag1

# Start date handed down to the subdags (a datetime, not a string)
startDate = datetime.datetime(2018, 9, 20)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['email@airflow.com'],
    'start_date': datetime.datetime(2018, 3, 20, 9, 0),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(seconds=30),
    'run_as_user': 'airflow'
}

Test_DAG = DAG('Test_DAG', default_args=default_args,
               start_date=datetime.datetime(2018, 3, 20, 9, 0),
               schedule_interval=None, catchup=False)

# First-level subdag; its dag_id is 'Test_DAG.test_subdag1'
test_subdag1 = SubDagOperator(subdag=TestSubDag1('Test_DAG', 'test_subdag1', startDate),
                              task_id='test_subdag1',
                              dag=Test_DAG)

TestDagConsolidateTask = DummyOperator(task_id='TestDag_Consolidate', dag=Test_DAG)

test_subdag1 >> TestDagConsolidateTask
{code}
test_subdag1.py
{code:python}
import datetime
from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

from test_subdag2 import TestSubDag2


def TestSubDag1(parent_dag_name, child_dag_name, startDate):
    # dag_id is '<parent_dag_id>.<task_id>', here 'Test_DAG.test_subdag1'
    subdag = DAG(
        '%s.%s' % (parent_dag_name, child_dag_name),
        schedule_interval=None,
        start_date=startDate)

    # Nested subdag whose parent is this subdag, so its dag_id becomes
    # 'Test_DAG.test_subdag1.test_subdag2'
    test_subdag2 = SubDagOperator(subdag=TestSubDag2('%s.%s' % (parent_dag_name, child_dag_name), 'test_subdag2', startDate),
                                  task_id='test_subdag2',
                                  dag=subdag)

    Subdag1ConsolidateTask = DummyOperator(task_id='Subdag1_Consolidate', dag=subdag)

    test_subdag2 >> Subdag1ConsolidateTask

    # The factory must return the DAG so that SubDagOperator receives it
    return subdag
{code}
test_subdag2.py
{code:python}
import datetime
from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator


def TestSubDag2(parent_dag_name, child_dag_name, startDate):
    # Innermost subdag: 'Test_DAG.test_subdag1.test_subdag2'
    subdag = DAG(
        '%s.%s' % (parent_dag_name, child_dag_name),
        schedule_interval=None,
        start_date=startDate)

    TestTask = DummyOperator(task_id='TestTask', dag=subdag)
    Subdag2ConsolidateTask = DummyOperator(task_id='Subdag2_Consolidate', dag=subdag)

    TestTask >> Subdag2ConsolidateTask

    # Return the DAG so the enclosing SubDagOperator receives it
    return subdag
{code}
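For reference, SubDagOperator expects a subdag's dag_id to be the parent DAG's dag_id, then a dot, then the operator's task_id, so each nesting level appends one segment. The following is a minimal plain-Python sketch (no Airflow import) of how the ids in the three files above compose. `subdag_id` and `check_subdag_id` are hypothetical helpers, and `ValueError` is only a stand-in for the exception the real operator raises.

{code:python}
# Hypothetical helpers, not part of Airflow: they mirror the
# '<parent_dag_id>.<task_id>' rule that SubDagOperator applies to subdag ids.


def subdag_id(parent_dag_id: str, task_id: str) -> str:
    """Return the dag_id a subdag needs under the given parent and task_id."""
    return "%s.%s" % (parent_dag_id, task_id)


def check_subdag_id(parent_dag_id: str, task_id: str, subdag_dag_id: str) -> None:
    """Raise ValueError (stand-in exception) if the id breaks the rule."""
    expected = subdag_id(parent_dag_id, task_id)
    if subdag_dag_id != expected:
        raise ValueError("subdag dag_id %r should be %r" % (subdag_dag_id, expected))


# Rebuild the ids produced by test_dag.py and test_subdag1.py
level0 = "Test_DAG"
level1 = subdag_id(level0, "test_subdag1")
level2 = subdag_id(level1, "test_subdag2")

# Both nesting levels satisfy the rule, so neither check raises
check_subdag_id(level0, "test_subdag1", level1)
check_subdag_id(level1, "test_subdag2", level2)

print(level1)  # Test_DAG.test_subdag1
print(level2)  # Test_DAG.test_subdag1.test_subdag2
{code}

A further level would follow the same pattern, with `level2` as its parent id.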
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)