Posted to commits@airflow.apache.org by "Taragolis (via GitHub)" <gi...@apache.org> on 2023/02/08 21:22:53 UTC

[GitHub] [airflow] Taragolis commented on a diff in pull request #29038: Add HttpHookAsync for deferrable implementation

Taragolis commented on code in PR #29038:
URL: https://github.com/apache/airflow/pull/29038#discussion_r1100693885


##########
airflow/providers/http/hooks/http.py:
##########
@@ -246,3 +253,142 @@ def test_connection(self):
             return True, "Connection successfully tested"
         except Exception as e:
             return False, str(e)
+
+
+class HttpAsyncHook(BaseHook):
+    """
+    Interact with HTTP servers using Python Async.
+
+    :param method: the API method to be called
+    :param http_conn_id: http connection id that has the base
+        API url, e.g. https://www.google.com/, and optional authentication credentials. Default
+        headers can also be specified in the Extra field in json format.
+    :param auth_type: The auth type for the service
+    """
+
+    conn_name_attr = "http_conn_id"
+    default_conn_name = "http_default"
+    conn_type = "http"
+    hook_name = "HTTP"
+
+    def __init__(
+        self,
+        method: str = "POST",
+        http_conn_id: str = default_conn_name,
+        auth_type: Any = aiohttp.BasicAuth,
+        retry_limit: int = 3,
+        retry_delay: float = 1.0,
+    ) -> None:
+        self.http_conn_id = http_conn_id
+        self.method = method.upper()
+        self.base_url: str = ""
+        self._retry_obj: Callable[..., Any]
+        self.auth_type: Any = auth_type
+        if retry_limit < 1:
+            raise ValueError("Retry limit must be greater than or equal to 1")
+        self.retry_limit = retry_limit
+        self.retry_delay = retry_delay
+
+    async def run(
+        self,
+        endpoint: str | None = None,
+        data: dict[str, Any] | str | None = None,
+        headers: dict[str, Any] | None = None,
+        extra_options: dict[str, Any] | None = None,
+    ) -> "ClientResponse":
+        r"""
+        Performs an asynchronous HTTP request call
+
+        :param endpoint: the endpoint to be called, e.g. resource/v1/query?
+        :param data: payload to be uploaded or request parameters
+        :param headers: additional headers to be passed through as a dictionary
+        :param extra_options: Additional kwargs to pass when creating a request.
+            For example, ``run(json=obj)`` is passed as ``aiohttp.ClientSession().get(json=obj)``
+        """
+        extra_options = extra_options or {}
+
+        # headers may be passed through directly or in the "extra" field in the connection
+        # definition
+        _headers = {}
+        auth = None
+
+        if self.http_conn_id:

Review Comment:
   Well, I found time to check this magic. As far as I can see, `sync_to_async` runs the function in a `ThreadPoolExecutor`, which uses threads internally. However, to avoid thread-related issues, it runs in a pool with a single worker when `thread_sensitive=True` (the default).
   
   https://github.com/django/asgiref/blob/730d865506adf565d53e5a67cceb9f796dc472ba/asgiref/sync.py#L345-L346
   
   As a result, all functions/methods decorated with `sync_to_async` run sequentially, one after another, so there is no concurrency between them. A single function that hangs, for example because it waits with an infinite timeout on a service that never sends a response, would block all the others.
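To make the blocking effect concrete without relying on asgiref internals, here is a minimal stdlib sketch. It assumes the single-worker behaviour described above: `SINGLE_WORKER`, a `ThreadPoolExecutor(max_workers=1)`, stands in for the executor that thread-sensitive calls share, and `slow_call`, `fast_call` and `measure` are hypothetical names. A 2-second sleep replaces the "infinite timeout" so the example terminates.

```python
import asyncio
import time
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the single-worker executor that thread-sensitive
# sync_to_async calls share; this is not asgiref's actual code.
SINGLE_WORKER = ThreadPoolExecutor(max_workers=1)


def slow_call() -> str:
    # Hypothetical request to a service that takes 2 s to answer.
    time.sleep(2)
    return "slow"


def fast_call() -> str:
    # Hypothetical request that would return almost immediately.
    return "fast"


async def measure() -> dict[str, float]:
    loop = asyncio.get_running_loop()
    start = time.monotonic()
    finished: dict[str, float] = {}

    async def run(name: str, func: Callable[[], str]) -> None:
        # Hand the blocking function to the shared single-worker pool.
        await loop.run_in_executor(SINGLE_WORKER, func)
        finished[name] = time.monotonic() - start

    # slow_call is submitted first, so fast_call queues behind it
    # even though it needs almost no time of its own.
    await asyncio.gather(run("slow", slow_call), run("fast", fast_call))
    return finished


if __name__ == "__main__":
    # Both entries come out at roughly 2 s.
    print(asyncio.run(measure()))
```

With a call that never returns in place of `time.sleep(2)`, `fast_call` would never get the worker thread at all.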
   
   Here is a simple example:
   
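   ```python
   import asyncio
   import datetime
   import time

   from asgiref.sync import sync_to_async


   def now() -> str:
       # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC time.
       return datetime.datetime.now(datetime.timezone.utc).isoformat()


   # thread_sensitive=True is the default, so every call to task() is queued
   # on the same single worker thread and the calls run one after another.
   @sync_to_async
   def task(value):
       print(f"{now()}, sync_to_async start {value!r}")
       time.sleep(3)  # blocking call; holds the only worker thread
       print(f"{now()}, sync_to_async end {value!r}")


   # A native coroutine yields to the event loop while it sleeps,
   # so several of these overlap.
   async def async_task(value):
       print(f"{now()}, async_task start {value!r}")
       await asyncio.sleep(3)
       print(f"{now()}, async_task end {value!r}")


   async def main():
       # async_task 3, 4 and 6 finish together after ~3 s, while task 1, 2
       # and 5 finish one at a time, ~3 s apart (~9 s in total).
       await asyncio.gather(
           asyncio.create_task(task(1)),
           asyncio.create_task(task(2)),
           asyncio.create_task(async_task(3)),
           asyncio.create_task(async_task(4)),
           asyncio.create_task(task(5)),
           asyncio.create_task(async_task(6)),
       )


   if __name__ == "__main__":
       asyncio.run(main())
   ```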
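For contrast, a minimal stdlib sketch of the same three blocking calls (values 1, 2 and 5, like `task()` in the example above) dispatched to a pool with one worker versus three. As far as I can tell from the asgiref source, `thread_sensitive=False` falls back to the event loop's default multi-threaded executor, which behaves like the three-worker case; `blocking_task` and `run_all` are hypothetical names used only for illustration.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_task(value: int) -> int:
    # Hypothetical blocking I/O, similar to time.sleep() in task() above.
    time.sleep(1)
    return value


async def run_all(max_workers: int) -> float:
    loop = asyncio.get_running_loop()
    start = time.monotonic()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Submit all three calls at once and wait for every one to finish.
        await asyncio.gather(
            *(loop.run_in_executor(pool, blocking_task, v) for v in (1, 2, 5))
        )
    return time.monotonic() - start


if __name__ == "__main__":
    print(f"1 worker:  {asyncio.run(run_all(1)):.1f}s")  # sequential, roughly 3 s
    print(f"3 workers: {asyncio.run(run_all(3)):.1f}s")  # overlapping, roughly 1 s
```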



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.