You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/11/09 06:18:10 UTC

[GitHub] [airflow] VBhojawala commented on pull request #12159: @taskgroup decorator implementation [https://github.com/apache/airflow/issues/11870 ]

VBhojawala commented on pull request #12159:
URL: https://github.com/apache/airflow/pull/12159#issuecomment-723786153


   > Thanks for proposing this. I think it works for small examples. However I have some questions regarding how this should work for larger examples. For instance, if we wanna put two or more tasks in the same `TaskGroup`, or a `TaskGroup` nested in another `TaskGroup`, can that be done with with `@task_group`?
   > 
   > The reason I'm asking is that the contextmanager syntax is quite natural for creating nested structures (which is what `TaskGroup` is). The decorator syntax can probably be tweaked to do something close enough, but it's not as straightforward. If you have a good way of doing that, maybe demonstrate that in the example?
   
   Yes it does works because we have used the same context manager used TaskGroup, i am writing test cases for that also. Here is the example 
   
   
   ``` python
   from airflow.models.dag import DAG
   from airflow.operators.python import task
   from airflow.utils.dates import days_ago
   from airflow.utils.task_group import taskgroup
   
   
   # Creating Tasks
   from airflow.www.views import task_group_to_dict
   
   
   @task()
   def task_start():
       """Dummy Task which is First Task of Dag """
       return '[Task_start]'
   
   
   @task()
   def task_end1():
       """Dummy Task which is Last Task of Dag"""
       print(f'[ Task_End 1 ]')
   
   
   @task()
   def task_end2():
       """Dummy Task which is Last Task of Dag"""
       print(f'[ Task_End 2 ]')
   
   
   @task
   def task_1(value):
       """ Dummy Task1"""
       return f'[ Task1 {value} ]'
   
   
   @task
   def task_2(value):
       """ Dummy Task2"""
       print(f'[ Task2 {value} ]')
   
   
   @task
   def task_3(value):
       """ Dummy Task3"""
       return f'[ Task3 {value} ]'
   
   
   @task
   def task_4(value):
       """ Dummy Task3"""
       print(f'[ Task4 {value} ]')
   
   
   # Creating TaskGroups
   @taskgroup(group_id='section_1')
   def section_1(value):
       """ TaskGroup for grouping related Tasks"""
   
       @taskgroup(group_id='section_2')
       def section_2(value2):
           """Nested TaskGroup for grouping related Tasks"""
           return task_4(task_3(value2))
   
       op1 = task_2(task_1(value))
       return section_2(op1)
   
   
   # Executing Tasks and TaskGroups
   with DAG(dag_id="example_nested_task_group_decorator", start_date=days_ago(2), tags=["example"]) as dag:
       t1 = task_start()
       s1 = section_1(t1)
       s1.set_downstream(task_end1())
   
   
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org