Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/09/23 07:43:22 UTC

[GitHub] [airflow] EssKayz opened a new issue, #26619: Dynamic Task Mapping - Assigning multiple parameters to a PythonOperator

EssKayz opened a new issue, #26619:
URL: https://github.com/apache/airflow/issues/26619

   ### Description
   
   After the new implementation of "Assigning multiple parameters to a non-TaskFlow operator" (`expand_kwargs`), we tried using it for our PythonOperators but couldn't get it to work. This is seemingly because a PythonOperator passes arguments to its callable via `op_kwargs`, so anything defined in `expand_kwargs` doesn't get routed to the python_callable.
   
   ### Use case/motivation
   
   We use Airflow quite heavily with PythonOperators, where data flows between tasks through XCom, so it would be really nice to be able to pass dictionaries from XCom into op_kwargs using dynamic task mapping. Currently we are abusing the PythonOperator's `templates_dict` for this purpose, by doing:
   ```
   PythonOperator.partial(...).expand(
               templates_dict=XComArg(task)
   )
   ```
   
   This way we can get a dictionary from XCom into the python_callable and then parse it into keyword arguments after the fact.
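   
   For context, a rough sketch of the callable side of that workaround (the `do_the_actual_work` helper here is hypothetical):
   ```
   def process_split_data(templates_dict=None, **kwargs):
       # The mapped dictionary from XCom arrives via templates_dict;
       # unpack it into keyword arguments for the real processing function.
       params = templates_dict or {}
       return do_the_actual_work(**params)
   ```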
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] EssKayz commented on issue #26619: 2.4.0 : Dynamic Task Mapping - Assigning multiple parameters to a PythonOperator

Posted by GitBox <gi...@apache.org>.
EssKayz commented on issue #26619:
URL: https://github.com/apache/airflow/issues/26619#issuecomment-1256048784

   This should be a working example, so essentially:
   ```
   from datetime import timedelta
   
   from airflow import DAG
   from airflow.models.xcom_arg import XComArg
   from airflow.operators.python import PythonOperator
   from airflow.utils.task_group import TaskGroup
   
   dag_args = {}  # placeholder: the original default_args are omitted here
   
   dag = DAG('mockup_example',
             # With this option, even when the start date is "one day ago", we instruct the Airflow
             # scheduler to only create a DAG run for the most recent instance of the DAG interval series.
             catchup=False,
             default_args=dag_args,
             default_view='graph',
             schedule_interval=None, dagrun_timeout=timedelta(minutes=30))
   
   
   def gen_multiprocess_example(
       dag: DAG,
       gen_task_id,
       **kwargs
   ) -> TaskGroup:
   
       with TaskGroup(dag=dag, group_id=f'mockup_{gen_task_id}', prefix_group_id=False) as grp:
           task = PythonOperator(
               task_id=f'split_{gen_task_id}_data',
               python_callable=split_example,
               op_kwargs={
   
               }
           )
   
           # This works, by utilizing a dirty hack
           PythonOperator.partial(
               task_id=f'process_{gen_task_id}_data',
               python_callable=process_split_data,
               op_kwargs={
   
               }
           ).expand(
               templates_dict=XComArg(task)
           )
   
           # This doesn't work, since the kwargs get expanded to PythonOperator, not to the callable.
           # PythonOperator.partial(
           #     task_id=f'process_{gen_task_id}_data',
           #     python_callable=process_split_data,
           #     op_kwargs={
   
           #     }
           # ).expand_kwargs(
           #     XComArg(task)
           # ) 
   
       return grp
   
   
   def split_example(**kwargs):
       return [{'data': '1234', 'bucket_name': 'prod'}, {'data': '4321', 'bucket_name': 'test'}]
   
   
   def process_split_data(data=None, bucket_name=None, templates_dict={}, **kwargs):
       print('-' * 20)
       print('data in process_func:', data)
       print('bucket_name in process_func:', bucket_name)
       print('templates_dict in process_func:', templates_dict)
       print('-' * 20)
       print(get_data(**templates_dict, **kwargs))
   
   
   def get_data(data=None, bucket_name=None, **kwargs):
       print('-' * 20)
       print('data in get_data:', data)
       print('bucket_name in get_data:', bucket_name)
       print('-' * 20)
       return data
   
   
   with dag:
       tg1 = gen_multiprocess_example(
           dag,
           gen_task_id='hope_this_helps'
       )
   ```




[GitHub] [airflow] uranusjr commented on issue #26619: 2.4.0 : Dynamic Task Mapping - Assigning multiple parameters to a PythonOperator

Posted by GitBox <gi...@apache.org>.
uranusjr commented on issue #26619:
URL: https://github.com/apache/airflow/issues/26619#issuecomment-1255894900

   Can you provide a concrete example? It sounds to me that taskflow would be perfect for your use case instead.




[GitHub] [airflow] boring-cyborg[bot] commented on issue #26619: Dynamic Task Mapping - Assigning multiple parameters to a PythonOperator

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #26619:
URL: https://github.com/apache/airflow/issues/26619#issuecomment-1255892811

   Thanks for opening your first issue here! Be sure to follow the issue template!
   




[GitHub] [airflow] EssKayz commented on issue #26619: 2.4.0 : Dynamic Task Mapping - Assigning multiple parameters to a PythonOperator

Posted by GitBox <gi...@apache.org>.
EssKayz commented on issue #26619:
URL: https://github.com/apache/airflow/issues/26619#issuecomment-1256007989

   So essentially what we would _want_ to do is have the `process_split_data` callable get its kwargs from the XCom return of the upstream task; that is what we have currently jerry-rigged using templates_dict.
   When we tried using expand_kwargs, the kwargs were simply passed to the PythonOperator itself as constructor kwargs, and we didn't find a way to pass them on to op_kwargs.




[GitHub] [airflow] EssKayz commented on issue #26619: 2.4.0 : Dynamic Task Mapping - Assigning multiple parameters to a PythonOperator

Posted by GitBox <gi...@apache.org>.
EssKayz commented on issue #26619:
URL: https://github.com/apache/airflow/issues/26619#issuecomment-1256004943

   Sadly I am not allowed to post our actual code due to company policy, but as an example of how we deal with this, we have a function that does roughly the following:
   ```
   def gen_multiprocessed_task(gen_task_id, processing_func, ...):
       task = PythonOperator(
           task_id=f'split_{gen_task_id}_data',
           python_callable=split_data,
           op_kwargs={ .... },
       )
   
       PythonOperator.partial(
           task_id=f'....{gen_task_id}...',
           python_callable=process_split_data,
           op_kwargs={'processing_func': processing_func, 'aws_config': aws_config, ...},
       ).expand(
           templates_dict=XComArg(task)
       )
   ```



[GitHub] [airflow] uranusjr commented on issue #26619: 2.4.0 : Dynamic Task Mapping - Assigning multiple parameters to a PythonOperator

Posted by GitBox <gi...@apache.org>.
uranusjr commented on issue #26619:
URL: https://github.com/apache/airflow/issues/26619#issuecomment-1257446715

   Would this not work?
   
   ```python
   @task(task_id=f'process_{gen_task_id}_data')
   def process_split_data(data):
       ...
   
   process_split_data.expand(data=XComArg(task))
   ```




[GitHub] [airflow] EssKayz commented on issue #26619: 2.4.0 : Dynamic Task Mapping - Assigning multiple parameters to a PythonOperator

Posted by GitBox <gi...@apache.org>.
EssKayz commented on issue #26619:
URL: https://github.com/apache/airflow/issues/26619#issuecomment-1261836906

   Hmm, I will have to check whether that would work - but wouldn't it still be a good idea to support expand_kwargs for PythonOperator too? Not sure how complex that would be to do, of course.




[GitHub] [airflow] uranusjr commented on issue #26619: 2.4.0 : Dynamic Task Mapping - Assigning multiple parameters to a PythonOperator

Posted by GitBox <gi...@apache.org>.
uranusjr commented on issue #26619:
URL: https://github.com/apache/airflow/issues/26619#issuecomment-1264812103

   You did not call the task function. I'm going to convert this to a discussion, since this is more about a gap in user knowledge than about Airflow functionality.
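   
   For illustration, a minimal sketch of what calling the task function would look like in the `get_data_from_db` example elsewhere in this thread (untested, assumes `dag` is defined as before):
   ```python
   from airflow.decorators import task
   
   
   def get_data_from_db(table_name, **kwargs):
   
       @task(task_id=f'get_db_{table_name}')
       def get_db_objects(**kwargs):
           return {}
   
       # Calling the decorated function is what actually creates the task
       # on the DAG and returns an XComArg for downstream use.
       return get_db_objects()
   
   
   with dag:
       test_task = get_data_from_db(table_name='person')
   ```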




[GitHub] [airflow] uranusjr commented on issue #26619: 2.4.0 : Dynamic Task Mapping - Assigning multiple parameters to a PythonOperator

Posted by GitBox <gi...@apache.org>.
uranusjr commented on issue #26619:
URL: https://github.com/apache/airflow/issues/26619#issuecomment-1255998491

   But you can’t dynamically provide a task_id to PythonOperator either…? I think I am missing something very important here. Again, a concrete example would be best.




[GitHub] [airflow] uranusjr closed issue #26619: 2.4.0 : Dynamic Task Mapping - Assigning multiple parameters to a PythonOperator

Posted by GitBox <gi...@apache.org>.
uranusjr closed issue #26619: 2.4.0 : Dynamic Task Mapping - Assigning multiple parameters to a PythonOperator
URL: https://github.com/apache/airflow/issues/26619




[GitHub] [airflow] EssKayz commented on issue #26619: 2.4.0 : Dynamic Task Mapping - Assigning multiple parameters to a PythonOperator

Posted by GitBox <gi...@apache.org>.
EssKayz commented on issue #26619:
URL: https://github.com/apache/airflow/issues/26619#issuecomment-1264676637

   Still not exactly sure how to get that to work - I may have misunderstood the suggestion - but at least when done in a similar way as with the PythonOperators, no task is added to the DAG:
   ```
   from airflow.decorators import task
   
   def get_data_from_db(table_name, **kwargs):
   
       @task(task_id=f'get_db_{table_name}')
       def get_db_objects(**kwargs):
           return {}
   
   
   with dag:
       test_task = get_data_from_db(
           table_name='person'
       )
   
   ```
   
   
   




[GitHub] [airflow] EssKayz commented on issue #26619: 2.4.0 : Dynamic Task Mapping - Assigning multiple parameters to a PythonOperator

Posted by GitBox <gi...@apache.org>.
EssKayz commented on issue #26619:
URL: https://github.com/apache/airflow/issues/26619#issuecomment-1256030429

   I can try to quickly mock up a working example if my incoherent rambling is hard to grasp - sadly I'm not a native English speaker :disappointed: 




[GitHub] [airflow] EssKayz commented on issue #26619: 2.4.0 : Dynamic Task Mapping - Assigning multiple parameters to a PythonOperator

Posted by GitBox <gi...@apache.org>.
EssKayz commented on issue #26619:
URL: https://github.com/apache/airflow/issues/26619#issuecomment-1255994705

   We tried taskflow as well, but with taskflow we ran into the problem that we cannot dynamically give a task_id to the generated tasks. We are using Airflow for an ETL migration process, so we have automagic task generation for data exports, transformations, filtering and imports.
   
   Concrete example pseudocode:
   ```
   for each table in database:
       generate the following tasks, with table.table_name included in the task_ids
       (50+ tasks with the same name would get extremely confusing):
           a task that exports the data as a list of JSON objects from the database and splits it into 1...n S3 bucket files,
           and then a dynamically generated task that takes one of the split files and runs a python_function on it.
           After this - collect all the converted objects from the dynamically generated tasks and combine them into one S3 bucket file.
   ```
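   
   As a rough, untested sketch of that shape (names hypothetical), using the templates_dict workaround from the example above, inside `with dag:`:
   ```
   from airflow.models.xcom_arg import XComArg
   from airflow.operators.python import PythonOperator
   
   for table_name in list_of_table_names:
       export = PythonOperator(
           task_id=f'export_{table_name}',
           python_callable=export_table_to_s3,   # returns a list of dicts, one per split file
           op_kwargs={'table_name': table_name},
       )
       processed = PythonOperator.partial(
           task_id=f'process_{table_name}',
           python_callable=process_split_file,
       ).expand(templates_dict=XComArg(export))  # one mapped task per split file
       combine = PythonOperator(
           task_id=f'combine_{table_name}',
           python_callable=combine_split_files,
           op_kwargs={'table_name': table_name},
       )
       processed >> combine
   ```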
   




[GitHub] [airflow] uranusjr commented on issue #26619: 2.4.0 : Dynamic Task Mapping - Assigning multiple parameters to a PythonOperator

Posted by GitBox <gi...@apache.org>.
uranusjr commented on issue #26619:
URL: https://github.com/apache/airflow/issues/26619#issuecomment-1261848503

   It is supported in concept; the main problem is that the `op_kwargs` syntax would be pretty complex for a user to _write_, so I'm not going to bother trying to figure out how it'd work.
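   
   For illustration, a rough, untested sketch of what that would presumably look like today: the upstream task has to wrap each payload under the `op_kwargs` key so that `expand_kwargs` targets the operator argument rather than the callable's keywords:
   ```python
   from airflow.models.xcom_arg import XComArg
   from airflow.operators.python import PythonOperator
   
   # The upstream task returns one operator-kwargs dict per mapped task instance, e.g.:
   # [{'op_kwargs': {'data': '1234', 'bucket_name': 'prod'}},
   #  {'op_kwargs': {'data': '4321', 'bucket_name': 'test'}}]
   PythonOperator.partial(
       task_id='process_data',
       python_callable=process_split_data,
   ).expand_kwargs(XComArg(task))
   ```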

