Posted to commits@airflow.apache.org by "rajaths010494 (via GitHub)" <gi...@apache.org> on 2023/02/02 05:57:55 UTC

[GitHub] [airflow] rajaths010494 opened a new pull request, #29300: Add deferrable BatchOperator

rajaths010494 opened a new pull request, #29300:
URL: https://github.com/apache/airflow/pull/29300

   This PR donates the deferrable Batch operator, trigger, and async hook developed in the [astronomer-providers](https://github.com/astronomer/astronomer-providers) repo to Apache Airflow.
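
   For context, a minimal sketch of what the new deferrable mode looks like from a DAG. The resource names below are placeholders; only the ``deferrable`` flag is new in this PR:

   ```python
   from airflow.providers.amazon.aws.operators.batch import BatchOperator

   submit_batch_job = BatchOperator(
       task_id="submit_batch_job",
       job_name="example-job",                    # placeholder
       job_queue="example-job-queue",             # placeholder
       job_definition="example-job-definition",   # placeholder
       overrides={},
       # New in this PR: hand polling off to the triggerer instead of
       # holding a worker slot while the Batch job runs.
       deferrable=True,
   )
   ```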


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] rajaths010494 commented on a diff in pull request #29300: Add deferrable BatchOperator

Posted by "rajaths010494 (via GitHub)" <gi...@apache.org>.
rajaths010494 commented on code in PR #29300:
URL: https://github.com/apache/airflow/pull/29300#discussion_r1095392580


##########
generated/provider_dependencies.json:
##########
@@ -17,6 +17,7 @@
   },
   "amazon": {
     "deps": [
+      "aiobotocore>=2.1.1",

Review Comment:
   Sure, will look into that.





[GitHub] [airflow] rajaths010494 commented on a diff in pull request #29300: Add deferrable BatchOperator

Posted by "rajaths010494 (via GitHub)" <gi...@apache.org>.
rajaths010494 commented on code in PR #29300:
URL: https://github.com/apache/airflow/pull/29300#discussion_r1157455819


##########
airflow/providers/amazon/aws/hooks/batch_client.py:
##########
@@ -544,3 +546,238 @@ def exp(tries):
         delay = 1 + pow(tries * 0.6, 2)
         delay = min(max_interval, delay)
         return uniform(delay / 3, delay)
+
+
+class BatchClientAsyncHook(BatchClientHook, AwsBaseAsyncHook):
+    """
+    Async client for AWS Batch services.
+
+    :param max_retries: exponential back-off retries, 4200 = 48 hours;
+        polling is only used when waiters is None
+
+    :param status_retries: number of HTTP retries to get job status, 10;
+        polling is only used when waiters is None
+
+    .. note::
+        Several methods use a default random delay to check or poll for job status, i.e.
+        ``random.sample()``
+        Using a random interval helps to avoid AWS API throttle limits
+        when many concurrent tasks request job-descriptions.
+
+        To modify the global defaults for the range of jitter allowed when a
+        random delay is used to check Batch job status, modify these defaults, e.g.:
+
+            BatchClient.DEFAULT_DELAY_MIN = 0
+            BatchClient.DEFAULT_DELAY_MAX = 5
+
+        When explicit delay values are used, a 1 second random jitter is applied to the
+        delay .  It is generally recommended that random jitter is added to API requests.
+        A convenience method is provided for this, e.g. to get a random delay of
+        10 sec +/- 5 sec: ``delay = BatchClient.add_jitter(10, width=5, minima=0)``
+    """
+
+    def __init__(self, job_id: str | None, waiters: Any = None, *args: Any, **kwargs: Any) -> None:

Review Comment:
   Added proper docstrings for the parameters.





[GitHub] [airflow] vincbeck commented on a diff in pull request #29300: Add deferrable BatchOperator

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #29300:
URL: https://github.com/apache/airflow/pull/29300#discussion_r1158562863


##########
tests/providers/amazon/aws/deferrable/hooks/test_batch_client.py:
##########
@@ -0,0 +1,213 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import sys
+
+import botocore
+import pytest
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.batch_client import BatchClientAsyncHook
+
+if sys.version_info < (3, 8):
+    # For compatibility with Python 3.7
+    from asynctest import mock as async_mock
+else:
+    from unittest import mock as async_mock
+
+pytest.importorskip("aiobotocore")
+
+
+class TestBatchClientAsyncHook:

Review Comment:
   Interesting! Thanks for the explanations, I did not know that. LGTM then





[GitHub] [airflow] phanikumv commented on a diff in pull request #29300: Add deferrable BatchOperator

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #29300:
URL: https://github.com/apache/airflow/pull/29300#discussion_r1152789008


##########
airflow/providers/amazon/aws/hooks/batch_client.py:
##########
@@ -544,3 +546,240 @@ def exp(tries):
         delay = 1 + pow(tries * 0.6, 2)
         delay = min(max_interval, delay)
         return uniform(delay / 3, delay)
+
+
+class BatchClientAsyncHook(BatchClientHook, AwsBaseAsyncHook):
+    """
+    Async client for AWS Batch services.
+
+    :param max_retries: exponential back-off retries, 4200 = 48 hours;
+        polling is only used when waiters is None
+
+    :param status_retries: number of HTTP retries to get job status, 10;
+        polling is only used when waiters is None
+
+    .. note::
+        Several methods use a default random delay to check or poll for job status, i.e.
+        ``random.sample()``
+        Using a random interval helps to avoid AWS API throttle limits
+        when many concurrent tasks request job-descriptions.
+
+        To modify the global defaults for the range of jitter allowed when a
+        random delay is used to check Batch job status, modify these defaults, e.g.:
+
+            BatchClient.DEFAULT_DELAY_MIN = 0
+            BatchClient.DEFAULT_DELAY_MAX = 5
+
+        When explicit delay values are used, a 1 second random jitter is applied to the
+        delay .  It is generally recommended that random jitter is added to API requests.
+        A convenience method is provided for this, e.g. to get a random delay of
+        10 sec +/- 5 sec: ``delay = BatchClient.add_jitter(10, width=5, minima=0)``
+    """
+
+    def __init__(self, job_id: str | None, waiters: Any = None, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+        self.job_id = job_id
+        self.waiters = waiters
+
+    async def monitor_job(self) -> dict[str, str] | None:
+        """
+        Monitor an AWS Batch job
+        monitor_job can raise an exception or an AirflowTaskTimeout can be raised if execution_timeout
+        is given while creating the task. These exceptions should be handled in taskinstance.py
+        instead of here like it was previously done
+
+        :raises: AirflowException
+        """
+        if not self.job_id:
+            raise AirflowException("AWS Batch job - job_id was not found")
+
+        if self.waiters:
+            self.waiters.wait_for_job(self.job_id)
+            return None
+        else:
+            await self.wait_for_job(self.job_id)
+            await self.check_job_success(self.job_id)
+            success_msg = f"AWS Batch job ({self.job_id}) succeeded"
+            self.log.info(success_msg)
+            return {"status": "success", "message": success_msg}
+
+    async def check_job_success(self, job_id: str) -> bool:  # type: ignore[override]
+        """
+        Check the final status of the Batch job; return True if the job
+        'SUCCEEDED', else raise an AirflowException
+
+        :param job_id: a Batch job ID
+
+        :raises: AirflowException
+        """
+        job = await self.get_job_description(job_id)
+        job_status = job.get("status")
+        if job_status == self.SUCCESS_STATE:
+            self.log.info("AWS Batch job (%s) succeeded: %s", job_id, job)
+            return True
+
+        if job_status == self.FAILURE_STATE:
+            raise AirflowException(f"AWS Batch job ({job_id}) failed: {job}")
+
+        if job_status in self.INTERMEDIATE_STATES:
+            raise AirflowException(f"AWS Batch job ({job_id}) is not complete: {job}")
+
+        raise AirflowException(f"AWS Batch job ({job_id}) has unknown status: {job}")
+
+    @staticmethod
+    async def delay(delay: int | float | None = None) -> None:  # type: ignore[override]
+        """
+        Pause execution for ``delay`` seconds.
+
+        :param delay: a delay to pause execution using ``time.sleep(delay)``;
+            a small 1 second jitter is applied to the delay.
+
+        .. note::
+            This method uses a default random delay, i.e.
+            ``random.sample()``;
+            using a random interval helps to avoid AWS API throttle limits
+            when many concurrent tasks request job-descriptions.
+        """
+        if delay is None:
+            delay = sample(
+                list(range(BatchClientAsyncHook.DEFAULT_DELAY_MIN, BatchClientAsyncHook.DEFAULT_DELAY_MAX)), 1
+            )[0]

Review Comment:
   ```suggestion
               delay = uniform(BatchClientAsyncHook.DEFAULT_DELAY_MIN, BatchClientAsyncHook.DEFAULT_DELAY_MAX)
   ```
   
   The code in `astronomer-providers` was written this way to pass the Bandit check; that is not needed here, so it should use the `uniform` method instead.
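
   For reference, a small sketch of the difference (plain Python; the min/max values are arbitrary):

   ```python
   from random import sample, uniform

   DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX = 1, 10

   # sample() materializes a list of integers and draws one, so the delay can
   # only be a whole number in [1, 9] and the list allocation is wasted work.
   delay_int = sample(list(range(DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX)), 1)[0]

   # uniform() draws a float directly from [1, 10], consistent with how the
   # sync hook's exp() backoff already uses uniform() in this module.
   delay_float = uniform(DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX)
   ```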



##########
airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -71,6 +72,7 @@ class BatchOperator(BaseOperator):
         Override the region_name in connection (if provided)
     :param tags: collection of tags to apply to the AWS Batch job submission
         if None, no tags are submitted
+    :param deferrable: Run operator in the deferrable mode.

Review Comment:
   Can you please add docs describing the new param `deferrable`?





[GitHub] [airflow] rajaths010494 commented on a diff in pull request #29300: Add deferrable BatchOperator

Posted by "rajaths010494 (via GitHub)" <gi...@apache.org>.
rajaths010494 commented on code in PR #29300:
URL: https://github.com/apache/airflow/pull/29300#discussion_r1156771619


##########
tests/providers/amazon/aws/operators/test_batch.py:
##########
@@ -211,3 +218,114 @@ def test_execute(self, mock_conn):
             computeResources=compute_resources,
             tags=tags,
         )
+
+
+def create_context(task, dag=None):

Review Comment:
   `deferrable/hooks/test_batch_client.py` contains the hook tests, while `tests/providers/amazon/aws/operators/test_batch.py` contains the operator tests.





[GitHub] [airflow] vincbeck commented on a diff in pull request #29300: Add deferrable BatchOperator

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #29300:
URL: https://github.com/apache/airflow/pull/29300#discussion_r1157272204


##########
airflow/providers/amazon/aws/hooks/batch_client.py:
##########
@@ -544,3 +546,238 @@ def exp(tries):
         delay = 1 + pow(tries * 0.6, 2)
         delay = min(max_interval, delay)
         return uniform(delay / 3, delay)
+
+
+class BatchClientAsyncHook(BatchClientHook, AwsBaseAsyncHook):
+    """
+    Async client for AWS Batch services.
+
+    :param max_retries: exponential back-off retries, 4200 = 48 hours;
+        polling is only used when waiters is None
+
+    :param status_retries: number of HTTP retries to get job status, 10;
+        polling is only used when waiters is None
+
+    .. note::
+        Several methods use a default random delay to check or poll for job status, i.e.
+        ``random.sample()``
+        Using a random interval helps to avoid AWS API throttle limits
+        when many concurrent tasks request job-descriptions.
+
+        To modify the global defaults for the range of jitter allowed when a
+        random delay is used to check Batch job status, modify these defaults, e.g.:
+
+            BatchClient.DEFAULT_DELAY_MIN = 0
+            BatchClient.DEFAULT_DELAY_MAX = 5
+
+        When explicit delay values are used, a 1 second random jitter is applied to the
+        delay .  It is generally recommended that random jitter is added to API requests.
+        A convenience method is provided for this, e.g. to get a random delay of
+        10 sec +/- 5 sec: ``delay = BatchClient.add_jitter(10, width=5, minima=0)``
+    """
+
+    def __init__(self, job_id: str | None, waiters: Any = None, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+        self.job_id = job_id
+        self.waiters = waiters
+
+    async def monitor_job(self) -> dict[str, str] | None:
+        """
+        Monitor an AWS Batch job
+        monitor_job can raise an exception or an AirflowTaskTimeout can be raised if execution_timeout
+        is given while creating the task. These exceptions should be handled in taskinstance.py
+        instead of here like it was previously done
+
+        :raises: AirflowException
+        """
+        if not self.job_id:
+            raise AirflowException("AWS Batch job - job_id was not found")
+
+        if self.waiters:

Review Comment:
   That makes sense. Yes please, if you can create a PR or at least an issue to track this, that would be great :)





[GitHub] [airflow] kaxil commented on a diff in pull request #29300: Add deferrable BatchOperator

Posted by "kaxil (via GitHub)" <gi...@apache.org>.
kaxil commented on code in PR #29300:
URL: https://github.com/apache/airflow/pull/29300#discussion_r1094552773


##########
generated/provider_dependencies.json:
##########
@@ -17,6 +17,7 @@
   },
   "amazon": {
     "deps": [
+      "aiobotocore>=2.1.1",

Review Comment:
   Same comment as https://github.com/apache/airflow/pull/28850#discussion_r1066843322 applies here too.





[GitHub] [airflow] phanikumv commented on a diff in pull request #29300: Add deferrable BatchOperator

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #29300:
URL: https://github.com/apache/airflow/pull/29300#discussion_r1154541102


##########
tests/providers/amazon/aws/operators/test_batch.py:
##########
@@ -211,3 +218,114 @@ def test_execute(self, mock_conn):
             computeResources=compute_resources,
             tags=tags,
         )
+
+
+def create_context(task, dag=None):

Review Comment:
   whats the difference between the tests here and the ones under `deferrable/hooks/test_batch_client.py` ?



##########
docs/apache-airflow-providers-amazon/operators/batch.rst:
##########
@@ -38,6 +38,7 @@ Submit a new AWS Batch job
 ==========================
 
 To submit a new AWS Batch job and monitor it until it reaches a terminal state you can
+also run this operator in deferrable mode by setting ``deferrable`` param to ``True``.

Review Comment:
   This sentence is confusing. Can you please reword it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] kaxil merged pull request #29300: Add deferrable BatchOperator

Posted by "kaxil (via GitHub)" <gi...@apache.org>.
kaxil merged PR #29300:
URL: https://github.com/apache/airflow/pull/29300




[GitHub] [airflow] rajaths010494 commented on a diff in pull request #29300: Add deferrable BatchOperator

Posted by "rajaths010494 (via GitHub)" <gi...@apache.org>.
rajaths010494 commented on code in PR #29300:
URL: https://github.com/apache/airflow/pull/29300#discussion_r1158021813


##########
tests/providers/amazon/aws/deferrable/hooks/test_batch_client.py:
##########
@@ -0,0 +1,213 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import sys
+
+import botocore
+import pytest
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.batch_client import BatchClientAsyncHook
+
+if sys.version_info < (3, 8):
+    # For compatibility with Python 3.7
+    from asynctest import mock as async_mock
+else:
+    from unittest import mock as async_mock
+
+pytest.importorskip("aiobotocore")
+
+
+class TestBatchClientAsyncHook:

Review Comment:
   I had parameterised the tests at first too, but got this error https://github.com/Martiusweb/asynctest/issues/152#issue-666326338 and CI was failing. Based on this PR comment https://github.com/apache/airflow/pull/29801#discussion_r1125620257 I separated them, and then CI passed for me.



##########
tests/providers/amazon/aws/deferrable/triggers/test_batch.py:
##########
@@ -0,0 +1,131 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+import importlib.util
+
+import pytest
+
+from airflow.providers.amazon.aws.triggers.batch import (
+    BatchOperatorTrigger,
+)
+from airflow.triggers.base import TriggerEvent
+from tests.providers.amazon.aws.utils.compat import async_mock
+
+JOB_NAME = "51455483-c62c-48ac-9b88-53a6a725baa3"
+JOB_ID = "8ba9d676-4108-4474-9dca-8bbac1da9b19"
+MAX_RETRIES = 2
+STATUS_RETRIES = 3
+POKE_INTERVAL = 5
+AWS_CONN_ID = "airflow_test"
+REGION_NAME = "eu-west-1"
+
+
+@pytest.mark.skipif(not bool(importlib.util.find_spec("aiobotocore")), reason="aiobotocore require")
+class TestBatchOperatorTrigger:
+    TRIGGER = BatchOperatorTrigger(
+        job_id=JOB_ID,
+        job_name=JOB_NAME,
+        job_definition="hello-world",
+        job_queue="queue",
+        waiters=None,
+        tags={},
+        max_retries=MAX_RETRIES,
+        status_retries=STATUS_RETRIES,
+        parameters={},
+        overrides={},
+        array_properties={},
+        region_name="eu-west-1",
+        aws_conn_id="airflow_test",
+    )
+
+    def test_batch_trigger_serialization(self):
+        """
+        Asserts that the BatchOperatorTrigger correctly serializes its arguments
+        and classpath.
+        """
+
+        classpath, kwargs = self.TRIGGER.serialize()
+        assert classpath == "airflow.providers.amazon.aws.triggers.batch.BatchOperatorTrigger"
+        assert kwargs == {
+            "job_id": JOB_ID,
+            "job_name": JOB_NAME,
+            "job_definition": "hello-world",
+            "job_queue": "queue",
+            "waiters": None,
+            "tags": {},
+            "max_retries": MAX_RETRIES,
+            "status_retries": STATUS_RETRIES,
+            "parameters": {},
+            "overrides": {},
+            "array_properties": {},
+            "region_name": "eu-west-1",
+            "aws_conn_id": "airflow_test",
+        }
+
+    @pytest.mark.asyncio
+    async def test_batch_trigger_run(self):
+        """Test that the task is not done when event is not returned from trigger."""
+
+        task = asyncio.create_task(self.TRIGGER.run().__anext__())
+        await asyncio.sleep(0.5)
+        # TriggerEvent was not returned
+        assert task.done() is False
+
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.amazon.aws.hooks.batch_client.BatchClientAsyncHook.monitor_job")
+    async def test_batch_trigger_completed(self, mock_response):
+        """Test if the success event is  returned from trigger."""
+        mock_response.return_value = {"status": "success", "message": f"AWS Batch job ({JOB_ID}) succeeded"}
+
+        generator = self.TRIGGER.run()
+        actual_response = await generator.asend(None)
+        assert (
+            TriggerEvent({"status": "success", "message": f"AWS Batch job ({JOB_ID}) succeeded"})
+            == actual_response
+        )
+
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.amazon.aws.hooks.batch_client.BatchClientAsyncHook.monitor_job")
+    async def test_batch_trigger_failure(self, mock_response):
+        """Test if the failure event is returned from trigger."""
+        mock_response.return_value = {"status": "error", "message": f"{JOB_ID} failed"}
+
+        generator = self.TRIGGER.run()
+        actual_response = await generator.asend(None)
+        assert TriggerEvent({"status": "error", "message": f"{JOB_ID} failed"}) == actual_response
+
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.amazon.aws.hooks.batch_client.BatchClientAsyncHook.monitor_job")
+    async def test_batch_trigger_none(self, mock_response):
+        """Test if the failure event is returned when there is no response from hook."""
+        mock_response.return_value = None
+
+        generator = self.TRIGGER.run()
+        actual_response = await generator.asend(None)
+        assert TriggerEvent({"status": "error", "message": f"{JOB_ID} failed"}) == actual_response

Review Comment:
   I had parameterised the tests at first too, but got this error https://github.com/Martiusweb/asynctest/issues/152#issue-666326338 and CI was failing. Based on this PR comment https://github.com/apache/airflow/pull/29801#discussion_r1125620257 I separated them, and then CI passed for me.





[GitHub] [airflow] rajaths010494 commented on a diff in pull request #29300: Add deferrable BatchOperator

Posted by "rajaths010494 (via GitHub)" <gi...@apache.org>.
rajaths010494 commented on code in PR #29300:
URL: https://github.com/apache/airflow/pull/29300#discussion_r1152823575


##########
airflow/providers/amazon/aws/hooks/batch_client.py:
##########
@@ -544,3 +546,240 @@ def exp(tries):
         delay = 1 + pow(tries * 0.6, 2)
         delay = min(max_interval, delay)
         return uniform(delay / 3, delay)
+
+
+class BatchClientAsyncHook(BatchClientHook, AwsBaseAsyncHook):
+    """
+    Async client for AWS Batch services.
+
+    :param max_retries: exponential back-off retries, 4200 = 48 hours;
+        polling is only used when waiters is None
+
+    :param status_retries: number of HTTP retries to get job status, 10;
+        polling is only used when waiters is None
+
+    .. note::
+        Several methods use a default random delay to check or poll for job status, i.e.
+        ``random.sample()``
+        Using a random interval helps to avoid AWS API throttle limits
+        when many concurrent tasks request job-descriptions.
+
+        To modify the global defaults for the range of jitter allowed when a
+        random delay is used to check Batch job status, modify these defaults, e.g.:
+
+            BatchClient.DEFAULT_DELAY_MIN = 0
+            BatchClient.DEFAULT_DELAY_MAX = 5
+
+        When explicit delay values are used, a 1 second random jitter is applied to the
+        delay .  It is generally recommended that random jitter is added to API requests.
+        A convenience method is provided for this, e.g. to get a random delay of
+        10 sec +/- 5 sec: ``delay = BatchClient.add_jitter(10, width=5, minima=0)``
+    """
+
+    def __init__(self, job_id: str | None, waiters: Any = None, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+        self.job_id = job_id
+        self.waiters = waiters
+
+    async def monitor_job(self) -> dict[str, str] | None:
+        """
+        Monitor an AWS Batch job
+        monitor_job can raise an exception or an AirflowTaskTimeout can be raised if execution_timeout
+        is given while creating the task. These exceptions should be handled in taskinstance.py
+        instead of here like it was previously done
+
+        :raises: AirflowException
+        """
+        if not self.job_id:
+            raise AirflowException("AWS Batch job - job_id was not found")
+
+        if self.waiters:
+            self.waiters.wait_for_job(self.job_id)
+            return None
+        else:
+            await self.wait_for_job(self.job_id)
+            await self.check_job_success(self.job_id)
+            success_msg = f"AWS Batch job ({self.job_id}) succeeded"
+            self.log.info(success_msg)
+            return {"status": "success", "message": success_msg}
+
+    async def check_job_success(self, job_id: str) -> bool:  # type: ignore[override]
+        """
+        Check the final status of the Batch job; return True if the job
+        'SUCCEEDED', else raise an AirflowException
+
+        :param job_id: a Batch job ID
+
+        :raises: AirflowException
+        """
+        job = await self.get_job_description(job_id)
+        job_status = job.get("status")
+        if job_status == self.SUCCESS_STATE:
+            self.log.info("AWS Batch job (%s) succeeded: %s", job_id, job)
+            return True
+
+        if job_status == self.FAILURE_STATE:
+            raise AirflowException(f"AWS Batch job ({job_id}) failed: {job}")
+
+        if job_status in self.INTERMEDIATE_STATES:
+            raise AirflowException(f"AWS Batch job ({job_id}) is not complete: {job}")
+
+        raise AirflowException(f"AWS Batch job ({job_id}) has unknown status: {job}")
+
+    @staticmethod
+    async def delay(delay: int | float | None = None) -> None:  # type: ignore[override]
+        """
+        Pause execution for ``delay`` seconds.
+
+        :param delay: a delay to pause execution using ``time.sleep(delay)``;
+            a small 1 second jitter is applied to the delay.
+
+        .. note::
+            This method uses a default random delay, i.e.
+            ``random.sample()``;
+            using a random interval helps to avoid AWS API throttle limits
+            when many concurrent tasks request job-descriptions.
+        """
+        if delay is None:
+            delay = sample(
+                list(range(BatchClientAsyncHook.DEFAULT_DELAY_MIN, BatchClientAsyncHook.DEFAULT_DELAY_MAX)), 1
+            )[0]

Review Comment:
   Fixed in [4bced0a](https://github.com/apache/airflow/pull/29300/commits/4bced0aa06ed6d5f63c1aa6a346719b046dbc5fc)



##########
airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -71,6 +72,7 @@ class BatchOperator(BaseOperator):
         Override the region_name in connection (if provided)
     :param tags: collection of tags to apply to the AWS Batch job submission
         if None, no tags are submitted
+    :param deferrable: Run operator in the deferrable mode.

Review Comment:
   Fixed in [4bced0a](https://github.com/apache/airflow/pull/29300/commits/4bced0aa06ed6d5f63c1aa6a346719b046dbc5fc)





[GitHub] [airflow] vincbeck commented on a diff in pull request #29300: Add deferrable BatchOperator

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #29300:
URL: https://github.com/apache/airflow/pull/29300#discussion_r1153311655


##########
airflow/providers/amazon/aws/hooks/batch_client.py:
##########
@@ -544,3 +546,238 @@ def exp(tries):
         delay = 1 + pow(tries * 0.6, 2)
         delay = min(max_interval, delay)
         return uniform(delay / 3, delay)
+
+
+class BatchClientAsyncHook(BatchClientHook, AwsBaseAsyncHook):
+    """
+    Async client for AWS Batch services.
+
+    :param max_retries: exponential back-off retries, 4200 = 48 hours;
+        polling is only used when waiters is None
+
+    :param status_retries: number of HTTP retries to get job status, 10;
+        polling is only used when waiters is None
+
+    .. note::
+        Several methods use a default random delay to check or poll for job status, i.e.
+        ``random.sample()``
+        Using a random interval helps to avoid AWS API throttle limits
+        when many concurrent tasks request job-descriptions.
+
+        To modify the global defaults for the range of jitter allowed when a
+        random delay is used to check Batch job status, modify these defaults, e.g.:
+
+            BatchClient.DEFAULT_DELAY_MIN = 0
+            BatchClient.DEFAULT_DELAY_MAX = 5
+
+        When explicit delay values are used, a 1 second random jitter is applied to the
+        delay .  It is generally recommended that random jitter is added to API requests.
+        A convenience method is provided for this, e.g. to get a random delay of
+        10 sec +/- 5 sec: ``delay = BatchClient.add_jitter(10, width=5, minima=0)``
+    """
+
+    def __init__(self, job_id: str | None, waiters: Any = None, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+        self.job_id = job_id
+        self.waiters = waiters
+
+    async def monitor_job(self) -> dict[str, str] | None:
+        """
+        Monitor an AWS Batch job
+        monitor_job can raise an exception or an AirflowTaskTimeout can be raised if execution_timeout
+        is given while creating the task. These exceptions should be handled in taskinstance.py
+        instead of here like it was previously done
+
+        :raises: AirflowException
+        """
+        if not self.job_id:
+            raise AirflowException("AWS Batch job - job_id was not found")
+
+        if self.waiters:

Review Comment:
   I think leaving the option to the user to pass the waiters makes everything more complicated, no? Is there any reason why you want to do it that way? Another solution would be to implement a waiter for Batch following [these instructions](https://github.com/apache/airflow/tree/main/airflow/providers/amazon/aws/waiters) and then use it in this hook. It would avoid having 2 branches (one with waiters passed as parameters, one with no waiter passed) and make the hook easier to use (no need to wonder whether we should pass some waiters). WDYT?
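
   For illustration only, a hypothetical custom waiter along the lines of those instructions; the waiter name, timings, and acceptors are assumptions, not the merged implementation:

   ```python
   import botocore.waiter

   # Waiter definition in the botocore waiter-model format. The job states
   # (SUCCEEDED, FAILED, ...) come from the AWS Batch DescribeJobs API.
   waiter_model = botocore.waiter.WaiterModel(
       {
           "version": 2,
           "waiters": {
               "batch_job_complete": {
                   "operation": "DescribeJobs",
                   "delay": 30,
                   "maxAttempts": 120,
                   "acceptors": [
                       {"matcher": "pathAll", "argument": "jobs[].status",
                        "expected": "SUCCEEDED", "state": "success"},
                       {"matcher": "pathAny", "argument": "jobs[].status",
                        "expected": "FAILED", "state": "failure"},
                   ],
               }
           },
       }
   )

   # Given a boto3 "batch" client and a submitted job_id:
   # waiter = botocore.waiter.create_waiter_with_client(
   #     "batch_job_complete", waiter_model, batch_client
   # )
   # waiter.wait(jobs=[job_id])
   ```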



##########
airflow/providers/amazon/aws/hooks/batch_client.py:
##########
@@ -544,3 +546,238 @@ def exp(tries):
         delay = 1 + pow(tries * 0.6, 2)
         delay = min(max_interval, delay)
         return uniform(delay / 3, delay)
+
+
+class BatchClientAsyncHook(BatchClientHook, AwsBaseAsyncHook):
+    """
+    Async client for AWS Batch services.
+
+    :param max_retries: exponential back-off retries, 4200 = 48 hours;
+        polling is only used when waiters is None
+
+    :param status_retries: number of HTTP retries to get job status, 10;
+        polling is only used when waiters is None
+
+    .. note::
+        Several methods use a default random delay to check or poll for job status, i.e.
+        ``random.sample()``
+        Using a random interval helps to avoid AWS API throttle limits
+        when many concurrent tasks request job-descriptions.
+
+        To modify the global defaults for the range of jitter allowed when a
+        random delay is used to check Batch job status, modify these defaults, e.g.:
+
+            BatchClient.DEFAULT_DELAY_MIN = 0
+            BatchClient.DEFAULT_DELAY_MAX = 5
+
+        When explicit delay values are used, a 1 second random jitter is applied to the
+        delay .  It is generally recommended that random jitter is added to API requests.
+        A convenience method is provided for this, e.g. to get a random delay of
+        10 sec +/- 5 sec: ``delay = BatchClient.add_jitter(10, width=5, minima=0)``
+    """
+
+    def __init__(self, job_id: str | None, waiters: Any = None, *args: Any, **kwargs: Any) -> None:

Review Comment:
   The docstring does not match the parameters here.
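
   For illustration, the two constructor arguments could be documented like this (the wording is a sketch, not the merged text):

   ```python
   class BatchClientAsyncHook(BatchClientHook, AwsBaseAsyncHook):
       """
       Async client for AWS Batch services.

       :param job_id: the AWS Batch job ID; may be None until the job is submitted
       :param waiters: an optional waiter object exposing ``wait_for_job(job_id)``;
           polling is only used when waiters is None

       :param max_retries: exponential back-off retries, 4200 = 48 hours;
           polling is only used when waiters is None

       :param status_retries: number of HTTP retries to get job status, 10;
           polling is only used when waiters is None
       """
   ```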





[GitHub] [airflow] rajaths010494 commented on a diff in pull request #29300: Add deferrable BatchOperator

Posted by "rajaths010494 (via GitHub)" <gi...@apache.org>.
rajaths010494 commented on code in PR #29300:
URL: https://github.com/apache/airflow/pull/29300#discussion_r1150376172


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -865,3 +868,85 @@ def _parse_s3_config(config_file_name: str, config_format: str | None = "boto",
         config_format=config_format,
         profile=profile,
     )
+
+
+class AwsBaseAsyncHook(AwsBaseHook):

Review Comment:
   Removed.





[GitHub] [airflow] rajaths010494 commented on a diff in pull request #29300: Add deferrable BatchOperator

Posted by "rajaths010494 (via GitHub)" <gi...@apache.org>.
rajaths010494 commented on code in PR #29300:
URL: https://github.com/apache/airflow/pull/29300#discussion_r1157456074


##########
docs/apache-airflow-providers-amazon/operators/batch.rst:
##########
@@ -38,6 +38,7 @@ Submit a new AWS Batch job
 ==========================
 
 To submit a new AWS Batch job and monitor it until it reaches a terminal state you can
+also run this operator in deferrable mode by setting ``deferrable`` param to ``True``.

Review Comment:
   Modified.





[GitHub] [airflow] vincbeck commented on a diff in pull request #29300: Add deferrable BatchOperator

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #29300:
URL: https://github.com/apache/airflow/pull/29300#discussion_r1157479626


##########
tests/providers/amazon/aws/deferrable/hooks/test_batch_client.py:
##########
@@ -0,0 +1,213 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import sys
+
+import botocore
+import pytest
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.batch_client import BatchClientAsyncHook
+
+if sys.version_info < (3, 8):
+    # For compatibility with Python 3.7
+    from asynctest import mock as async_mock
+else:
+    from unittest import mock as async_mock
+
+pytest.importorskip("aiobotocore")
+
+
+class TestBatchClientAsyncHook:

Review Comment:
   Thanks for adding all these tests; they all look good to me. However, for code clarity purposes, could you try to merge some using [@pytest.mark.parametrize](https://docs.pytest.org/en/6.2.x/parametrize.html)? Many of your tests are really similar, and I am sure that by using `@pytest.mark.parametrize` you could reduce the length of your unit tests by a lot.



##########
tests/providers/amazon/aws/deferrable/triggers/test_batch.py:
##########
@@ -0,0 +1,131 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+import importlib.util
+
+import pytest
+
+from airflow.providers.amazon.aws.triggers.batch import (
+    BatchOperatorTrigger,
+)
+from airflow.triggers.base import TriggerEvent
+from tests.providers.amazon.aws.utils.compat import async_mock
+
+JOB_NAME = "51455483-c62c-48ac-9b88-53a6a725baa3"
+JOB_ID = "8ba9d676-4108-4474-9dca-8bbac1da9b19"
+MAX_RETRIES = 2
+STATUS_RETRIES = 3
+POKE_INTERVAL = 5
+AWS_CONN_ID = "airflow_test"
+REGION_NAME = "eu-west-1"
+
+
+@pytest.mark.skipif(not bool(importlib.util.find_spec("aiobotocore")), reason="aiobotocore require")
+class TestBatchOperatorTrigger:
+    TRIGGER = BatchOperatorTrigger(
+        job_id=JOB_ID,
+        job_name=JOB_NAME,
+        job_definition="hello-world",
+        job_queue="queue",
+        waiters=None,
+        tags={},
+        max_retries=MAX_RETRIES,
+        status_retries=STATUS_RETRIES,
+        parameters={},
+        overrides={},
+        array_properties={},
+        region_name="eu-west-1",
+        aws_conn_id="airflow_test",
+    )
+
+    def test_batch_trigger_serialization(self):
+        """
+        Asserts that the BatchOperatorTrigger correctly serializes its arguments
+        and classpath.
+        """
+
+        classpath, kwargs = self.TRIGGER.serialize()
+        assert classpath == "airflow.providers.amazon.aws.triggers.batch.BatchOperatorTrigger"
+        assert kwargs == {
+            "job_id": JOB_ID,
+            "job_name": JOB_NAME,
+            "job_definition": "hello-world",
+            "job_queue": "queue",
+            "waiters": None,
+            "tags": {},
+            "max_retries": MAX_RETRIES,
+            "status_retries": STATUS_RETRIES,
+            "parameters": {},
+            "overrides": {},
+            "array_properties": {},
+            "region_name": "eu-west-1",
+            "aws_conn_id": "airflow_test",
+        }
+
+    @pytest.mark.asyncio
+    async def test_batch_trigger_run(self):
+        """Test that the task is not done when event is not returned from trigger."""
+
+        task = asyncio.create_task(self.TRIGGER.run().__anext__())
+        await asyncio.sleep(0.5)
+        # TriggerEvent was not returned
+        assert task.done() is False
+
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.amazon.aws.hooks.batch_client.BatchClientAsyncHook.monitor_job")
+    async def test_batch_trigger_completed(self, mock_response):
+        """Test if the success event is  returned from trigger."""
+        mock_response.return_value = {"status": "success", "message": f"AWS Batch job ({JOB_ID}) succeeded"}
+
+        generator = self.TRIGGER.run()
+        actual_response = await generator.asend(None)
+        assert (
+            TriggerEvent({"status": "success", "message": f"AWS Batch job ({JOB_ID}) succeeded"})
+            == actual_response
+        )
+
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.amazon.aws.hooks.batch_client.BatchClientAsyncHook.monitor_job")
+    async def test_batch_trigger_failure(self, mock_response):
+        """Test if the failure event is returned from trigger."""
+        mock_response.return_value = {"status": "error", "message": f"{JOB_ID} failed"}
+
+        generator = self.TRIGGER.run()
+        actual_response = await generator.asend(None)
+        assert TriggerEvent({"status": "error", "message": f"{JOB_ID} failed"}) == actual_response
+
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.amazon.aws.hooks.batch_client.BatchClientAsyncHook.monitor_job")
+    async def test_batch_trigger_none(self, mock_response):
+        """Test if the failure event is returned when there is no response from hook."""
+        mock_response.return_value = None
+
+        generator = self.TRIGGER.run()
+        actual_response = await generator.asend(None)
+        assert TriggerEvent({"status": "error", "message": f"{JOB_ID} failed"}) == actual_response

Review Comment:
   Similar comment. By using `@pytest.mark.parametrize`, you can merge these 3 tests into one
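
   For illustration, a sketch of the merged test, reusing the names from the quoted test file. It assumes Python >= 3.8 (plain `unittest.mock`), since the asynctest issue linked elsewhere in this thread reportedly breaks `parametrize` on 3.7:

   ```python
   @pytest.mark.asyncio
   @pytest.mark.parametrize(
       "hook_response, expected_event",
       [
           # monitor_job result -> TriggerEvent payload, per the three tests above
           (
               {"status": "success", "message": f"AWS Batch job ({JOB_ID}) succeeded"},
               {"status": "success", "message": f"AWS Batch job ({JOB_ID}) succeeded"},
           ),
           (
               {"status": "error", "message": f"{JOB_ID} failed"},
               {"status": "error", "message": f"{JOB_ID} failed"},
           ),
           # No response from the hook is surfaced as a failure event too.
           (None, {"status": "error", "message": f"{JOB_ID} failed"}),
       ],
   )
   @async_mock.patch("airflow.providers.amazon.aws.hooks.batch_client.BatchClientAsyncHook.monitor_job")
   async def test_batch_trigger_events(self, mock_response, hook_response, expected_event):
       mock_response.return_value = hook_response

       generator = self.TRIGGER.run()
       actual_response = await generator.asend(None)
       assert TriggerEvent(expected_event) == actual_response
   ```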





[GitHub] [airflow] phanikumv commented on a diff in pull request #29300: Add deferrable BatchOperator

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #29300:
URL: https://github.com/apache/airflow/pull/29300#discussion_r1113081966


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -865,3 +868,85 @@ def _parse_s3_config(config_file_name: str, config_format: str | None = "boto",
         config_format=config_format,
         profile=profile,
     )
+
+
+class AwsBaseAsyncHook(AwsBaseHook):

Review Comment:
   We need to remove the base async hook changes from this PR, as they will be contributed via #28850.





[GitHub] [airflow] rajaths010494 commented on a diff in pull request #29300: Add deferrable BatchOperator

Posted by "rajaths010494 (via GitHub)" <gi...@apache.org>.
rajaths010494 commented on code in PR #29300:
URL: https://github.com/apache/airflow/pull/29300#discussion_r1156790735


##########
airflow/providers/amazon/aws/hooks/batch_client.py:
##########
@@ -544,3 +546,238 @@ def exp(tries):
         delay = 1 + pow(tries * 0.6, 2)
         delay = min(max_interval, delay)
         return uniform(delay / 3, delay)
+
+
+class BatchClientAsyncHook(BatchClientHook, AwsBaseAsyncHook):
+    """
+    Async client for AWS Batch services.
+
+    :param max_retries: exponential back-off retries, 4200 = 48 hours;
+        polling is only used when waiters is None
+
+    :param status_retries: number of HTTP retries to get job status, 10;
+        polling is only used when waiters is None
+
+    .. note::
+        Several methods use a default random delay to check or poll for job status, i.e.
+        ``random.sample()``
+        Using a random interval helps to avoid AWS API throttle limits
+        when many concurrent tasks request job-descriptions.
+
+        To modify the global defaults for the range of jitter allowed when a
+        random delay is used to check Batch job status, modify these defaults, e.g.:
+
+            BatchClient.DEFAULT_DELAY_MIN = 0
+            BatchClient.DEFAULT_DELAY_MAX = 5
+
+        When explicit delay values are used, a 1 second random jitter is applied to the
+        delay .  It is generally recommended that random jitter is added to API requests.
+        A convenience method is provided for this, e.g. to get a random delay of
+        10 sec +/- 5 sec: ``delay = BatchClient.add_jitter(10, width=5, minima=0)``
+    """
+
+    def __init__(self, job_id: str | None, waiters: Any = None, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+        self.job_id = job_id
+        self.waiters = waiters
+
+    async def monitor_job(self) -> dict[str, str] | None:
+        """
+        Monitor an AWS Batch job
+        monitor_job can raise an exception or an AirflowTaskTimeout can be raised if execution_timeout
+        is given while creating the task. These exceptions should be handled in taskinstance.py
+        instead of here like it was previously done
+
+        :raises: AirflowException
+        """
+        if not self.job_id:
+            raise AirflowException("AWS Batch job - job_id was not found")
+
+        if self.waiters:

Review Comment:
   Currently, in the sync version, the waiter was implemented this way: https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/operators/batch.py#L247-L250.
   So I kept the same implementation in the async version as well.
   @vincbeck I can add those in a separate PR adding waiters for both.





[GitHub] [airflow] rajaths010494 commented on a diff in pull request #29300: Add deferrable BatchOperator

Posted by "rajaths010494 (via GitHub)" <gi...@apache.org>.
rajaths010494 commented on code in PR #29300:
URL: https://github.com/apache/airflow/pull/29300#discussion_r1157282532


##########
airflow/providers/amazon/aws/hooks/batch_client.py:
##########
@@ -544,3 +546,238 @@ def exp(tries):
         delay = 1 + pow(tries * 0.6, 2)
         delay = min(max_interval, delay)
         return uniform(delay / 3, delay)
+
+
+class BatchClientAsyncHook(BatchClientHook, AwsBaseAsyncHook):
+    """
+    Async client for AWS Batch services.
+
+    :param max_retries: exponential back-off retries, 4200 = 48 hours;
+        polling is only used when waiters is None
+
+    :param status_retries: number of HTTP retries to get job status, 10;
+        polling is only used when waiters is None
+
+    .. note::
+        Several methods use a default random delay to check or poll for job status, i.e.
+        ``random.sample()``
+        Using a random interval helps to avoid AWS API throttle limits
+        when many concurrent tasks request job-descriptions.
+
+        To modify the global defaults for the range of jitter allowed when a
+        random delay is used to check Batch job status, modify these defaults, e.g.:
+
+            BatchClient.DEFAULT_DELAY_MIN = 0
+            BatchClient.DEFAULT_DELAY_MAX = 5
+
+        When explicit delay values are used, a 1 second random jitter is applied to the
+        delay .  It is generally recommended that random jitter is added to API requests.
+        A convenience method is provided for this, e.g. to get a random delay of
+        10 sec +/- 5 sec: ``delay = BatchClient.add_jitter(10, width=5, minima=0)``
+    """
+
+    def __init__(self, job_id: str | None, waiters: Any = None, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+        self.job_id = job_id
+        self.waiters = waiters
+
+    async def monitor_job(self) -> dict[str, str] | None:
+        """
+        Monitor an AWS Batch job
+        monitor_job can raise an exception or an AirflowTaskTimeout can be raised if execution_timeout
+        is given while creating the task. These exceptions should be handled in taskinstance.py
+        instead of here like it was previously done
+
+        :raises: AirflowException
+        """
+        if not self.job_id:
+            raise AirflowException("AWS Batch job - job_id was not found")
+
+        if self.waiters:

Review Comment:
   Yes, sure. I have created the issue here: https://github.com/apache/airflow/issues/30457


