Posted to commits@airflow.apache.org by "hussein-awala (via GitHub)" <gi...@apache.org> on 2023/12/31 02:25:37 UTC

[PR] [POC][WIP] Async SQLAlchemy sessions in Airflow [airflow]

hussein-awala opened a new pull request, #36504:
URL: https://github.com/apache/airflow/pull/36504

   The Airflow metadata database is the brain of Airflow: it is the central component that stores the state of all operations and enables communication between the other components.
   
   Airflow components spend a lot of their execution time waiting for responses from the metadata database. We could avoid this by querying the database asynchronously, using the waiting time to perform other operations or to send more requests to the database.
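
A minimal, stdlib-only sketch of the claim above. It assumes `asyncio.sleep` as a stand-in for a database round trip over an async driver; no real database or Airflow code is involved. Awaiting the simulated queries one after another costs the sum of their latencies, while `asyncio.gather` overlaps the waits.

```python
import asyncio
import time


async def fake_query(name: str, latency: float) -> str:
    # Stand-in for an awaited DB round trip (assumption: a real async
    # driver yields control to the event loop while waiting, like this).
    await asyncio.sleep(latency)
    return name


async def sequential(latencies: list[float]) -> float:
    """Run the simulated queries one after another; return elapsed seconds."""
    start = time.perf_counter()
    for i, latency in enumerate(latencies):
        await fake_query(f"q{i}", latency)
    return time.perf_counter() - start


async def concurrent(latencies: list[float]) -> float:
    """Run the simulated queries concurrently; return elapsed seconds."""
    start = time.perf_counter()
    # While one query waits, the loop makes progress on the others.
    await asyncio.gather(
        *(fake_query(f"q{i}", latency) for i, latency in enumerate(latencies))
    )
    return time.perf_counter() - start


if __name__ == "__main__":
    latencies = [0.2, 0.2, 0.2]
    print(f"sequential: {asyncio.run(sequential(latencies)):.2f}s")
    print(f"concurrent: {asyncio.run(concurrent(latencies)):.2f}s")
```

With three 0.2 s queries, the sequential run should take a little over 0.6 s and the concurrent run a little over 0.2 s; exact numbers depend on the machine.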
   
   Since version 1.4, SQLAlchemy supports [Asynchronous I/O (asyncio)](https://docs.sqlalchemy.org/en/14/orm/extensions/asyncio.html), which we can use to improve all our components:
   - improving scheduler logic
   - improving the async execution in the executors
   - making the Triggerer fully asynchronous
   - loading variables/connections asynchronously in the triggers and the async hooks
   - and, of course, improving the performance of our API and webserver, which will serve many more users with lower resource usage.
   
   To test and validate the efficiency of async sessions before migrating complex methods from sync to async, I implemented the utilities to create async sessions and some async methods to load variables and connections from the database, and I tried one of the newly implemented methods with the Triggerer:
   
   test_trigger.py:
   ```python
   from typing import Any
   
   from airflow.triggers.base import BaseTrigger
   
   
   class TestTrigger(BaseTrigger):
   
       def serialize(self) -> tuple[str, dict[str, Any]]:
           return "airflow.test_trigger.TestTrigger", {}
   
       async def run(self):
           import asyncio
   
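           from airflow.models.connection import Connection
           from airflow.triggers.base import TriggerEvent
   
           for _ in range(1000):
               conn = await Connection.async_get_connection_from_secrets("airflow_db")
               self.log.info(conn.login)
               await asyncio.sleep(0.1)
           # Triggers must yield TriggerEvent objects; the payload is passed to
           # the operator's `execute_complete` as the `event` argument.
           yield TriggerEvent({})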
   ```
   dag.py:
   ```python
   from datetime import datetime
   from airflow.models.dag import DAG
   from airflow.models.baseoperator import BaseOperator
   from airflow.test_trigger import TestTrigger
   
   
   class TestOperator(BaseOperator):
       def __init__(self, param: int, **kwargs):
           super().__init__(**kwargs)
           self.param = param
   
       def execute(self, context):
           self.defer(
               trigger=TestTrigger(),
               method_name="execute_complete",
           )
   
       def execute_complete(self, context, event=None):
           # Called when the trigger fires; `event` is the TriggerEvent payload.
           self.log.info("Done!")
   
   
   with DAG(
       dag_id="async_db",
       schedule=None,
       start_date=datetime(2023, 12, 31),
       catchup=False,
   ) as dag:
       TestOperator.partial(task_id="test_task").expand(param=list(range(1000)))
   ```
   I triggered a manual DAG run, which created 1000 mapped tasks and deferred them.
   
   I got the expected result in the log (login `root`), without any blocking of the Triggerer's event loop and with reasonable resource consumption.
   
   Then I repeated the test with the trigger's `run` method replaced by this synchronous variant:
   ```python
       async def run(self):
           import asyncio
   
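           from airflow.models.connection import Connection
           from airflow.triggers.base import TriggerEvent
   
           for _ in range(1000):
               # Synchronous call: blocks the event loop until the DB answers.
               conn = Connection.get_connection_from_secrets("airflow_db")
               self.log.info(conn.login)
               await asyncio.sleep(0.1)
           yield TriggerEvent({})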
   ```
   I got the result in the task log, but the Triggerer log was full of:
   ```
   [2023-12-31T01:42:36.934+0000] {triggerer_job_runner.py:598} INFO - trigger async_db/manual__2023-12-31T01:42:07.645143+00:00/test_task/235/1 (ID 471) starting
   [2023-12-31T01:42:37.091+0000] {triggerer_job_runner.py:598} INFO - trigger async_db/manual__2023-12-31T01:42:07.645143+00:00/test_task/236/1 (ID 472) starting
   [2023-12-31T01:42:37.159+0000] {triggerer_job_runner.py:576} INFO - Triggerer's async thread was blocked for 0.57 seconds, likely by a badly-written trigger. Set PYTHONASYNCIODEBUG=1 to get more information on overrunning coroutines.
   [2023-12-31T01:42:37.369+0000] {triggerer_job_runner.py:598} INFO - trigger async_db/manual__2023-12-31T01:42:07.645143+00:00/test_task/237/1 (ID 473) starting
   [2023-12-31T01:42:37.549+0000] {triggerer_job_runner.py:598} INFO - trigger async_db/manual__2023-12-31T01:42:07.645143+00:00/test_task/239/1 (ID 474) starting
   [2023-12-31T01:42:37.772+0000] {triggerer_job_runner.py:576} INFO - Triggerer's async thread was blocked for 0.61 seconds, likely by a badly-written trigger. Set PYTHONASYNCIODEBUG=1 to get more information on overrunning coroutines.
   ```
   and the Triggerer used almost all the resources of my computer.
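
The warning above comes from the Triggerer detecting that its event loop stopped responding. The following stdlib-only sketch reproduces the idea in isolation; it is not the Triggerer's actual implementation. A monitor coroutine records how late it wakes up, and `time.sleep` stands in (as an assumption) for a synchronous call such as `Connection.get_connection_from_secrets`.

```python
import asyncio
import time
from collections.abc import Awaitable, Callable


async def measure_max_lag(interval: float, duration: float) -> float:
    """Return the largest delay (in seconds) by which this coroutine woke up late."""
    loop = asyncio.get_running_loop()
    end = loop.time() + duration
    max_lag = 0.0
    while loop.time() < end:
        expected = loop.time() + interval
        await asyncio.sleep(interval)
        # If something blocked the thread, we resume well after `expected`.
        max_lag = max(max_lag, loop.time() - expected)
    return max_lag


async def blocking_trigger() -> None:
    for _ in range(3):
        time.sleep(0.2)  # sync call: nothing else on the loop runs meanwhile
        await asyncio.sleep(0.05)


async def awaiting_trigger() -> None:
    for _ in range(3):
        await asyncio.sleep(0.2)  # awaited call: the loop keeps serving others
        await asyncio.sleep(0.05)


async def run_with_monitor(trigger: Callable[[], Awaitable[None]]) -> float:
    """Run `trigger` alongside the lag monitor and return the worst lag seen."""
    monitor = asyncio.create_task(measure_max_lag(interval=0.01, duration=0.8))
    await trigger()
    return await monitor


if __name__ == "__main__":
    print(f"blocking: max lag {asyncio.run(run_with_monitor(blocking_trigger)):.2f}s")
    print(f"awaiting: max lag {asyncio.run(run_with_monitor(awaiting_trigger)):.2f}s")
```

In the blocking case the reported lag should be close to the 0.2 s `time.sleep`, because no other coroutine can run until it returns; in the awaited case it stays near zero.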


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [POC][WIP] Async SQLAlchemy sessions in Airflow [airflow]

Posted by "hussein-awala (via GitHub)" <gi...@apache.org>.
hussein-awala commented on PR #36504:
URL: https://github.com/apache/airflow/pull/36504#issuecomment-1872653755

   For the drivers:
   - SQLite:
     - https://github.com/omnilib/aiosqlite
   - PostgreSQL:
     - https://magicstack.github.io/asyncpg/current/
     - https://www.psycopg.org/docs/advanced.html#asynchronous-support
   - MySQL:
     - https://github.com/aio-libs/aiomysql
     - https://github.com/long2ice/asyncmy
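
SQLAlchemy picks the DBAPI driver from the `dialect+driver` prefix of the database URL, so pointing an async engine at one of the drivers above is mostly a matter of rewriting that prefix. A minimal sketch: `to_async_url` and its mapping are hypothetical helpers, not existing Airflow code; the `+asyncpg`, `+aiomysql`, `+asyncmy` and `+aiosqlite` suffixes are the dialect names SQLAlchemy's asyncio extension uses for those drivers.

```python
# Hypothetical mapping from a database backend to an async SQLAlchemy
# "dialect+driver" name; "mysql+asyncmy" would be the asyncmy alternative.
ASYNC_DRIVERS = {
    "postgresql": "postgresql+asyncpg",
    "mysql": "mysql+aiomysql",
    "sqlite": "sqlite+aiosqlite",
}


def to_async_url(sql_alchemy_conn: str) -> str:
    """Rewrite a sync SQLAlchemy URL so that it selects an async driver."""
    scheme, sep, rest = sql_alchemy_conn.partition("://")
    if not sep:
        raise ValueError(f"Not a SQLAlchemy URL: {sql_alchemy_conn!r}")
    # Drop any explicit sync driver, e.g. "postgresql+psycopg2" -> "postgresql".
    backend = scheme.split("+", 1)[0].lower()
    if backend not in ASYNC_DRIVERS:
        raise ValueError(f"No async driver configured for backend {backend!r}")
    # Everything after "://" (credentials, host, path, query) is kept unchanged.
    return f"{ASYNC_DRIVERS[backend]}://{rest}"


if __name__ == "__main__":
    print(to_async_url("postgresql+psycopg2://airflow:airflow@localhost:5432/airflow"))
    # -> postgresql+asyncpg://airflow:airflow@localhost:5432/airflow
    print(to_async_url("sqlite:////tmp/airflow.db"))
    # -> sqlite+aiosqlite:////tmp/airflow.db
```

Driver-specific query parameters may not carry over: for example, psycopg2's `sslmode` may not be accepted as-is by asyncpg, so a real implementation would need to translate such options.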



Re: [PR] [POC][WIP] Async SQLAlchemy sessions in Airflow [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #36504:
URL: https://github.com/apache/airflow/pull/36504#issuecomment-1872889631

   I expect it will take a long time to go through and test all cases. But I love it :).
   
   But yeah, if we limit it to Connections and Variables for now, that would be cool. One general comment: I am afraid not everyone will be able to use drivers that support async operations. There might be various limitations (dependencies, specific corporate/managed DB configurations that are not supported), so we almost certainly need a way to:
   
   a) handle sync access
   b) fail async access (from triggers) when it is attempted and the driver does not support it (?)
   c) possibly provide a way for Trigger developers to fallback to sync access (following #36492)
   
   I **think** the cleanest solution is to explicitly recognise sync/async access to Connections/Variables, enable async access via a configuration flag, for example, and provide a method/way to check it from a Trigger.
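
A minimal sketch of such a flag and check from a trigger's point of view. Everything here is hypothetical: the `AIRFLOW_ASYNC_DB_ACCESS` environment variable stands in for a real configuration option, and `AsyncAccessDisabledError` illustrates option b), failing explicitly instead of blocking.

```python
import os

# Hypothetical switch; a real implementation would read an airflow.cfg option.
FLAG = "AIRFLOW_ASYNC_DB_ACCESS"


class AsyncAccessDisabledError(RuntimeError):
    """Hypothetical error raised when a trigger requests disabled async access."""


def async_access_enabled() -> bool:
    """The check a Trigger could call before choosing the async code path."""
    return os.environ.get(FLAG, "false").strip().lower() in {"1", "true", "yes"}


def require_async_access() -> None:
    """Option b): fail loudly instead of silently blocking the event loop."""
    if not async_access_enabled():
        raise AsyncAccessDisabledError(
            f"Async access to Connections/Variables is disabled (set {FLAG}=true "
            "once the database driver supports it)."
        )
```

A trigger would call `async_access_enabled()` in `run` and, when it returns `False`, take the option c) path instead of the async lookup.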



Re: [PR] [POC][WIP] Async SQLAlchemy sessions in Airflow [airflow]

Posted by "hussein-awala (via GitHub)" <gi...@apache.org>.
hussein-awala commented on PR #36504:
URL: https://github.com/apache/airflow/pull/36504#issuecomment-1872955940

   > I expect it will take a long time to go through and test all cases. But I love it :).
   > 
   > But yeah, if we limit it to Connections and Variables for now, that would be cool. One general comment: I am afraid not everyone will be able to use drivers that support async operations. There might be various limitations (dependencies, specific corporate/managed DB configurations that are not supported), so we almost certainly need a way to:
   > 
   > a) handle sync access
   > b) fail async access (from triggers) when it is attempted and the driver does not support it (?)
   > c) possibly provide a way for Trigger developers to fallback to sync access (following #36492)
   > 
   > I **think** the cleanest solution is to explicitly recognise sync/async access to Connections/Variables, enable async access via a configuration flag, for example, and provide a method/way to check it from a Trigger.
   
   I have some ideas for testing each database's async driver in our CI without adding much execution time (I will implement them soon).
   
   For this point:
   > possibly provide a way for Trigger developers to fallback to sync access
   
   we have different options, such as falling back to a sync get when `create_async_session` raises certain exceptions. In that case, we can announce that this feature is supported only with some databases (for the rest, it falls back to sync) and introduce it as an experimental feature, to make future design changes easier.
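
One way to express that fallback, as a minimal sketch: `async_lookup` and `sync_lookup` are hypothetical stand-ins for the async and sync Connection getters, and `AsyncDriverUnavailable` is a hypothetical exception standing in for whatever `create_async_session` would raise. The sync path runs through `asyncio.to_thread` (Python 3.9+) so the fallback does not bring back the event-loop blocking seen in the Triggerer logs.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")


class AsyncDriverUnavailable(Exception):
    """Hypothetical: raised when no async session can be created."""


async def get_with_fallback(
    async_lookup: Callable[[str], Awaitable[T]],
    sync_lookup: Callable[[str], T],
    conn_id: str,
) -> T:
    """Try the async getter first; on a known failure, use the sync getter."""
    try:
        return await async_lookup(conn_id)
    except AsyncDriverUnavailable:
        # Fall back to the sync getter, but keep it off the event-loop thread.
        return await asyncio.to_thread(sync_lookup, conn_id)
```

A real implementation would probably remember the first failure instead of retrying the async path on every lookup.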



Re: [PR] [POC][WIP] Async SQLAlchemy sessions in Airflow [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #36504:
URL: https://github.com/apache/airflow/pull/36504#issuecomment-1872896761

   Also, I think there is a completely separate question of what we do with "secrets". I guess this sync/async configuration should also apply to secrets, and we should extend our SecretsManager API to cover those cases as well:
   
   a) have a way to inform if sync/async is supported
   b) support it :) 
   
   We should also likely have some kind of consistency check: if Airflow uses a secrets manager, "async" cannot be enabled unless both the driver and the secrets manager support async. In that case, a Trigger should only work "asynchronously" if the whole Airflow installation (DB + secrets manager) is async-capable; otherwise, it should fall back to sync (i.e. if triggers need Connections/Variables during event loop handling, these should be passed via, possibly encrypted, serialized data).
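
A minimal sketch of that consistency check. `SecretsBackendInfo` and its `supports_async` field are hypothetical (they model point a) above, not an existing Airflow API), and the driver capability is assumed to be known already. Async lookups are allowed only when the metadata DB driver and every configured secrets backend support them; otherwise the caller falls back to sync or to serialized trigger data.

```python
from collections.abc import Sequence
from dataclasses import dataclass


@dataclass(frozen=True)
class SecretsBackendInfo:
    # Hypothetical descriptor; real backends would have to expose this themselves.
    name: str
    supports_async: bool


def async_lookups_allowed(
    db_driver_is_async: bool, backends: Sequence[SecretsBackendInfo]
) -> bool:
    """True only if the whole installation (DB + secrets backends) is async-capable."""
    return db_driver_is_async and all(b.supports_async for b in backends)


def blocking_components(
    db_driver_is_async: bool, backends: Sequence[SecretsBackendInfo]
) -> list[str]:
    """Names of the components that force the sync fallback (for error messages)."""
    blockers = [] if db_driver_is_async else ["metadata database driver"]
    blockers.extend(b.name for b in backends if not b.supports_async)
    return blockers
```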
   

