Posted to commits@airflow.apache.org by "pmercatoris (via GitHub)" <gi...@apache.org> on 2023/02/02 17:34:29 UTC

[GitHub] [airflow] pmercatoris opened a new issue, #29339: Failing to execute successive Dynamic Task Group Mapping

pmercatoris opened a new issue, #29339:
URL: https://github.com/apache/airflow/issues/29339

   ### Apache Airflow version
   
   2.5.1
   
   ### What happened
   
   I am trying to map a task group inside another task group that is already mapped. When I launch the DAG, all tasks of the containing task group finish successfully. 
   However, the following problems occur:  
   
   - The contained task group is not mapped and ends up in the `upstream_failed` state. 
   ![image](https://user-images.githubusercontent.com/8655813/216397932-b07c623b-f3ca-45c5-8a52-6bd7911fe538.png)
   ![image](https://user-images.githubusercontent.com/8655813/216398586-6f8ebb2b-6b99-4df5-898e-1356c66f73be.png)
   
   - The graph UI doesn't show after launching the dag as in https://github.com/apache/airflow/issues/29287 
   ![image](https://user-images.githubusercontent.com/8655813/216398113-767abd6a-07e6-499b-8f59-4316f9edd695.png)
   
   
   ### What you think should happen instead
   
   I would expect the `print_group` task group and the `print` task to start as soon as one of the `load` tasks finishes.
   
   ### How to reproduce
   
   I am currently using the docker-compose setup for version 2.5.1:
   `FROM apache/airflow:2.5.1-python3.10`
   
   ```python
   import pendulum
   from airflow.decorators import dag, task, task_group
   
   @task
   def get_symbols():
       res = [('A', 1, 111), ('B', 2, 222)]
       return res
   
   
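   # The function is named print_symbol so that it does not shadow the built-in
   # print(); task_id keeps the original task name "print".
   @task(task_id="print")
   def print_symbol(symbol_info, data_interval_end=None):
       # Do some work...
       print(symbol_info)
       return symbol_info
   
   
   @task_group()
   def print_group(symbol):
       return print_symbol(symbol_info=symbol)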
   
   
   @task
   def extract(symbol_info, data_interval_end=None):
       # Do some work...
       return symbol_info
   
   
   @task
   def transform(symbol_info, data_interval_end=None):
       # Do some work...
       return symbol_info
   
   @task
   def load(symbol_info, data_interval_end=None):
       # Do some work...
       return 2*[symbol_info]
   
   @task_group
   def etl_tg(symbol):
       raw_symbols_data = extract(symbol_info=symbol)
       clean_symbols_data = transform(symbol_info=raw_symbols_data)
       loaded_symbols = load(symbol_info=clean_symbols_data)
       return print_group.expand(symbol=loaded_symbols)
   
   @dag(
       dag_id="task_group_mapping_2",
       tags=["sandbox"],
       schedule=None,
       start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
       catchup=False,
       max_active_runs=1,
   )
   def etl_dag():
   
       # DAG
       symbols = get_symbols()
       etl_tg.expand(symbol=symbols)
   
   etl_dag() 
   ```
   
   ### Operating System
   
   Ubuntu 20.04.5 LTS
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] hussein-awala commented on issue #29339: Failing to execute successive Dynamic Task Group Mapping

Posted by "hussein-awala (via GitHub)" <gi...@apache.org>.
hussein-awala commented on issue #29339:
URL: https://github.com/apache/airflow/issues/29339#issuecomment-1414704413

   Operator and task group expansion inside an expanded task group is not supported yet. We raise a [NotImplementedError](https://github.com/apache/airflow/blob/main/airflow/models/mappedoperator.py#L309-L309) exception for operators, but I am not sure whether it is raised in your case.
   
   To implement mapped tasks, we added a new column, `map_index`, which represents the index of each mapped task instance and is set to `-1` for normal tasks. Nested expansion cannot be supported with the same logic, because we would need a new column for each level, and the number of possible levels is unbounded (nested expansion within nested expansion, and so on), so I don't think we will support it soon.
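
The limitation described above can be illustrated with a small, Airflow-free sketch. It is not Airflow's actual data model: `SimpleKey`, the run id `manual_run`, and the task id strings are hypothetical stand-ins chosen for illustration. The sketch only assumes, as the comment states, that a task instance carries a single `map_index` that is `-1` for normal tasks. With one index slot, a second level of expansion restarts its numbering for every outer instance, so distinct task instances end up with the same key.

```python
from typing import NamedTuple


class SimpleKey(NamedTuple):
    """Hypothetical, simplified task-instance key with one map_index slot."""
    dag_id: str
    task_id: str
    run_id: str
    map_index: int = -1  # -1 marks a normal (unmapped) task


DAG_ID = "task_group_mapping_2"
RUN_ID = "manual_run"  # assumed run id, for illustration only

symbols = [("A", 1, 111), ("B", 2, 222)]

# One level of expansion: each symbol gets its own map_index, so keys are unique.
load_keys = [SimpleKey(DAG_ID, "etl_tg.load", RUN_ID, i) for i in range(len(symbols))]

# Second level: every load instance returns 2 items, and the inner expansion
# numbers them from 0 again because there is only one index slot.
print_keys = []
for load_key in load_keys:
    loaded = 2 * [symbols[load_key.map_index]]
    for inner_index in range(len(loaded)):
        print_keys.append(
            SimpleKey(DAG_ID, "etl_tg.print_group.print", RUN_ID, inner_index)
        )

distinct_load = len(set(load_keys))
distinct_print = len(set(print_keys))
print(f"load: {len(load_keys)} instances, {distinct_load} distinct keys")
print(f"print: {len(print_keys)} instances, {distinct_print} distinct keys")
```

In this toy model the inner level produces four instances but only two distinct keys, which is why each additional nesting level would need its own index.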




[GitHub] [airflow] potiuk closed issue #29339: Failing to execute successive Dynamic Task Group Mapping

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk closed issue #29339: Failing to execute successive Dynamic Task Group Mapping
URL: https://github.com/apache/airflow/issues/29339

