You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/16 20:59:39 UTC
[GitHub] [airflow] quoc-t-le removed a comment on issue #15407: ShortCircuitOperator to short circuit downstream of TaskGroup only
quoc-t-le removed a comment on issue #15407:
URL: https://github.com/apache/airflow/issues/15407#issuecomment-821535724
Yeah, at the moment I can't continue to use the subdag because prev_execution_date is not correctly passed down to the subdag (or something along those lines) with Airflow 2: https://github.com/apache/airflow/issues/15396
I tried BranchPythonOperator, but it doesn't work as expected. In theory, this should branch to the skip task and then execute t2...
`from airflow import DAG
from datetime import datetime
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator, PythonOperator, BranchPythonOperator
from airflow.utils.task_group import TaskGroup
# Arguments handed to the DAG constructor below via ``default_args=``.
default_args = {
    'owner': 'airflow',
    'retries': 3,
    'depends_on_past': False,
}
def branch_op(*arg, **kwargs):
    """Return the task id that the branching task should follow.

    The decision is driven solely by the ``category`` keyword argument
    (supplied through ``op_kwargs`` by the operators that use this callable);
    positional arguments are accepted but ignored.

    Returns:
        ``'t1.skip'`` when ``category == 't1'``, ``'t2.skip'`` when
        ``category == 't2'``, and ``None`` for any other (or missing)
        category.
    """
    category = kwargs.get('category')
    if category == 't1':
        return 't1.skip'
    if category == 't2':
        return 't2.skip'
    # Unknown or missing category: no branch target is selected.
    return None
# Reproduction DAG: a start task followed by two task groups. Each group is
# headed by a BranchPythonOperator whose callable (branch_op) returns
# '<group>.skip', then a linear chain of dummy tasks ending in 'skip'.
with DAG(
    "short-circuit",
    catchup=True,
    default_args=default_args,
    schedule_interval='@daily',
    description='Aggregates and pulls down data for API endpoints that use analytics',
    start_date=datetime.strptime('04/14/2021', '%m/%d/%Y'),
    max_active_runs=1,
) as dag:
    t0 = DummyOperator(task_id='start')

    with TaskGroup('t1') as t1:
        s1 = BranchPythonOperator(
            task_id='short_circuit',
            python_callable=branch_op,
            provide_context=True,
            op_kwargs={"category": "t1"},
        )
        s2 = DummyOperator(task_id='t1s2')
        s3 = DummyOperator(task_id='t1s3')
        s4 = DummyOperator(task_id='t1s4')
        s5 = DummyOperator(task_id='t1s5')
        s6 = DummyOperator(task_id='t1s6')
        s7 = DummyOperator(task_id='t1s7')
        s8 = DummyOperator(task_id='t1s8')
        s9 = DummyOperator(task_id='t1s9')
        s10 = DummyOperator(task_id='skip')

        # Single linear chain: short_circuit -> t1s2 -> ... -> t1s9 -> skip.
        # NOTE(review): 'skip' (the id branch_op returns) is wired only behind
        # t1s9, not directly behind the branching task, and no trigger_rule is
        # set on it. Presumably this is why the branch does not behave as the
        # author expects -- confirm against the BranchPythonOperator and
        # trigger-rule documentation.
        s1 >> s2 >> s3 >> s4
        s4 >> s5
        s5 >> s6 >> s7
        s7 >> s8 >> s9 >> s10

    with TaskGroup('t2') as t2:
        # The s1..s10 names are rebound here; the tasks created for group
        # 't1' above are already wired and are not affected by the rebinding.
        s1 = BranchPythonOperator(
            task_id='short_circuit',
            python_callable=branch_op,
            provide_context=True,
            op_kwargs={"category": "t2"},
        )
        # NOTE(review): the task ids below reuse the 't1s*' names from the
        # first group. Presumably the TaskGroup prefixes them with 't2.' (as
        # branch_op's 't2.skip' return value suggests) -- confirm.
        s2 = DummyOperator(task_id='t1s2')
        s3 = DummyOperator(task_id='t1s3')
        s4 = DummyOperator(task_id='t1s4')
        s5 = DummyOperator(task_id='t1s5')
        s6 = DummyOperator(task_id='t1s6')
        s7 = DummyOperator(task_id='t1s7')
        s8 = DummyOperator(task_id='t1s8')
        s9 = DummyOperator(task_id='t1s9')
        s10 = DummyOperator(task_id='skip')

        # Same linear wiring as in group 't1'.
        s1 >> s2 >> s3 >> s4
        s4 >> s5
        s5 >> s6 >> s7
        s7 >> s8 >> s9 >> s10
t0 >> t1 >> t2`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org