Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/09/15 10:00:32 UTC

[GitHub] [airflow] KennethanCeyer opened a new issue #18265: Asyncio support for task, Operators

KennethanCeyer opened a new issue #18265:
URL: https://github.com/apache/airflow/issues/18265


   ### Description
   
   ## Purpose
   
   A lot of data-processing logic runs in Airflow, and a large share of that logic is IO-bound. asyncio can be a good way to optimize such workloads.
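
The following is a minimal stdlib-only sketch of the IO-bound argument. It uses `asyncio.sleep` as a stand-in for a real network or disk wait, and `fetch`/`fetch_all` are hypothetical names. The three 0.2-second waits overlap on one event loop, so the batch should take roughly 0.2 seconds instead of the 0.6 seconds a sequential loop would need.

```python
import asyncio
import time
from typing import List


async def fetch(i: int, delay: float = 0.2) -> int:
    # Hypothetical IO-bound call: asyncio.sleep stands in for a network request.
    await asyncio.sleep(delay)
    return i * 10


async def fetch_all(n: int) -> List[int]:
    # gather() runs the coroutines concurrently on one event loop and
    # returns their results in the order the coroutines were passed in.
    return list(await asyncio.gather(*(fetch(i) for i in range(n))))


if __name__ == "__main__":
    start = time.perf_counter()
    results = asyncio.run(fetch_all(3))
    elapsed = time.perf_counter() - start
    print(results, f"{elapsed:.2f}s")
```

Real IO only overlaps like this when it goes through an asyncio-aware client library. Blocking calls made inside a coroutine would still run one after another.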
   
   The problem is that the current Airflow version does not support the `async` keyword for `@task`-decorated functions or operators. We have to create an event loop ourselves and call control methods such as `loop.run_until_complete`.
   
   As a result, using any library that exposes `async` functions requires an extra synchronous wrapper function, and writing such wrappers complicates the DAG and its tasks. I would therefore like to propose a design that allows the `async` keyword for DAGs, tasks, and operators.
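
Until native support exists, the per-function wrapper can be factored into one reusable decorator. The following is a minimal sketch that assumes the task body runs synchronously in a thread with no event loop already running. `run_sync` is a hypothetical helper name and is not part of Airflow.

```python
import asyncio
import functools
from typing import Any, Callable, Coroutine, List, TypeVar

T = TypeVar("T")


def run_sync(coro_fn: Callable[..., Coroutine[Any, Any, T]]) -> Callable[..., T]:
    """Hypothetical helper: expose a coroutine function as a plain function.

    Each call starts a fresh event loop with asyncio.run(), runs the coroutine
    to completion, closes the loop, and returns the result to the synchronous
    caller.
    """

    @functools.wraps(coro_fn)
    def wrapper(*args: Any, **kwargs: Any) -> T:
        return asyncio.run(coro_fn(*args, **kwargs))

    return wrapper


@run_sync
async def get_nums(count: int = 5) -> List[int]:
    await asyncio.sleep(0.01)  # stands in for an IO-bound await
    return list(range(count))


if __name__ == "__main__":
    print(get_nums())  # plain call, no event-loop handling at the call site
```

`asyncio.run` refuses to start when an event loop is already running in the same thread, so this sketch only fits synchronous call sites. Stacking Airflow's `@task` on top of `@run_sync` should keep the DAG file synchronous, but that combination is an untested assumption here.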
   
   ## Design
   
   **TO-BE**
   
   ```python
   import asyncio
   import datetime
   from typing import List
   
   from airflow import DAG
   from airflow.decorators import task
   
   
   @task
   async def get_nums() -> List[int]:
       await asyncio.sleep(5)
       return list(range(5))
   
   
   @task
   def print_nums(nums: List[int]) -> None:
       print(nums)
   
   
   async with DAG(
       "example_asyncio",
       schedule_interval="@daily",
       start_date=datetime.datetime.now() - datetime.timedelta(days=1),
   ) as dag:
       nums = await get_nums()
       print_nums(nums)
   ```
   
   **Current version**
   
   ```python
   import asyncio
   import datetime
   from typing import List
   
   from airflow import DAG
   from airflow.decorators import task
   
   
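   async def get_nums_async() -> List[int]:
       # Plain coroutine holding the IO-bound logic; it is not a task itself.
       await asyncio.sleep(5)
       return list(range(5))
   
   
   @task
   def get_nums() -> List[int]:
       # Synchronous task that drives the coroutine on its own event loop.
       loop = asyncio.new_event_loop()
       try:
           return loop.run_until_complete(get_nums_async())
       finally:
           loop.close()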
   
   
   @task
   def print_nums(nums: List[int]) -> None:
       print(nums)
   
   
   with DAG(
       "example_asyncio",
       schedule_interval="@daily",
       start_date=datetime.datetime.now() - datetime.timedelta(days=1),
   ) as dag:
       nums = get_nums()
       print_nums(nums)
   ```
   
   ### Use case/motivation
   
   The code below does not work:
   
   ```python
   import asyncio
   import datetime
   from typing import List
   
   from airflow import DAG
   from airflow.decorators import task
   
   
   @task
   async def get_nums() -> List[int]:
       await asyncio.sleep(5)
       return list(range(5))
   
   
   @task
   def print_nums(nums: List[int]) -> None:
       print(nums)
   
   
   async with DAG(
       "example_asyncio",
       schedule_interval="@daily",
       start_date=datetime.datetime.now() - datetime.timedelta(days=1),
   ) as dag:
       nums = await get_nums()
       print_nums(nums)
   
   ```
   
   **OUTPUT**
   
   ```plaintext
   Broken DAG: [/opt/airflow/dags/repo/dags/example_asyncio.py] Traceback (most recent call last):
     File "<frozen importlib._bootstrap_external>", line 911, in source_to_code
     File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
     File "/opt/airflow/dags/repo/dags/example_asyncio.py", line 20
       async with DAG(
       ^
   SyntaxError: 'async with' outside async function
   ```
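
The traceback follows from how the DAG file is loaded. It goes through the normal import machinery (`source_to_code` in the trace), which compiles the file as an ordinary module, and `async with` and `await` are only valid inside an `async def` body. The following minimal check uses the built-in `compile()`. The `compiles` helper and the `DAG_SNIPPET` string are hypothetical names. The snippet is compiled but never executed, so `DAG` and `get_nums` do not need to exist. CPython 3.8+ accepts top-level `await`/`async with` only when the `ast.PyCF_ALLOW_TOP_LEVEL_AWAIT` compiler flag is set, and the standard import path in the traceback does not set it.

```python
import ast

# Same shape as the proposed DAG file body; only compiled, never run.
DAG_SNIPPET = '''
async with DAG("example_asyncio") as dag:
    nums = await get_nums()
'''


def compiles(source: str, flags: int = 0) -> bool:
    """Return True if `source` compiles as a module with the given flags."""
    try:
        compile(source, "<dag_file>", "exec", flags=flags)
    except SyntaxError:
        return False
    return True


if __name__ == "__main__":
    # Plain module compilation, as happens when a DAG file is imported.
    print(compiles(DAG_SNIPPET))
    # Accepted only with the explicit top-level-await compiler flag.
    print(compiles(DAG_SNIPPET, ast.PyCF_ALLOW_TOP_LEVEL_AWAIT))
```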
   
   ---
   
   In the current Airflow version, we have to wrap this code with `loop.run_until_complete`:
   
   ```python
   import asyncio
   import datetime
   from typing import List
   
   from airflow import DAG
   from airflow.decorators import task
   
   
   async def get_nums_async() -> List[int]:
       # Plain coroutine holding the IO-bound logic; it is not a task itself.
       await asyncio.sleep(5)
       return list(range(5))
   
   
   @task
   def get_nums() -> List[int]:  # We need this wrapping function.
       loop = asyncio.new_event_loop()
       try:
           return loop.run_until_complete(get_nums_async())
       finally:
           loop.close()
   
   
   @task
   def print_nums(nums: List[int]) -> None:
       print(nums)
   
   
   with DAG(
       "example_asyncio",
       schedule_interval="@daily",
       start_date=datetime.datetime.now() - datetime.timedelta(days=1),
   ) as dag:
       nums = get_nums()
       print_nums(nums)
   
   ```
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on issue #18265: Asyncio support for task, Operators

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #18265:
URL: https://github.com/apache/airflow/issues/18265#issuecomment-920173475


   Closing for now. Please take a look and see whether AIP-40 solves your problem, and come back if you find it insufficient (perhaps by opening another issue on how it could be improved).



[GitHub] [airflow] KennethanCeyer commented on issue #18265: Asyncio support for task, Operators

KennethanCeyer commented on issue #18265:
URL: https://github.com/apache/airflow/issues/18265#issuecomment-921684323


   @potiuk
   Thanks for your comments. I had already read AIP-40, but not thoroughly.
   I will read it more closely and test out the beta features you mentioned.
   
   If the enhancement still seems necessary, I will open another issue.



[GitHub] [airflow] potiuk commented on issue #18265: Asyncio support for task, Operators

potiuk commented on issue #18265:
URL: https://github.com/apache/airflow/issues/18265#issuecomment-920172564


   Asyncio-based operators are one of the big features in the upcoming Airflow 2.2. They are called Deferrable Operators and were implemented as part of AIP-40:
   
   https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177050929
   
   They are available in Airflow 2.2.0 beta 1 (you can subscribe to the devlist to get announcements), so you can take them for a spin even now. Please check them out; the documentation is available in the `main` branch of Airflow.
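
Roughly, as AIP-40 describes it, a deferrable operator moves its waiting out of the worker. The operator calls `self.defer(...)` with a trigger, and the trigger's async `run()` executes in a separate triggerer process that serves many triggers on one event loop. Once the trigger fires a `TriggerEvent`, the task resumes in the method named by `method_name`. The following is a toy, stdlib-only model of that flow and does not reproduce Airflow's API. `ToyEvent`, `toy_timer_trigger`, `first_event`, `toy_triggerer` and `resume` are hypothetical names. For the real classes (`BaseTrigger`, `TriggerEvent`, `BaseOperator.defer`), check the deferrable-operator documentation.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncGenerator, Dict


@dataclass
class ToyEvent:
    # Stand-in for the payload a trigger emits once its condition is met.
    payload: Any


async def toy_timer_trigger(delay: float, payload: Any) -> AsyncGenerator[ToyEvent, None]:
    # Toy "trigger": wait cheaply on the event loop, then yield one event.
    await asyncio.sleep(delay)
    yield ToyEvent(payload)


async def first_event(trigger: AsyncGenerator[ToyEvent, None]) -> ToyEvent:
    # Take the first event a trigger yields, then close the generator.
    try:
        return await trigger.__anext__()
    finally:
        await trigger.aclose()


async def toy_triggerer(
    deferred: Dict[str, AsyncGenerator[ToyEvent, None]],
) -> Dict[str, ToyEvent]:
    # One event loop waits on every deferred task's trigger concurrently,
    # instead of each task occupying a worker slot while it sleeps.
    names = list(deferred)
    events = await asyncio.gather(*(first_event(deferred[n]) for n in names))
    return dict(zip(names, events))


def resume(task_id: str, event: ToyEvent) -> str:
    # Stand-in for the method a deferred task resumes in after its trigger fires.
    return f"{task_id} resumed with {event.payload}"


if __name__ == "__main__":
    deferred = {
        "task_a": toy_timer_trigger(0.2, "a-done"),
        "task_b": toy_timer_trigger(0.1, "b-done"),
    }
    fired = asyncio.run(toy_triggerer(deferred))
    for task_id, event in fired.items():
        print(resume(task_id, event))
```

In this model, a worker slot is needed only for the short code before deferring and after resuming. The waiting itself costs one coroutine on the shared loop.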



[GitHub] [airflow] potiuk closed issue #18265: Asyncio support for task, Operators

potiuk closed issue #18265:
URL: https://github.com/apache/airflow/issues/18265


   

