Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/16 16:49:28 UTC

[GitHub] [airflow] quoc-t-le opened a new issue #15407: ShortCircuitOperator to short circuit downstream of TaskGroup only

quoc-t-le opened a new issue #15407:
URL: https://github.com/apache/airflow/issues/15407


   **Description**
   
   I am trying to convert a SubDag that uses the ShortCircuitOperator into a TaskGroup.  With the SubDag, when the short circuit tripped, the run moved on to the next SubDag.  I cannot get the same behavior with a TaskGroup.
   
   What do you want to happen?
   
   When a ShortCircuitOperator is used within a TaskGroup, I would expect it to short-circuit only the downstream tasks within that TaskGroup.
   
   Here is the code I am trying to run.  I would expect t1 to be short-circuited and t2 to execute:
   
   ```python
   from airflow import DAG
   from datetime import datetime
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.operators.python_operator import ShortCircuitOperator, PythonOperator
   from airflow.utils.task_group import TaskGroup
   
   default_args = {
       'owner': 'airflow',
       'retries': 3,
       'depends_on_past': False,
   }
   
   def short_circuit(*arg, **kwargs):
       return False
   
   with DAG("short-circuit",
            catchup=True,
            default_args=default_args,
            schedule_interval='@daily',
            description='Aggregates and pulls down data for API endpoints that use analytics',
            start_date=datetime.strptime('04/14/2021', '%m/%d/%Y'),
            max_active_runs=1
   ) as dag:
       t0 = DummyOperator(task_id='start')
       with TaskGroup('t1') as t1:
           s1 = ShortCircuitOperator(
               task_id='short_circuit',
               python_callable=short_circuit
           )
           s2 = DummyOperator(task_id='t1s2')
           s3 = DummyOperator(task_id='t1s3')
           s4 = DummyOperator(task_id='t1s4')
           s5 = DummyOperator(task_id='t1s5')
           s6 = DummyOperator(task_id='t1s6')
           s7 = DummyOperator(task_id='t1s7')
           s8 = DummyOperator(task_id='t1s8')
           s9 = DummyOperator(task_id='t1s9')
           s1 >> s2 >> s3 >> s4
           s4 >> s5
           s5 >> s6 >> s7
           s7 >> s8 >> s9
   
   
       with TaskGroup('t2') as t2:
           s1 = ShortCircuitOperator(
               task_id='short_circuit',
               python_callable=short_circuit,
               op_kwargs={"category": "mobile"}
           )
           s2 = DummyOperator(task_id='t1s2')
           s3 = DummyOperator(task_id='t1s3')
           s4 = DummyOperator(task_id='t1s4')
           s5 = DummyOperator(task_id='t1s5')
           s6 = DummyOperator(task_id='t1s6')
           s7 = DummyOperator(task_id='t1s7')
           s8 = DummyOperator(task_id='t1s8')
           s9 = DummyOperator(task_id='t1s9')
           s1 >> s2 >> s3 >> s4
           s4 >> s5
           s5 >> s6 >> s7
           s7 >> s8 >> s9
   
       t0 >> t1 >> t2
   ```
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] eladkal commented on issue #15407: ShortCircuitOperator to short circuit downstream of TaskGroup only

Posted by GitBox <gi...@apache.org>.
eladkal commented on issue #15407:
URL: https://github.com/apache/airflow/issues/15407#issuecomment-821334673


   `ShortCircuitOperator` cascades through everything by design; see https://github.com/apache/airflow/issues/7858 
   Since TaskGroup is more of a UI feature, this is expected.
   However, since you are using TaskGroup as a replacement for SubDag, I can see how this functionality is missing.
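
To make the cascade concrete, here is a toy model in plain Python (it is not Airflow code). It assumes that a tripped short circuit marks every task reachable downstream as skipped, no matter which TaskGroup the task sits in. The shortened `downstream` mapping and the function name `short_circuit_skips` are made up for illustration.

```python
from collections import deque

# Hypothetical, shortened edges mirroring the DAG in the issue:
# start >> t1 (short_circuit >> t1s2) >> t2 (short_circuit >> t1s2)
downstream = {
    "start": ["t1.short_circuit"],
    "t1.short_circuit": ["t1.t1s2"],
    "t1.t1s2": ["t2.short_circuit"],
    "t2.short_circuit": ["t2.t1s2"],
    "t2.t1s2": [],
}


def short_circuit_skips(task_id, edges):
    """Return every task reachable downstream of task_id (breadth-first walk)."""
    skipped = set()
    queue = deque(edges[task_id])
    while queue:
        current = queue.popleft()
        if current not in skipped:
            skipped.add(current)
            queue.extend(edges[current])
    return skipped


skipped = short_circuit_skips("t1.short_circuit", downstream)
print(sorted(skipped))  # ['t1.t1s2', 't2.short_circuit', 't2.t1s2']
```

In this model the group prefix plays no role in the walk, which matches the point that a TaskGroup does not act as a boundary for the skip.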





[GitHub] [airflow] quoc-t-le edited a comment on issue #15407: ShortCircuitOperator to short circuit downstream of TaskGroup only

Posted by GitBox <gi...@apache.org>.
quoc-t-le edited a comment on issue #15407:
URL: https://github.com/apache/airflow/issues/15407#issuecomment-821836053


   I was able to achieve the goal with BranchPythonOperator.  It needs an additional DummyOperator at the end of the TaskGroup with trigger_rule='all_done'; the BranchPythonOperator then either jumps to the next task or skips everything up to that DummyOperator.  A little hacky.





[GitHub] [airflow] quoc-t-le removed a comment on issue #15407: ShortCircuitOperator to short circuit downstream of TaskGroup only

Posted by GitBox <gi...@apache.org>.
quoc-t-le removed a comment on issue #15407:
URL: https://github.com/apache/airflow/issues/15407#issuecomment-821836053


   I was able to achieve what I wanted to do with BranchPythonOperator.  It needs an additional DummyOperator at the end of the TaskGroup with trigger_rule='all_done'; the BranchPythonOperator then either jumps to the next task or skips everything up to the DummyOperator.  A little hacky.





[GitHub] [airflow] quoc-t-le removed a comment on issue #15407: ShortCircuitOperator to short circuit downstream of TaskGroup only

Posted by GitBox <gi...@apache.org>.
quoc-t-le removed a comment on issue #15407:
URL: https://github.com/apache/airflow/issues/15407#issuecomment-821535724


   Yeah, at the moment I can't continue to use the SubDag because prev_execution_date is not passed down to the SubDag correctly (or something similar) in Airflow 2: https://github.com/apache/airflow/issues/15396
   
   I tried BranchPythonOperator, but it doesn't work as expected.  In theory, this should branch to the skip task and then execute t2...
   
   ```python
   from airflow import DAG
   from datetime import datetime
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.operators.python_operator import ShortCircuitOperator, PythonOperator, BranchPythonOperator
   from airflow.utils.task_group import TaskGroup
   
   default_args = {
       'owner': 'airflow',
       'retries': 3,
       'depends_on_past': False,
   }
   
   def branch_op(*arg, **kwargs):
       category = kwargs.get('category')
       if category == 't1':
           return 't1.skip'
   
       if category == 't2':
           return 't2.skip'
   
   with DAG("short-circuit",
            catchup=True,
            default_args=default_args,
            schedule_interval='@daily',
            description='Aggregates and pulls down data for API endpoints that use analytics',
            start_date=datetime.strptime('04/14/2021', '%m/%d/%Y'),
            max_active_runs=1
   ) as dag:
       t0 = DummyOperator(task_id='start')
       with TaskGroup('t1') as t1:
           s1 = BranchPythonOperator(
               task_id='short_circuit',
               python_callable=branch_op,
               provide_context=True,
               op_kwargs={"category": "t1"}
           )
           s2 = DummyOperator(task_id='t1s2')
           s3 = DummyOperator(task_id='t1s3')
           s4 = DummyOperator(task_id='t1s4')
           s5 = DummyOperator(task_id='t1s5')
           s6 = DummyOperator(task_id='t1s6')
           s7 = DummyOperator(task_id='t1s7')
           s8 = DummyOperator(task_id='t1s8')
           s9 = DummyOperator(task_id='t1s9')
           s10 = DummyOperator(task_id='skip')
           s1 >> s2 >> s3 >> s4
           s4 >> s5
           s5 >> s6 >> s7
           s7 >> s8 >> s9 >> s10
   
   
       with TaskGroup('t2') as t2:
           s1 = BranchPythonOperator(
               task_id='short_circuit',
               python_callable=branch_op,
               provide_context=True,
               op_kwargs={"category": "t2"}
           )
           s2 = DummyOperator(task_id='t1s2')
           s3 = DummyOperator(task_id='t1s3')
           s4 = DummyOperator(task_id='t1s4')
           s5 = DummyOperator(task_id='t1s5')
           s6 = DummyOperator(task_id='t1s6')
           s7 = DummyOperator(task_id='t1s7')
           s8 = DummyOperator(task_id='t1s8')
           s9 = DummyOperator(task_id='t1s9')
           s10 = DummyOperator(task_id='skip')
           s1 >> s2 >> s3 >> s4
           s4 >> s5
           s5 >> s6 >> s7
           s7 >> s8 >> s9 >> s10
       t0 >> t1 >> t2
   ```
   





[GitHub] [airflow] quoc-t-le commented on issue #15407: ShortCircuitOperator to short circuit downstream of TaskGroup only

Posted by GitBox <gi...@apache.org>.
quoc-t-le commented on issue #15407:
URL: https://github.com/apache/airflow/issues/15407#issuecomment-821838866


   BranchPythonOperator works fine for this; you just need to make sure the task it jumps to has trigger_rule='all_done'.
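
Why the trigger rule matters can be sketched with a toy evaluator in plain Python (again not Airflow code, and not how the scheduler is implemented). It assumes two simplified rules: an `all_success` task is skipped as soon as any upstream task is skipped, while an `all_done` task runs once its upstream tasks have finished in any state. Failures are left out, and the task names and the helpers `evaluate` and `build` are hypothetical.

```python
def evaluate(tasks, branch_skipped):
    """Compute task states.

    tasks: {task_id: (upstream_ids, trigger_rule)}, listed in topological order.
    branch_skipped: task ids that the branch decided not to follow.
    """
    states = {}
    for task_id, (upstream, rule) in tasks.items():
        upstream_skipped = any(states[u] == "skipped" for u in upstream)
        if task_id in branch_skipped:
            states[task_id] = "skipped"
        elif rule == "all_success" and upstream_skipped:
            # The skip propagates through tasks that use the default rule.
            states[task_id] = "skipped"
        else:
            # 'all_done' only needs upstream tasks to be finished, in any state.
            states[task_id] = "success"
    return states


def build(join_rule):
    """A shortened group t1 (branch, work, join) followed by group t2."""
    return {
        "t1.branch": ([], "all_success"),
        "t1.work": (["t1.branch"], "all_success"),
        "t1.join": (["t1.work"], join_rule),
        "t2.work": (["t1.join"], "all_success"),
    }


for rule in ("all_success", "all_done"):
    states = evaluate(build(rule), branch_skipped={"t1.work"})
    print(rule, states["t1.join"], states["t2.work"])
# all_success skipped skipped
# all_done success success
```

In the DAG from this thread, the counterpart of `t1.join` is the DummyOperator with task_id='skip' at the end of each TaskGroup, created with trigger_rule='all_done'.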





[GitHub] [airflow] quoc-t-le closed issue #15407: ShortCircuitOperator to short circuit downstream of TaskGroup only

Posted by GitBox <gi...@apache.org>.
quoc-t-le closed issue #15407:
URL: https://github.com/apache/airflow/issues/15407


   





[GitHub] [airflow] eladkal edited a comment on issue #15407: ShortCircuitOperator to short circuit downstream of TaskGroup only

Posted by GitBox <gi...@apache.org>.
eladkal edited a comment on issue #15407:
URL: https://github.com/apache/airflow/issues/15407#issuecomment-821334673


   `ShortCircuitOperator` cascades through everything by design; see https://github.com/apache/airflow/issues/7858 
   Since TaskGroup is more of a UI feature, this is expected.
   However, given your use case, I can see how this functionality is missing.





[GitHub] [airflow] eladkal edited a comment on issue #15407: ShortCircuitOperator to short circuit downstream of TaskGroup only

Posted by GitBox <gi...@apache.org>.
eladkal edited a comment on issue #15407:
URL: https://github.com/apache/airflow/issues/15407#issuecomment-821334673


   `ShortCircuitOperator` cascades through everything by design; see https://github.com/apache/airflow/issues/7858 
   Since TaskGroup is more of a UI feature, this is expected.
   However, given your use case, I can see how this functionality is missing.
   
   I think this one requires some discussion to define what the actual change (if any) should be to address the problem.





[GitHub] [airflow] quoc-t-le commented on issue #15407: ShortCircuitOperator to short circuit downstream of TaskGroup only

Posted by GitBox <gi...@apache.org>.
quoc-t-le commented on issue #15407:
URL: https://github.com/apache/airflow/issues/15407#issuecomment-821836053


   I was able to achieve the goal with BranchPythonOperator.  It needs an additional DummyOperator at the end of the TaskGroup with trigger_rule='all_done'; the BranchPythonOperator then either jumps to the next task or skips everything up to that DummyOperator.




