Posted to commits@airflow.apache.org by "syedahsn (via GitHub)" <gi...@apache.org> on 2023/03/10 23:33:16 UTC

[GitHub] [airflow] syedahsn opened a new pull request, #30032: Add support for deferrable operators in AMPP

syedahsn opened a new pull request, #30032:
URL: https://github.com/apache/airflow/pull/30032

   The purpose of this PR is to add the foundations required to convert AMPP operators to deferrable operators. 
   The [aiobotocore](https://github.com/aio-libs/aiobotocore) library is being used to make all async calls to the boto3 API.
   
   Rather than creating separate async hooks for every service, we create an `async_conn` property in `base_aws.py` which behaves similarly to the existing `conn` property. This property gives us access to a client that can make async boto3 API calls, and it lets us reuse the supporting code that already exists for the `conn` property. The `get_client_type` function is extended to return a `ClientCreatorContext` object which can be used to get a client that supports asynchronous calls.
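   The shape of that pattern can be sketched with stand-in classes (every name below, `FakeHook`, `FakeClientCreatorContext`, etc., is illustrative, not the real aiobotocore or provider API): the property hands back an async context manager, and the usable client only materializes inside `async with`.

```python
import asyncio

class FakeClient:
    """Stand-in for an aiobotocore client with async boto3-style calls."""
    async def describe_clusters(self):
        return {"Clusters": [{"ClusterStatus": "available"}]}

class FakeClientCreatorContext:
    """Mimics aiobotocore's ClientCreatorContext: the client is only
    created when the context is entered with `async with`."""
    async def __aenter__(self):
        return FakeClient()
    async def __aexit__(self, *exc_info):
        return False

class FakeHook:
    @property
    def async_conn(self):
        # Each access hands back a fresh context object.
        return FakeClientCreatorContext()

async def main():
    async with FakeHook().async_conn as client:
        return await client.describe_clusters()

result = asyncio.run(main())
print(result["Clusters"][0]["ClusterStatus"])
```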
   
   Another addition this PR makes is extending `get_waiter` in `base_aws.py` to work with custom async waiters. Currently, the `get_waiter` function allows us to create a custom waiter using a JSON config file. This PR allows passing a `client` parameter which can be used to return an async waiter generated by the `aiobotocore` library. This feature will save a lot of code duplication by standardizing the polling portion of Triggers. A README.md file is included that describes how Triggers can be written to make use of these features. Although not all operators and sensors will be able to use these features, many in the AMPP can be written in a standardized format following the methods described.
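   As a rough sketch of that standardized polling shape (the waiter and names below are stand-ins, not the provider's real classes), the Trigger's loop reduces to awaiting the waiter and emitting an event:

```python
import asyncio

class FakeAsyncWaiter:
    """Stand-in for a botocore-generated async waiter."""
    def __init__(self, statuses):
        self._statuses = statuses

    async def wait(self, **kwargs):
        # A real waiter polls the service with delays between attempts;
        # here we just step through a canned sequence of cluster states.
        for status in self._statuses:
            if status == "available":
                return
            await asyncio.sleep(0)
        raise RuntimeError("waiter reached a terminal failure state")

async def trigger_run(waiter):
    # The standardized polling portion of a Trigger: delegate the loop
    # to the waiter, then emit a success event.
    await waiter.wait(WaiterConfig={"Delay": 30, "MaxAttempts": 10})
    return {"status": "success"}

event = asyncio.run(trigger_run(FakeAsyncWaiter(["creating", "creating", "available"])))
print(event["status"])
```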
   
   The `RedshiftCreateClusterOperator` is chosen to demonstrate how an operator would be modified to become a deferrable operator. We use the built-in `cluster_available` waiter in the Trigger to asynchronously poll the boto3 API to wait for the cluster to become available. 
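   A minimal sketch of that hand-off, with simplified stand-ins for Airflow's `TaskDeferred` and `BaseOperator.defer` (the real operator passes a Trigger instance, not a string):

```python
class TaskDeferred(Exception):
    """Simplified stand-in for airflow.exceptions.TaskDeferred."""
    def __init__(self, trigger, method_name):
        super().__init__(method_name)
        self.trigger = trigger
        self.method_name = method_name

class FakeRedshiftCreateClusterOperator:
    def __init__(self, deferrable=False):
        self.deferrable = deferrable

    def defer(self, trigger, method_name):
        # Airflow's BaseOperator.defer raises TaskDeferred so the worker
        # slot is released and the triggerer takes over the polling.
        raise TaskDeferred(trigger=trigger, method_name=method_name)

    def execute(self, context):
        # ... the cluster would be created via the boto3 API here ...
        if self.deferrable:
            self.defer(trigger="RedshiftCreateClusterTrigger",
                       method_name="execute_complete")
        return "finished synchronously"

    def execute_complete(self, context, event):
        # Called once the trigger fires its success event.
        return "cluster available"

op = FakeRedshiftCreateClusterOperator(deferrable=True)
try:
    op.execute({})
    deferred = None
except TaskDeferred as exc:
    deferred = exc
print(deferred.method_name)
```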
   
   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of an existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code changes, an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in [newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] eladkal commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1132995976


##########
airflow/providers/amazon/provider.yaml:
##########
@@ -67,6 +66,7 @@ dependencies:
   - mypy-boto3-rds>=1.24.0
   - mypy-boto3-redshift-data>=1.24.0
   - mypy-boto3-appflow>=1.24.0
+  - aiobotocore[boto3]

Review Comment:
   Users may install the providers separately with their own constraint mechanism, which, depending on the user's list of requested packages, may result in older versions being used.
   PIP always tries to use the latest version possible, but if it can't, it starts backtracking to older versions. By setting a lower limit we instruct PIP not to look below this version.





[GitHub] [airflow] Taragolis commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1136239727


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -42,6 +42,7 @@
 import botocore.session
 import requests
 import tenacity
+from aiobotocore.session import AioSession, get_session as async_get_session

Review Comment:
   Sorry for the late response; I don't have time for the Airflow project now, and I don't even use it in production right now, so it's better to say I'm not an Airflow user at the moment.
   
   I'll try to answer some of the messages that I missed.
   
   Note: everything here is just my opinion, and I won't try to change anyone's mind, because I've found from real experience that that doesn't work and any change request can be dismissed
   
   > That 7 month window is kind of the worst case. I'm seeing aiobotocore releases which bump botocore as short as one month (and everything in between).
   
   I wouldn't be so optimistic about `aiobotocore` releases; here are all the `botocore` bumps since late 2017.
   
   | Bump Date    | Version  | Days Since Last Update | Months Since Last Update |
   |--------------|----------|-----------------------:|-------------------------:|
   | Mar 7, 2023  | 1.29.76  |                    225 |                      7.4 |
   | Aug 25, 2022 | 1.27.59  |                    130 |                      4.3 |
   | Mar 17, 2022 | 1.24.21  |                     93 |                      3.1 |
   | Dec 14, 2021 | 1.23.24  |                     42 |                      1.4 |
   | Nov 2, 2021  | 1.22.8   |                    294 |                      9.7 |
   | Jan 12, 2021 | 1.19.52  |                    178 |                      5.9 |
   | Aug 18, 2020 | 1.17.44  |                    148 |                      4.9 |
   | Feb 21, 2020 | 1.15.3   |                    101 |                      3.3 |
   | Nov 12, 2019 | 1.13.14  |                     18 |                      0.6 |
   | Oct 25, 2019 | 1.12.252 |                     99 |                      3.3 |
   | Jul 18, 2019 | 1.12.189 |                    159 |                      5.2 |
   | Feb 9, 2019  | 1.12.91  |                     62 |                        2 |
   | Dec 9, 2018  | 1.12.49  |                    143 |                      4.7 |
   | Jul 19, 2018 | 1.10.58  |                     75 |                      2.5 |
   | May 5, 2018  | 1.10.12  |                    124 |                      4.1 |
   | Jan 1, 2018  | 1.8.21   |                      0 |                        0 |
   | Jan 1, 2018  | 1.8.20   |                      0 |                        0 |
   | Jan 1, 2018  | 1.8.12   |                      0 |                        0 |
   | Jan 1, 2018  | 1.7.48   |                     47 |                      1.5 |
   | Oct 15, 2017 | 1.7.27   |                    n/a |                      n/a |
   
   So, excluding the multiple bumps on 2018-01-01: the optimistic scenario is 4 bumps per year, the pessimistic one is 2 bumps per year, with a median delay of 3.7 months per update.
   
   > I personally haven't looked into this. But, sure, we may have to do more manual mocking and testing for the aio usecase.
   
   In the past we had a fully mocked version of auth, with the result: https://github.com/apache/airflow/pull/27198 — which we only found after 2 months, and which affected Amazon Provider 5.1.0 and 6.0.0 (bundled in AWS MWAA 2.4.3)
   
   >  Honestly, I think this ship has long since sailed. I agree that the architecture of deferrable operators/trigger is a bit clunky, leads to a lot of code duplication, and has this duality of sync vs async hooks. But the fact of the matter is Deferrable Operator support is public and we hear from users and customers that they want to use it. So I think an incremental/agile approach is often the best way to approach it. If we wait until there is perfection, we'd never ship anything.
   
   Potentially users don't know about all the side effects of Deferrable Operators, because they are hidden from them and presented as better than the sync implementation.
   To be honest, we do not have any comparison of Sync vs Deferrable for resource consumption, latency, or average execution time.
   
   > Maybe a helpful exercise would be to list some concrete examples of how this PR would degrade an Airflow user's experience more than it would improve it (other than the botocore version, that one is quite clear).
   
   This PR only uses a blocking io implementation, which means the event loop of the triggerer would be blocked.
   
   > I have not looked into many details of the AWS hook split - but, it looks good, and I think it can be taken and treated separately (it does not solve indeed botocore/aiobotocore pinning). The question is - is there any problem with joining the two approaches @Taragolis ?
   
   My opinion is that async support for the Amazon Provider should be an optional dependency which lives separately from the sync implementation:
   - Async will not support all parameters of the AWS Connection
   - `boto3.client` in most cases is basically `botocore.{SomeSpecificClient}`, except for `AWS S3`
   - Dependency hell if a user wants to upgrade `boto3`; that takes us back to the past, when `boto3` and `botocore` had a weird dependency policy that drove `pip` resolution crazy
   
   > I think this is a valid concern and not everything is perfect. And yes, it is a time-bomb potentially. I would like however to have @andrewgodwin comment on that - but I believe we use sync_to_async where necessary (same as in django) for places that are not ready and while it does introduce some serialization it is not catastrophic and is a good way to add mostly async support.
   
   ![image](https://user-images.githubusercontent.com/3998685/225137845-0063d24a-1868-49da-8030-123c9275fd0e.png)
   
   There were PRs which initially tried to wrap almost everything in `sync_to_async`. I haven't checked all of them; maybe some were merged into a provider or the core. I only know about these:
   - https://github.com/apache/airflow/pull/29260
   - https://github.com/apache/airflow/pull/29313
   
   I'm more unhappy that we do not disclose to users all the side effects of the `sync_to_async` deferrable operator implementation, which affects all of them except `TimeSensorAsync` and `TimeDeltaSensorAsync`





[GitHub] [airflow] vandonr-amz commented on pull request #30032: Add support for deferrable operators in AMPP

Posted by "vandonr-amz (via GitHub)" <gi...@apache.org>.
vandonr-amz commented on PR #30032:
URL: https://github.com/apache/airflow/pull/30032#issuecomment-1509010766

   > There is an unanswered question from @vandonr-amz @syedahsn
   
   Yes we're going to take a look at it together 👌 




[GitHub] [airflow] syedahsn commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "syedahsn (via GitHub)" <gi...@apache.org>.
syedahsn commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1173381568


##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -252,6 +257,16 @@ def execute(self, context: Context):
             self.master_user_password,
             params,
         )
+        if self.deferrable:

Review Comment:
   The unit test for this branch is fairly trivial, but you're correct, it should still be included. Adding it now. Thanks!





[GitHub] [airflow] eladkal commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1136138558


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -42,6 +42,7 @@
 import botocore.session
 import requests
 import tenacity
+from aiobotocore.session import AioSession, get_session as async_get_session

Review Comment:
   >so while it seems like aiobotocore may pin the botocore version, it shouldn't have major impact on user functionality.
   
   But what about boto3 version pin?





[GitHub] [airflow] eladkal commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1136243220


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -42,6 +42,7 @@
 import botocore.session
 import requests
 import tenacity
+from aiobotocore.session import AioSession, get_session as async_get_session

Review Comment:
   I agree. I don't have a good answer here. This is a dilemma;
   any course of action will have pros and cons.
   
   I'm fine with either way we go. We've raised the questions; I think now we should leave it to the AWS folks to debate and let us know their chosen path (in the spirit of the shared governance model)





[GitHub] [airflow] vandonr-amz commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "vandonr-amz (via GitHub)" <gi...@apache.org>.
vandonr-amz commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1152442059


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -634,6 +661,14 @@ def conn(self) -> BaseAwsConnection:
         else:
             return self.get_resource_type(region_name=self.region_name)
 
+    @cached_property

Review Comment:
   I don't think this should be a cached property. It returns a `ClientCreatorContext`, which is meant to be used in a `with` block, and aiobotocore is written so that the coroutine creating the client is awaited when entering the block:
   https://github.com/aio-libs/aiobotocore/blob/72b8dd5d7d4ef2f1a49a0ae0c37b47e5280e2070/aiobotocore/session.py#L26-L27
   So if we cache the context, trying to use it twice (i.e. calling `__enter__` twice) will result in awaiting the same coroutine twice, which throws an exception.
   
   What we can do is either:
    * cache the AioBaseClient by manually calling `__aenter__` (but it feels wrong)
    * not cache at all (but then we recreate the client each time, which may or may not be a perf issue)
    * find something else to cache mid-way, like the AioSession (but it's more complex to set up here, and depending on where the time is spent, might not help much)
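   The failure mode in the first point is plain Python coroutine semantics, which a stdlib-only snippet can demonstrate (no aiobotocore needed, the names below are illustrative): awaiting the same coroutine object a second time raises a `RuntimeError`.

```python
import asyncio

async def make_client():
    # Stand-in for aiobotocore's client-creation coroutine, which
    # ClientCreatorContext.__aenter__ awaits.
    return "client"

async def use_cached_context():
    coro = make_client()   # a cached context would hold one such coroutine
    await coro             # first __aenter__: works
    try:
        await coro         # second __aenter__ awaits the same coroutine again
    except RuntimeError:
        return "RuntimeError on second await"
    return "no error"

outcome = asyncio.run(use_cached_context())
print(outcome)
```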





[GitHub] [airflow] eladkal commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1132995115


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -42,6 +42,7 @@
 import botocore.session
 import requests
 import tenacity
+from aiobotocore.session import AioSession, get_session as async_get_session

Review Comment:
   Having a separate base hook for async will not solve the first point you mentioned: even if we have 2 hooks, eventually they must agree on the boto3 version.
   
   This is one of the cases where the shared governance model of providers is extremely helpful. Let's hear the AWS folks' perspective and whether they have considered the issues you raised.
   
   cc @syedahsn @vincbeck @ferruzzi @o-nikolas @shubham22
   Please review @Taragolis's good points about the release cycle of aiobotocore and the effect it will have of limiting the boto3 version for long periods of time





[GitHub] [airflow] potiuk commented on pull request #30032: Add support for deferrable operators in AMPP

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #30032:
URL: https://github.com/apache/airflow/pull/30032#issuecomment-1508988314

   There is an unanswered question from @vandonr-amz  @syedahsn 




[GitHub] [airflow] syedahsn commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "syedahsn (via GitHub)" <gi...@apache.org>.
syedahsn commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1169357850


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -634,6 +661,14 @@ def conn(self) -> BaseAwsConnection:
         else:
             return self.get_resource_type(region_name=self.region_name)
 
+    @cached_property

Review Comment:
   I've tested without the `cached_property` decorator, and everything works as expected. Thanks for pointing this out!





[GitHub] [airflow] syedahsn commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "syedahsn (via GitHub)" <gi...@apache.org>.
syedahsn commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1136030496


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -42,6 +42,7 @@
 import botocore.session
 import requests
 import tenacity
+from aiobotocore.session import AioSession, get_session as async_get_session

Review Comment:
   Thanks for looking at the PR! You raised some good points:
   
   The versioning of aiobotocore is a valid issue. However, as @o-nikolas pointed out, the 7-month gap seems to be an outlier. The aiobotocore library has a fairly active community that regularly releases newer versions. Also, while botocore gets updated regularly, there generally aren't major changes between updates, so while it seems like aiobotocore may pin the botocore version, it shouldn't have major impact on user functionality.
   >There was plan to add more consistency (not early than 30 Apr) in hooks and add type hinting by separate existed hooks by https://github.com/apache/airflow/discussions/28560 by separate base hooks. And aiobotocore.session.AioSession is not a replacement of boto3.session.Session.
   
   This is a really cool idea, and I'm not certain where aiobotocore would fit in with this paradigm. But while you are correct that `aiobotocore.session.AioSession` is not a replacement for `boto3.session.Session`, I think that because they both build on the botocore library, there are enough similarities between the two that `AioSession` can fit into that model.
   
   >all DB queries are use io blocking implementation (the regular one)
   
   The purpose of this PR is to allow workers to be freed up during time-consuming network tasks such as polling services for a certain state. DB queries are blocking io operations, and we can apply those optimizations in the Amazon Provider once we have a precedent for applying them across Airflow. I believe network operation optimizations are themselves a big step forward.





[GitHub] [airflow] eladkal commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1136201716


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -42,6 +42,7 @@
 import botocore.session
 import requests
 import tenacity
+from aiobotocore.session import AioSession, get_session as async_get_session

Review Comment:
   > if someone would like to use newer boto3 and not use aiobotocore extra - it will be possible for them to upgrade boto (and we can also document). There will be no "check" that the newer version will work - as we will not run tests for it - but the user will be able to do their own tests and use newer boto. We will not prevent that.
   
   I'm cool with this. I'm just saying that there is also the option of going the other way around.
   Until a native, conflict-free approach is introduced, maybe async operators should be external, in their own dedicated package, to be installed by users who want to use triggers? That way current behavior and expectations are not sacrificed, and whoever wants the new functionality can install it like any 3rd party provider.
   
   I'm worried that we may underestimate the impact of limiting boto3.
   
   > We have to sacrifice something
   
   I'm OK with any decision on this. I just want to make sure that all aspects were taken into account.





[GitHub] [airflow] syedahsn commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "syedahsn (via GitHub)" <gi...@apache.org>.
syedahsn commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1158552512


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -72,7 +74,7 @@
 
 class BaseSessionFactory(LoggingMixin):
     """
-    Base AWS Session Factory class to handle boto3 session creation.
+    Base AWS Session Factory class to handle synchronous and async boto session creation.

Review Comment:
   It's true that getting credentials from the database is a blocking io operation and will hold up the async thread when the connection is made, but I don't think that diminishes the advantages of using deferrable operators too significantly. Taking just the example of creating a Redshift cluster, which takes around 5 minutes to fully provision: if we can free up a worker so that those 5 minutes are used effectively, then the time spent holding up the async thread (usually around 0.2 to 0.8 seconds in my experience) is negligible. I do think core Airflow can be improved to better support asyncio connections, and it is something we should work towards, but I think it can be done in parallel.
   
   > Except unittest we do not have anything for test this part. Also for obtain initial credentials it is use STS client (from boto3/botocore) which also doesn't use asyncio implementation.
   
   For the unit tests, I was able to work past the STS mocking, and I am in the process of adding more tests. It is slightly more cumbersome not to be able to use moto, but that is the price of using asyncio, and considering the benefits, worth paying in my opinion.
   
   >In addition, AFAIK, some of the part not supported by aiobotocore at all such as Google Federation, SPNEGO/Kerberos and EKS IRSA
   
   This is something that can be incorporated as the need for these features becomes apparent. Ideally, the `async_conn` property is not something we would encourage users to use on their own, because of the limitations of asyncio. The goal is to give users the option of using deferrable operators without their having to deal with asyncio.
   
   
   >Potentially better idea is extend BaseAsyncSessionFactory rather than BaseSessionFactory
   
   This approach has been taken before, and it is valid. However, I feel that it leads to large amounts of code duplication, which will become difficult to manage as things progress. I think it is better to add async functionality to `BaseSessionFactory` and deal with issues as they come up, so that we work towards a more unified code base rather than ending up with two similar but separate ones.





[GitHub] [airflow] syedahsn commented on pull request #30032: Add support for deferrable operators in AMPP

Posted by "syedahsn (via GitHub)" <gi...@apache.org>.
syedahsn commented on PR #30032:
URL: https://github.com/apache/airflow/pull/30032#issuecomment-1509384684

   >There is an unanswered question from @vandonr-amz @syedahsn
   
   This is something I'm currently testing. I feel like it is a valid point, but I want to make sure nothing breaks before I make that change. Will be pushing the change soon




[GitHub] [airflow] vincbeck commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1173868001


##########
tests/providers/amazon/aws/operators/test_redshift_cluster.py:
##########
@@ -115,6 +115,22 @@ def test_create_multi_node_cluster(self, mock_get_conn):
 
         # wait_for_completion is False so check waiter is not called
         mock_get_conn.return_value.get_waiter.assert_not_called()
+    
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_conn")
+    def test_create_cluster_deferrable(self, mock_get_conn):
+        redshift_operator = RedshiftCreateClusterOperator(
+            task_id="task_test",
+            cluster_identifier="test-cluster",
+            node_type="dc2.large",
+            master_username="adminuser",
+            master_user_password="Test123$",
+            cluster_type="single-node",
+            wait_for_completion=True,
+            deferrable=True,
+        )
+
+        with pytest.raises(TaskDeferred):
+            redshift_operator.execute(None)

Review Comment:
   Interesting. I was expecting to check that `self.defer` has been called, but this works as well :)





[GitHub] [airflow] o-nikolas commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "o-nikolas (via GitHub)" <gi...@apache.org>.
o-nikolas commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1148027713


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -42,6 +42,7 @@
 import botocore.session
 import requests
 import tenacity
+from aiobotocore.session import AioSession, get_session as async_get_session

Review Comment:
   The aiobotocore dependency was merged in a separate PR and combined with Jarek's new testing we should be in a pretty good spot as far as compromise on this front.
   I'm resolving this discussion, please re-open if there is anything else to discuss on the topic!





[GitHub] [airflow] vincbeck commented on pull request #30032: Add support for deferrable operators in AMPP

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on PR #30032:
URL: https://github.com/apache/airflow/pull/30032#issuecomment-1517952781

   There are some static check failures




[GitHub] [airflow] Taragolis commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1132989959


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -42,6 +42,7 @@
 import botocore.session
 import requests
 import tenacity
+from aiobotocore.session import AioSession, get_session as async_get_session

Review Comment:
   My main concerns have not changed:
   - `aiobotocore` pins `botocore` and `boto3` to specific versions; the last version update took 7 months, while the regular boto packages get 3-5 updates per week: fixes, new AWS services, etc. So my opinion is that this should not be part of the core dependencies, unless we want to take one or two steps back.
   - There was a plan to add more consistency (not earlier than 30 Apr) in hooks and add type hinting by separating the existing hooks via https://github.com/apache/airflow/discussions/28560 into separate base hooks. And `aiobotocore.session.AioSession` is not a replacement for `boto3.session.Session`.
   - `aiobotocore` can't work with `moto`, and some critical stuff is tested by `moto`, e.g. auth and AssumeRole
   - My opinion is still the same: Airflow core is not ready for async stuff `¯\_(ツ)_/¯`. All DB queries use a blocking IO implementation (the regular one), so obtaining credentials from config still requires hacks like `asgiref.sync.sync_to_async`, which transforms the triggerer service into [some kind of sequential executor](https://github.com/apache/airflow/pull/29038#discussion_r1100693885)
   - The current PR implementation uses only blocking IO features; I can't find any `asyncio` usage.
   
   Personally, I would not veto this PR, or other async/deferrable related stuff. I'm starting to believe that the more stuff we add that degrades performance, the faster we will add native asyncio support in core.
   





[GitHub] [airflow] eladkal commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1136243220


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -42,6 +42,7 @@
 import botocore.session
 import requests
 import tenacity
+from aiobotocore.session import AioSession, get_session as async_get_session

Review Comment:
   > That is also a possibility of course. The end result is though, that if in the "sync" package you would like to use some new feature you technically should use ">=version_when_it_was_added".
   
   I agree. I don't have a good answer here. This is a dilemma.
   any course of action taken will have pros and cons.
   
   I'm fine with either way we go. We raised the questions I think now we should leave it to AWS folks to debate and let us know their chosen path (in the spirit of shared governance model)





[GitHub] [airflow] shubham22 commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "shubham22 (via GitHub)" <gi...@apache.org>.
shubham22 commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1136104998


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -42,6 +42,7 @@
 import botocore.session
 import requests
 import tenacity
+from aiobotocore.session import AioSession, get_session as async_get_session

Review Comment:
   > Question to @o-nikolas and the taem - maybe it is already planned or maybe you could raise it and push internally to add async support directly in boto library? That sounds like something that Amazon will be presurred to do anyway I guess? That would be a long-term strategic solution if we know at least that it is coming at some point in time.
   
   We discussed this with the Boto team last October when we added deferrable operators to our roadmap. Yesterday, we revisited the topic with them. The Boto team is already planning to include async functionality in a major release, which will eliminate the need for the aiobotocore library entirely. However, this release won't be available before 2024 as it will be a major change. This is why we decided to use aiobotocore in the short-term as it is the best way forward, even though it's not officially endorsed by the AWS Boto team.





[GitHub] [airflow] o-nikolas commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "o-nikolas (via GitHub)" <gi...@apache.org>.
o-nikolas commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1134674392


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -42,6 +42,7 @@
 import botocore.session
 import requests
 import tenacity
+from aiobotocore.session import AioSession, get_session as async_get_session

Review Comment:
   >    `aiobotocore` pin `botocore` and `boto3` for specific version, last version update takes 7 month, if compare with regular boto stuff it have 3-5 updates per week: fixes, new AWS services and etc. So my opinion that this should not be a part of core dependencies, only if we want to make one or two steps back.
   > 
   That 7 month window is kind of the worst case. I'm seeing aiobotocore releases which bump botocore as short as one month (and everything in between). 
   >There was plan to add more consistency (not early than 30 Apr) in hooks and add type hinting by separate existed hooks by [Amazon Provider: Consistency in boto3-based Hooks #28560](https://github.com/apache/airflow/discussions/28560) by separate base hooks. And `aiobotocore.session.AioSession` is not a replacement of `boto3.session.Session`.
   
   Agree with @eladkal on this one.
   >`aiobotocore` can't work with `moto` and some critical stuff tested by `moto`, e.g. auth and AssumeRole
   
   I personally haven't looked into this. But, sure, we may have to do more manual mocking and testing for the aio use case.
    
   >My opinion still the same, Airflow core not ready for async stuff `¯\_(ツ)_/¯`, all DB queries are use io blocking implementation (the regular one), so obtain credentials from config still required use some hacks, like `asgiref.sync.sync_to_async` which is transform triggerer service to [some kind of sequential executor](https://github.com/apache/airflow/pull/29038#discussion_r1100693885)
   
   Honestly, I think this ship has long since sailed. I agree that the architecture of deferrable operators/triggers is a bit clunky, leads to a lot of code duplication, and has this duality of sync vs async hooks. But the fact of the matter is that Deferrable Operator support is public, and we hear from users and customers that they want to use it. So I think an incremental/agile approach is often the best way forward. If we wait for perfection, we'd never ship anything.
   
   Maybe a helpful exercise would be to list some concrete examples of how this PR would degrade an Airflow user's experience more than it would improve it (other than the botocore version, that one is quite clear).  
   





[GitHub] [airflow] potiuk commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1135422375


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -42,6 +42,7 @@
 import botocore.session
 import requests
 import tenacity
+from aiobotocore.session import AioSession, get_session as async_get_session

Review Comment:
   First of all, I think we need to be pragmatic and find some reasonable tactical solutions, even if they are not perfect. And possibly work on strategic solutions to solve the problems in the long term.
   
   TL;DR: We seem to have to make some compromises for now. I think the best compromise is to allow async operators to be merged, while limiting what we can release for the AWS provider (also async) to what the pinned botocore version allows us to do.
   
   I understand @Taragolis's concerns and they are very sound. The problem is that we have no easy way out of some of those problems in the short term, so the question is what we can do now and whether it will block us from fixing things in the long term (if possible).
   
   > aiobotocore pin botocore and boto3 for specific version
   
   This is unfortunate, but until AWS decides to build async support into botocore, I think we will have to rely on aiobotocore playing the catch-up game. The thing with botocore is that it is released frequently, but in fact our users are not updating that frequently, so as long as we are ok with limiting what we publish in AWS operators to whatever version of botocore is pinned, this should be acceptable. This means that new features will have to wait for upgrades of aiobotocore (for example, the latest release of aiobotocore from a week ago enabled https://github.com/apache/airflow/pull/28850#issuecomment-1467791465 to be merged). If the AWS team is fine with it, this is fine for me. We often have delays in updating some dependencies and this is quite OK.
    
   Users could use the PythonVirtualenv or External operators to access botocore's new features - more hassle, but I guess this is a pretty acceptable solution for anything that is not available in the "regular" operators - and we should likely describe it somewhere.
   
   Also pinning the botocore in this case in our CI will be beneficial for that approach - because we will have only the "compatible" version of botocore installed. 
    
   Question to @o-nikolas and the team - maybe it is already planned, or maybe you could raise it and push internally to add async support directly in the boto library? That sounds like something that Amazon will be pressured to do anyway, I guess? That would be a long-term strategic solution if we know at least that it is coming at some point in time.
   
   > There was plan to add more consistency
   
   I have not looked into many details of the AWS hook split - but it looks good, and I think it can be taken and treated separately (it does not, indeed, solve botocore/aiobotocore pinning). The question is - is there any problem with joining the two approaches @Taragolis? Can we continue the split while using either of the sessions? AioSession derives from botocore's Session, so (minus the pinning problems) it seems doable to define our own session type (guarded by TYPE_CHECKING dynamically) which would be botocore's `Session` (if no aiobotocore installed) or `AioSession | Session` (if installed), and that would actually help with developing providers that will be compatible with the pinned botocore (through our CI).
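   As a rough sketch of that session-union idea (all names below are illustrative, not the actual `base_aws.py` implementation; the stub fallback only exists so the sketch stays self-contained when a library is missing):
   
   ```python
   # Hedged sketch: accept either a botocore Session or an AioSession,
   # resolved at import time depending on what is installed.
   try:
       from botocore.session import Session
   except ImportError:  # stub so the sketch runs even without botocore
       class Session:  # type: ignore[no-redef]
           pass
   
   try:
       # AioSession subclasses botocore's Session, so isinstance checks compose.
       from aiobotocore.session import AioSession
   except ImportError:
       AioSession = None  # aiobotocore stays an optional dependency
   
   # Tuple of acceptable session types, usable directly with isinstance().
   _SESSION_TYPES = tuple(t for t in (Session, AioSession) if t is not None)
   
   def is_supported_session(obj) -> bool:
       """Return True if obj is a session type a hook could wrap."""
       return isinstance(obj, _SESSION_TYPES)
   ```
   
   A hook could then accept either session flavour through one code path, while type checkers see `AioSession | Session` only when aiobotocore is installed.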
   
   > aiobotocore can't work with moto
   
   Yeah, more mocking in-house will be needed. Agree with @o-nikolas. Price to pay for async.
   
   > Airflow core not ready for async stuff
   
   I think this is a valid concern and not everything is perfect. And yes, it is potentially a time-bomb. I would however like @andrewgodwin to comment on that - I believe we use `sync_to_async` where necessary (same as in django) for places that are not ready, and while it does introduce some serialization, it is not catastrophic and is a good way to add **mostly** async support. I think Andrew implemented this (or at least participated in it) in django for similar cases, and maybe we can hear a comment from him about the current sync_to_async use. Maybe we could also (based on the comments) start figuring out a long-term strategy for how to deal with it.
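   The `sync_to_async` pattern mentioned here can be illustrated with the stdlib analogue `asyncio.to_thread` (this is a stand-in sketch, not Airflow's triggerer code; `blocking_lookup` is a hypothetical placeholder for a blocking DB/credentials call):
   
   ```python
   import asyncio
   import time
   
   def blocking_lookup():
       # Stand-in for a blocking IO call (e.g. fetching a connection from the DB).
       time.sleep(0.05)
       return "credentials"
   
   async def main():
       # Each blocking call runs in a worker thread, so the event loop stays
       # responsive instead of serializing behind the blocking IO.
       return await asyncio.gather(
           asyncio.to_thread(blocking_lookup),
           asyncio.to_thread(blocking_lookup),
       )
   
   results = asyncio.run(main())
   ```
   
   `asgiref.sync.sync_to_async` does essentially this (with extra thread-affinity guarantees), which is why it keeps the loop alive but still caps throughput at the size of the thread pool.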
   
   I think if we agree on this direction, it will unblock tactical changes to add more deferrable operators for AWS, while we could also work on improving the infrastructure and (hopefully) get to the point where async support will be provided by botocore maybe? 
   
   I don't see any other constructive way to approach it, to be honest.
   





[GitHub] [airflow] eladkal commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1132995976


##########
airflow/providers/amazon/provider.yaml:
##########
@@ -67,6 +66,7 @@ dependencies:
   - mypy-boto3-rds>=1.24.0
   - mypy-boto3-redshift-data>=1.24.0
   - mypy-boto3-appflow>=1.24.0
+  - aiobotocore[boto3]

Review Comment:
   Users may install providers separately with their own list of requirements, which may result in using older versions of boto3. Also, some users have their own constraints mechanism that takes into account internal packages and other things.
   PIP always tries to use the latest version possible, but if it can't, it starts backtracking to older versions. By setting a lower limit we instruct PIP not to look below that version; if it would have to, PIP will stop and raise an error.
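   Concretely, the lower-bound approach would make the provider.yaml entry look something like this (the version number below is illustrative only, not necessarily the one chosen for this PR):
   
   ```yaml
   dependencies:
     # A lower bound makes pip fail loudly rather than backtrack to an
     # incompatible older release; the pinned botocore comes along transitively.
     - aiobotocore[boto3]>=2.1.2
   ```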





[GitHub] [airflow] syedahsn commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "syedahsn (via GitHub)" <gi...@apache.org>.
syedahsn commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1136030496


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -42,6 +42,7 @@
 import botocore.session
 import requests
 import tenacity
+from aiobotocore.session import AioSession, get_session as async_get_session

Review Comment:
   Thanks for looking at the PR! You raised some good points:
   
   The versioning of aiobotocore is a valid issue. However, as @o-nikolas pointed out, the 7 month gap seems to be an outlier. The aiobotocore library has a fairly active community that regularly releases newer versions. Also, while botocore gets updated regularly, there generally aren't major changes between updates, so while aiobotocore may pin the botocore version, it shouldn't have a major impact on user functionality.
   >There was plan to add more consistency (not early than 30 Apr) in hooks and add type hinting by separate existed hooks by https://github.com/apache/airflow/discussions/28560 by separate base hooks. And aiobotocore.session.AioSession is not a replacement of boto3.session.Session.
   
   This is a really cool idea, and I'm not certain where aiobotocore would fit in with this paradigm. But while you are correct that `aiobotocore.session.AioSession` is not a replacement for `boto3.session.Session`, I think that because they both share the botocore library, there are enough similarities between the two libraries that the `AioSession` can fit into that model.
   
   >all DB queries are use io blocking implementation (the regular one)
   
   The purpose of this PR is to allow workers to be freed up during time-consuming network tasks, such as polling services for a certain state. DB queries are blocking IO operations, and we can apply those optimizations in the Amazon Provider once there is a precedent for applying them across Airflow. I believe network operation optimizations are themselves a big step forward.





[GitHub] [airflow] vincbeck commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1171912516


##########
airflow/providers/amazon/aws/triggers/README.md:
##########
@@ -0,0 +1,153 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+# Writing Deferrable Operators for Amazon Provider Package
+
+
+Before writing deferrable operators, it is strongly recommended to read and familiarize yourself with the official [documentation](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html) of Deferrable Operators.
+The purpose of this guide is to provide a standardized way to convert existing Amazon Provider Package (AMPP) operators to deferrable operators. Due to the varied complexities of available operators, it is impossible to define one method that will work for every operator.
+The method described in this guide should work for many of the AMPP operators, but it is important to study each operator before determining whether the steps outlined below are applicable.
+
+Although it varies from operator to operator, a typical AMPP operator has 3 stages:
+
+1. A pre-processing stage, where information is looked up via boto3 API calls, parameters are formatted etc. The complexity of this stage depends on the complexity of the task the operator is attempting to do. Some operators (e.g. Sagemaker) have a lot of pre-processing, whereas others require little to no pre-processing.
+2. The "main" call to the boto3 API to start an operation. This is the task that the operator is attempting to complete. This could be a request to provision a resource, a request to change the state of a resource, starting a job on a resource, etc. Regardless of the operation, the boto3 API returns instantly (ignoring network delays) with a response detailing the results of the request. For example, in the case of a resource provisioning request, although the resource can take significant time to be allocated, the boto3 API returns a response to the caller without waiting for the operation to be completed.
+3. The last, often optional, stage is to wait for the operation initiated in stage 2 to complete. This usually involves polling the boto3 API at set intervals and waiting for certain criteria to be met.
+
+In general, it is the last polling stage where we can defer the operator to a trigger which can handle the polling operation. The botocore library defines waiters for certain services, which are built-in functions that poll a service and wait for a given criteria to be met.
+As part of our work for writing deferrable operators, we have extended the built-in waiters to allow custom waiters, which follow the same logic, but for services not included in the botocore library.
+We can use these custom waiters, along with the built-in waiters to implement the polling logic of the deferrable operators.
+
+The first step to making an existing operator deferrable is to add `deferrable` as a parameter to the operator, and initialize it in the constructor of the operator.
+The next step is to determine where the operator should be deferred. This will be dependent on what the operator does, and how it is written. Although every operator is different, there are a few guidelines to determine the best place to defer an operator.
+
+1. If the operator has a `wait_for_completion` parameter, the `self.defer` method should be called right before the check for `wait_for_completion`.
+2. If there is no `wait_for_completion`, look for the "main" task that the operator does. Operators will often make various describe calls to the boto3 API to verify certain conditions, or look up some information, before performing their "main" task. A good place to call `self.defer` is often right after the "main" call to the boto3 API is made.
+
+
+Once the location to defer is decided in the operator, call the `self.defer` method if the `deferrable` flag is `True`. The `self.defer` method takes in several parameters, listed below:
+
+1. `trigger`: This is the trigger which you want to pass the execution to. We will write this trigger in just a moment.
+2. `method_name`: This specifies the name of the method you want to execute once the trigger completes its execution. The trigger cannot pass the execution back to the execute method of the operator. By convention, the name for this method is `execute_complete`.
+3. `timeout`: An optional parameter that controls the length of time the Trigger can execute for before timing out. This defaults to `None`, meaning no timeout.
+4. `kwargs`: Additional keyword arguments to pass to `method_name`. Default is `{}`.
+
+The Trigger is the main component of deferrable operators. They must be placed in the `airflow/providers/amazon/aws/triggers/` folder. All Triggers must implement the following 3 methods:
+
+1. `__init__`: the constructor which receives parameters from the operator. These must be JSON serializable.
+2. `serialize`: a function that returns the classpath, as well as the keyword arguments to the `__init__` method, as a tuple.
+3. `run`: the asynchronous function that is responsible for awaiting the asynchronous operations.
+
+Ideally, when the operator has deferred itself, it has already initiated the "main" task of the operator, and is now waiting for a certain resource to reach a certain state.
+As mentioned earlier, the botocore library defines a `Waiter` class for many services, which implements a `wait` method that can be configured via a config file to poll the boto3 API at set intervals and return once the success criteria are met.
+The aiobotocore library, which is used to make asynchronous botocore calls, defines an `AIOWaiter` class, which also implements a `wait` method that behaves identically to the botocore method, except that it works asynchronously.
+Therefore, any botocore waiter is available as an aiobotocore waiter, and can be used to asynchronously poll a service until the desired criteria is met.
+
+To call the asynchronous `wait` function, first create a hook for the particular service. For example, for a Redshift hook, it would look like this:
+
+```python
+self.redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+```
+
+With this hook, we can use the `async_conn` property to get access to the aiobotocore client:
+
+```python
+async with self.redshift_hook.async_conn as client:
+    await client.get_waiter("cluster_available").wait(
+        ClusterIdentifier=self.cluster_identifier,
+        WaiterConfig={
+            "Delay": int(self.poll_interval),
+            "MaxAttempts": int(self.max_attempt),
+        },
+    )
+```
+
+In this case, we are using the built-in `cluster_available` waiter. If we wanted to use a custom waiter, we would change the code slightly to use the `get_waiter` function from the hook, rather than the aiobotocore client:

Review Comment:
   ```suggestion
   In this case, we are using the built-in cluster_available waiter. If we wanted to use a custom waiter, we would change the code slightly to use the `get_waiter` function from the hook, rather than the aiobotocore client:
   ```
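   To make the polling pattern from this README concrete end-to-end, here is a runnable sketch of a trigger-style `run` coroutine. `FakeWaiter` and `FakeClient` are illustrative stubs standing in for the aiobotocore client (in the real trigger the client comes from the hook's `async_conn` context manager), and all names are hypothetical:
   
   ```python
   import asyncio
   
   class FakeWaiter:
       # Stub for aiobotocore's AIOWaiter; a real waiter polls the service
       # between async sleeps until its config's success criteria are met.
       async def wait(self, **kwargs):
           max_attempts = kwargs.get("WaiterConfig", {}).get("MaxAttempts", 3)
           for _ in range(max_attempts):
               await asyncio.sleep(0)  # yield to the event loop, never block
   
   class FakeClient:
       def get_waiter(self, waiter_name):
           return FakeWaiter()
   
   async def run_trigger(client, cluster_identifier):
       # The shape of a trigger's run(): await the waiter, then report success.
       await client.get_waiter("cluster_available").wait(
           ClusterIdentifier=cluster_identifier,
           WaiterConfig={"Delay": 1, "MaxAttempts": 2},
       )
       return {"status": "success", "cluster_identifier": cluster_identifier}
   
   event = asyncio.run(run_trigger(FakeClient(), "demo-cluster"))
   ```
   
   The returned dict mirrors the kind of payload a trigger would wrap in a `TriggerEvent` for the operator's `execute_complete` method to consume.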



##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -140,6 +143,7 @@ def __init__(
         wait_for_completion: bool = False,
         max_attempt: int = 5,
         poll_interval: int = 60,
+        deferrable: bool = False,

Review Comment:
   docstring



##########
airflow/providers/amazon/aws/triggers/README.md:
##########
@@ -0,0 +1,153 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+# Writing Deferrable Operators for Amazon Provider Package
+
+
+Before writing deferrable operators, it is strongly recommended to read and familiarize yourself with the official [documentation](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html) of Deferrable Operators.
+The purpose of this guide is to provide a standardized way to convert existing Amazon Provider Package (AMPP) operators to deferrable operators. Due to the varied complexities of available operators, it is impossible to define one method that will work for every operator.
+The method described in this guide should work for many of the AMPP operators, but it is important to study each operator before determining whether the steps outlined below are applicable.
+
+Although it varies from operator to operator, a typical AMPP operator has 3 stages:
+
+1. A pre-processing stage, where information is looked up via boto3 API calls, parameters are formatted, etc. The complexity of this stage depends on the complexity of the task the operator is attempting to do. Some operators (e.g. SageMaker) have a lot of pre-processing, whereas others require little to none.
+2. The "main" call to the boto3 API to start an operation. This is the task that the operator is attempting to complete. It could be a request to provision a resource, to change the state of a resource, to start a job on a resource, etc. Regardless of the operation, the boto3 API returns almost instantly (ignoring network delays) with a response detailing the result of the request. For example, in the case of a resource provisioning request, although the resource can take significant time to be allocated, the boto3 API responds to the caller without waiting for the operation to be completed.
+3. The last, often optional, stage is to wait for the operation initiated in stage 2 to complete. This usually involves polling the boto3 API at set intervals and waiting for certain criteria to be met.
+
+In general, it is the last polling stage where we can defer the operator to a trigger that handles the polling operation. The botocore library defines waiters for certain services: built-in functions that poll a service and wait for a given criterion to be met.
+As part of our work for writing deferrable operators, we have extended the built-in waiters to allow custom waiters, which follow the same logic, but for services not included in the botocore library.
+We can use these custom waiters, along with the built-in waiters to implement the polling logic of the deferrable operators.
+
+The first step to making an existing operator deferrable is to add `deferrable` as a parameter to the operator, and initialize it in the constructor of the operator.
+The next step is to determine where the operator should be deferred. This will be dependent on what the operator does, and how it is written. Although every operator is different, there are a few guidelines to determine the best place to defer an operator.
+
+1. If the operator has a `wait_for_completion` parameter, the `self.defer` method should be called right before the `wait_for_completion` check.
+2. If there is no `wait_for_completion`, look for the "main" task that the operator performs. Operators often make various describe calls to the boto3 API to verify certain conditions, or look up some information, before performing their "main" task. Right after the "main" call to the boto3 API is usually a good place to call `self.defer`.
+
+
+Once the location to defer is decided in the operator, call the `self.defer` method if the `deferrable` flag is `True`. The `self.defer` method takes in several parameters, listed below:
+
+1. `trigger`: the trigger to which you want to pass execution. We will write this trigger in just a moment.
+2. `method_name`: the name of the method to execute once the trigger completes its execution. The trigger cannot pass execution back to the `execute` method of the operator. By convention, this method is named `execute_complete`.
+3. `timeout`: an optional parameter that controls how long the Trigger can execute before timing out. Defaults to `None`, meaning no timeout.
+4. `kwargs`: additional keyword arguments to pass to `method_name`. Defaults to `{}`.
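Put together, the deferral branch in `execute` then looks roughly like the sketch below. Everything here is an illustrative stand-in so the snippet is self-contained: in the provider, `defer` is inherited from `BaseOperator` (where it raises `TaskDeferred` to hand off to the triggerer), and `DemoTrigger` stands in for a real Trigger class.

```python
# Sketch of the deferral branch in an operator's execute() method.
# DemoTrigger and DemoOperator are illustrative stand-ins; in the provider,
# defer() is inherited from BaseOperator and raises TaskDeferred.
class DemoTrigger:
    def __init__(self, cluster_identifier: str, poll_interval: int):
        self.cluster_identifier = cluster_identifier
        self.poll_interval = poll_interval


class DemoOperator:
    def __init__(self, cluster_identifier: str, deferrable: bool = False):
        self.cluster_identifier = cluster_identifier
        self.deferrable = deferrable
        self.deferred_with = None

    def defer(self, *, trigger, method_name, timeout=None, kwargs=None):
        # Stand-in for BaseOperator.defer, which hands execution to the triggerer.
        self.deferred_with = (trigger, method_name)

    def execute(self, context):
        # ... the "main" boto3 call would happen here ...
        if self.deferrable:
            self.defer(
                trigger=DemoTrigger(self.cluster_identifier, poll_interval=60),
                method_name="execute_complete",
            )
```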
+
+Triggers are the main component of deferrable operators. They must be placed in the `airflow/providers/amazon/aws/triggers/` folder. All Triggers must implement the following 3 methods:
+
+1. `__init__`: the constructor, which receives parameters from the operator. These must be JSON serializable.
+2. `serialize`: a method that returns the classpath and the keyword arguments to the `__init__` method, as a tuple.
+3. `run`: the asynchronous method that is responsible for awaiting the asynchronous operations.
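Taken together, the three methods give a Trigger the following shape. This is a minimal sketch: a real Trigger subclasses `airflow.triggers.base.BaseTrigger` and yields a `TriggerEvent` from `run`; the class name, classpath, and parameters here are illustrative.

```python
# Sketch of a trigger's required interface. A real trigger would subclass
# airflow.triggers.base.BaseTrigger and yield a TriggerEvent; a plain
# Python class is used here so the shape is easy to see.
class RedshiftClusterTrigger:  # illustrative name
    def __init__(self, cluster_identifier: str, poll_interval: int, max_attempt: int):
        # Parameters come from the operator and must be JSON serializable,
        # because they round-trip through serialize().
        self.cluster_identifier = cluster_identifier
        self.poll_interval = poll_interval
        self.max_attempt = max_attempt

    def serialize(self) -> tuple[str, dict]:
        # Classpath plus the kwargs needed to rebuild the trigger.
        return (
            "airflow.providers.amazon.aws.triggers.redshift_cluster.RedshiftClusterTrigger",
            {
                "cluster_identifier": self.cluster_identifier,
                "poll_interval": self.poll_interval,
                "max_attempt": self.max_attempt,
            },
        )

    async def run(self):
        # The async polling logic goes here; see the waiter examples below.
        yield {"status": "success"}
```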
+
+Ideally, when the operator has deferred itself, it has already initiated the "main" task of the operator, and is now waiting for a certain resource to reach a certain state.
+As mentioned earlier, the botocore library defines a `Waiter` class for many services, which implements a `wait` method that can be configured via a config file to poll the boto3 API at set intervals and return once the success criteria are met.
+The aiobotocore library, which is used to make asynchronous botocore calls, defines an `AIOWaiter` class, which also implements a `wait` method that behaves identically to its botocore counterpart, except that it works asynchronously.
+Therefore, any botocore waiter is available as an aiobotocore waiter and can be used to asynchronously poll a service until the desired criteria are met.
+
+To call the asynchronous `wait` function, first create a hook for the particular service. For example, for a Redshift hook, it would look like this:
+
+```python
+self.redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+```
+
+With this hook, we can use the `async_conn` property to get access to the aiobotocore client:
+
+```python
+async with self.redshift_hook.async_conn as client:
+    await client.get_waiter("cluster_available").wait(
+        ClusterIdentifier=self.cluster_identifier,
+        WaiterConfig={
+            "Delay": int(self.poll_interval),
+            "MaxAttempts": int(self.max_attempt),
+        },
+    )
+```
+
+In this case, we are using the built-in `cluster_available` waiter. If we wanted to use a custom waiter, we would change the code slightly to use the `get_waiter` function from the hook, rather than the aiobotocore client:
+
+```python
+async with self.redshift_hook.async_conn as client:
+    waiter = self.redshift_hook.get_waiter("cluster_paused", deferrable=True, client=client)
+    await waiter.wait(
+        ClusterIdentifier=self.cluster_identifier,
+        WaiterConfig={
+            "Delay": int(self.poll_interval),
+            "MaxAttempts": int(self.max_attempt),
+        },
+    )
+```
+
+Here, we are calling the `get_waiter` function defined in `base_aws.py`, which takes an optional `deferrable` argument (set to `True`) and the aiobotocore client. `cluster_paused` is a custom boto waiter defined in `redshift.json` in the `airflow/providers/amazon/aws/waiters` folder. In general, the config file for a custom waiter should be named `<service_name>.json`. The config for `cluster_paused` is shown below:
+
+```json
+{
+    "version": 2,
+    "waiters": {
+        "cluster_paused": {
+            "operation": "DescribeClusters",
+            "delay": 30,
+            "maxAttempts": 60,
+            "acceptors": [
+                {
+                    "matcher": "pathAll",
+                    "argument": "Clusters[].ClusterStatus",
+                    "expected": "paused",
+                    "state": "success"
+                },
+                {
+                    "expected": "ClusterNotFound",
+                    "matcher": "error",
+                    "state": "retry"
+                },
+                {
+                    "expected": "deleting",
+                    "matcher": "pathAny",
+                    "state": "failure",
+                    "argument": "Clusters[].ClusterStatus"
+                }
+            ]
+        }
+    }
+}
+```
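To make the acceptor semantics concrete, here is a small illustration of how the `pathAll` matcher above evaluates a `DescribeClusters` response. The helper functions are hand-rolled stand-ins for botocore's JMESPath evaluation, not real botocore APIs.

```python
# Illustration of the "pathAll" acceptor above: the waiter succeeds only
# when every value at Clusters[].ClusterStatus equals the expected string.
# (botocore evaluates the argument with JMESPath; a plain list
# comprehension stands in for that here.)
def cluster_statuses(response: dict) -> list[str]:
    # Equivalent of the JMESPath expression "Clusters[].ClusterStatus".
    return [c["ClusterStatus"] for c in response["Clusters"]]


def path_all_matches(response: dict, expected: str) -> bool:
    statuses = cluster_statuses(response)
    # pathAll requires at least one match, and all values must match.
    return bool(statuses) and all(s == expected for s in statuses)


# A response mid-pause: the waiter keeps retrying.
in_progress = {"Clusters": [{"ClusterStatus": "pausing"}]}
# A response once the cluster is paused: the "success" acceptor fires.
done = {"Clusters": [{"ClusterStatus": "paused"}]}
```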
+
+For more information about writing custom waiters, see the custom waiters [README.md](https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/waiters/README.md).
+
+In some cases, a built-in or custom waiter may not be able to solve the problem. In such cases, the asynchronous method used to poll the boto3 API would need to be defined in the hook of the service being used. This method is essentially the same as the synchronous version of the method, except that it will use the aiobotocore client, and will be awaited. For the Redshift example, the async `describe_clusters` method would look as follows:

Review Comment:
   ```suggestion
   In some cases, a built-in or custom waiter may not be able to solve the problem. In such cases, the asynchronous method used to poll the boto3 API would need to be defined in the hook of the service being used. This method is essentially the same as the synchronous version of the method, except that it will use the aiobotocore client, and will be awaited. For the Redshift example, the async `describe_clusters` method would look as follows:
   ```



##########
airflow/providers/amazon/aws/triggers/README.md:
##########
@@ -0,0 +1,153 @@
+In some cases, a built-in or custom waiter may not be able to solve the problem. In such cases, the asynchronous method used to poll the boto3 API would need to be defined in the hook of the service being used. This method is essentially the same as the synchronous version of the method, except that it will use the aiobotocore client, and will be awaited. For the Redshift example, the async `describe_clusters` method would look as follows:
+
+```python
+async with self.async_conn as client:
+    response = await client.describe_clusters(ClusterIdentifier=self.cluster_identifier)
+```
+
+This async method can be used in the Trigger to poll the boto3 API. The polling logic will need to be implemented manually, taking care to use `asyncio.sleep()` rather than `time.sleep()`.
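A manual polling loop along those lines might look like the sketch below. The `describe_clusters` coroutine parameter and the status strings are assumptions standing in for the real async hook method.

```python
import asyncio

# Sketch of manual polling logic in a trigger's run() method, using
# asyncio.sleep() between attempts. describe_clusters is a stand-in for
# the async hook method described above.
async def poll_until_available(describe_clusters, cluster_identifier: str,
                               poll_interval: float, max_attempt: int) -> str:
    for _ in range(max_attempt):
        response = await describe_clusters(ClusterIdentifier=cluster_identifier)
        status = response["Clusters"][0]["ClusterStatus"]
        if status == "available":
            return status
        # Non-blocking sleep keeps the triggerer's event loop free.
        await asyncio.sleep(poll_interval)
    raise TimeoutError(f"Cluster {cluster_identifier} never became available")
```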
+
+The last step in the Trigger is to yield a `TriggerEvent` that will be used to alert the `Triggerer` that the Trigger has finished execution. The `TriggerEvent` can pass information from the trigger to the `method_name` method named in the `self.defer` call in the operator. In the Redshift example, the `TriggerEvent` would look as follows:

Review Comment:
   ```suggestion
   The last step in the Trigger is to yield a `TriggerEvent` that will be used to alert the `Triggerer` that the Trigger has finished execution. The `TriggerEvent` can pass information from the trigger to the `method_name` method named in the `self.defer` call in the operator. In the Redshift example, the `TriggerEvent` would look as follows:
   ```



##########
airflow/providers/amazon/aws/triggers/README.md:
##########
@@ -0,0 +1,153 @@
+
+The last step in the Trigger is to yield a `TriggerEvent` that will be used to alert the `Triggerer` that the Trigger has finished execution. The `TriggerEvent` can pass information from the trigger to the `method_name` method named in the `self.defer` call in the operator. In the Redshift example, the `TriggerEvent` would look as follows:
+
+```python
+yield TriggerEvent({"status": "success", "message": "Cluster Created"})
+```
+
+The object passed through the `TriggerEvent` can be captured in the `method_name` method through an `event` parameter. This can be used to determine what needs to be done based on the outcome of the Trigger execution. In the Redshift case, we can simply check the status of the event, and raise an exception if something went wrong.
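That event check can be as simple as the sketch below. This is a hedged illustration: in the provider the failure path would raise `AirflowException`; a plain `RuntimeError` is used here so the snippet stays self-contained.

```python
# Sketch of the method named in self.defer(method_name="execute_complete").
# The triggerer passes the TriggerEvent payload in as `event`.
def execute_complete(context, event=None):
    if event is None or event.get("status") != "success":
        # In the provider this would raise airflow.exceptions.AirflowException.
        raise RuntimeError(f"Error creating cluster: {event}")
    return event.get("message")
```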

Review Comment:
   ```suggestion
   The object passed through the `TriggerEvent` can be captured in the `method_name` method through an event parameter. This can be used to determine what needs to be done based on the outcome of the Trigger execution. In the Redshift case, we can simply check the status of the event, and raise an Exception if something went wrong.
   ```



##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -252,6 +257,16 @@ def execute(self, context: Context):
             self.master_user_password,
             params,
         )
+        if self.deferrable:

Review Comment:
   I don't see this new branch being tested in the unit tests. Do you test it somewhere?



##########
airflow/providers/amazon/aws/triggers/README.md:
##########
@@ -0,0 +1,153 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+# Writing Deferrable Operators for Amazon Provider Package
+
+
+Before writing deferrable operators, it is strongly recommended to read and familiarize yourself with the official [documentation](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html) of Deferrable Operators.
+The purpose of this guide is to provide a standardized way to convert existing Amazon Provider Package (AMPP) operators to deferrable operators. Due to the varied complexities of available operators, it is impossible to define one method that will work for every operator.
+The method described in this guide should work for many of the AMPP operators, but it is important to study each operator before determining whether the steps outlined below are applicable.
+
+Although it varies from operator to operator, a typical AMPP operator has 3 stages:
+
+1. A pre-processing stage, where information is looked up via boto3 API calls, parameters are formatted, etc. The complexity of this stage depends on the complexity of the task the operator is attempting to do. Some operators (e.g. Sagemaker) have a lot of pre-processing, whereas others require little to none.
+2. The "main" call to the boto3 API to start an operation. This is the task that the operator is attempting to complete. It could be a request to provision a resource, to change the state of a resource, to start a job on a resource, etc. Regardless of the operation, the boto3 API returns almost instantly (ignoring network delays) with a response detailing the results of the request. For example, in the case of a resource provisioning request, although the resource can take significant time to be allocated, the boto3 API returns a response to the caller without waiting for the operation to be completed.
+3. The last, often optional, stage is to wait for the operation initiated in stage 2 to be completed. This usually involves polling the boto3 API at set intervals, and waiting for certain criteria to be met.
+
+In general, it is the last polling stage where we can defer the operator to a trigger which can handle the polling operation. The botocore library defines waiters for certain services, which are built-in functions that poll a service and wait for a given criterion to be met.
+As part of our work on writing deferrable operators, we have extended the built-in waiters to allow custom waiters, which follow the same logic, but for services not covered by the botocore library.
+We can use these custom waiters, along with the built-in waiters, to implement the polling logic of deferrable operators.
+
+The first step to making an existing operator deferrable is to add `deferrable` as a parameter to the operator, and initialize it in the constructor of the operator.
+The next step is to determine where the operator should be deferred. This will be dependent on what the operator does, and how it is written. Although every operator is different, there are a few guidelines to determine the best place to defer an operator.
+
+1. If the operator has a `wait_for_completion` parameter, the `self.defer` method should be called right before the check for `wait_for_completion`.
+2. If there is no `wait_for_completion`, look for the "main" task that the operator does. Operators will often make various describe calls to the boto3 API to verify certain conditions, or look up some information, before performing their "main" task. Right after the "main" call to the boto3 API is usually a good place to call `self.defer`.
+
+
+Once the location to defer is decided in the operator, call the `self.defer` method if the `deferrable` flag is `True`. The `self.defer` method takes in several parameters, listed below:
+
+1. `trigger`: This is the trigger which you want to pass the execution to. We will write this trigger in just a moment.
+2. `method_name`: This specifies the name of the method you want to execute once the trigger completes its execution. The trigger cannot pass execution back to the `execute` method of the operator. By convention, the name for this method is `execute_complete`.
+3. `timeout`: An optional parameter that controls the length of time the Trigger can execute for before timing out. This defaults to `None`, meaning no timeout.
+4. `kwargs`: Additional keyword arguments to pass to `method_name`. Default is `{}`.
+
+Triggers are the main component of deferrable operators. They must be placed in the `airflow/providers/amazon/aws/triggers/` folder. All Triggers must implement the following 3 methods:
+
+1. `__init__`: the constructor, which receives parameters from the operator. These must be JSON serializable.
+2. `serialize`: a method that returns the classpath, as well as the keyword arguments to the `__init__` method, as a tuple.
+3. `run`: the asynchronous method that is responsible for awaiting the asynchronous operations.
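
Put together, the skeleton of such a trigger might look like the sketch below. This is stdlib-only and hypothetical: a real trigger subclasses `airflow.triggers.base.BaseTrigger` and yields a `TriggerEvent`, but here a plain dict is yielded so the three methods can be shown without Airflow installed:

```python
import asyncio


class RedshiftClusterTriggerSketch:
    """Hypothetical trigger skeleton showing the three required methods."""

    def __init__(self, cluster_identifier: str, poll_interval: int = 30):
        # Constructor arguments must be JSON serializable.
        self.cluster_identifier = cluster_identifier
        self.poll_interval = poll_interval

    def serialize(self) -> tuple:
        # Classpath plus the kwargs needed to reconstruct the trigger.
        return (
            "airflow.providers.amazon.aws.triggers.redshift_cluster.RedshiftClusterTriggerSketch",
            {
                "cluster_identifier": self.cluster_identifier,
                "poll_interval": self.poll_interval,
            },
        )

    async def run(self):
        # A real implementation would await an aiobotocore waiter here.
        await asyncio.sleep(0)
        yield {"status": "success", "cluster_identifier": self.cluster_identifier}


async def main():
    trigger = RedshiftClusterTriggerSketch("my-cluster", poll_interval=10)
    async for event in trigger.run():
        print(event["status"])


asyncio.run(main())
```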
+
+Ideally, when the operator has deferred itself, it has already initiated the "main" task of the operator, and is now waiting for a certain resource to reach a certain state.
+As mentioned earlier, the botocore library defines a `Waiter` class for many services, which implements a `wait` method that can be configured via a config file to poll the boto3 API at set intervals, and returns once the success criterion is met.
+The aiobotocore library, which is used to make asynchronous botocore calls, defines an `AIOWaiter` class, which also implements a `wait` method that behaves identically to the botocore one, except that it works asynchronously.
+Therefore, any botocore waiter is available as an aiobotocore waiter, and can be used to asynchronously poll a service until the desired criteria is met.
+
+To call the asynchronous `wait` function, first create a hook for the particular service. For example, for a Redshift hook, it would look like this:
+
+```python
+self.redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+```
+
+With this hook, we can use the `async_conn` property to get access to the aiobotocore client:
+
+```python
+async with self.redshift_hook.async_conn as client:
+    await client.get_waiter("cluster_available").wait(
+        ClusterIdentifier=self.cluster_identifier,
+        WaiterConfig={
+            "Delay": int(self.poll_interval),
+            "MaxAttempts": int(self.max_attempt),
+        },
+    )
+```
+
+In this case, we are using the built-in `cluster_available` waiter. If we wanted to use a custom waiter, we would change the code slightly to use the `get_waiter` function from the hook, rather than the aiobotocore client:
+
+```python
+async with self.redshift_hook.async_conn as client:
+    waiter = self.redshift_hook.get_waiter("cluster_paused", deferrable=True, client=client)
+    await waiter.wait(
+        ClusterIdentifier=self.cluster_identifier,
+        WaiterConfig={
+            "Delay": int(self.poll_interval),
+            "MaxAttempts": int(self.max_attempt),
+        },
+    )
+```
+
+Here, we are calling the `get_waiter` function defined in `base_aws.py`, which takes an optional `deferrable` argument (set to `True`) and the aiobotocore client. `cluster_paused` is a custom boto waiter defined in `redshift.json` in the `airflow/providers/amazon/aws/waiters` folder. In general, the config file for a custom waiter should be named `<service_name>.json`. The config for `cluster_paused` is shown below:
+
+```json
+{
+    "version": 2,
+    "waiters": {
+        "cluster_paused": {
+            "operation": "DescribeClusters",
+            "delay": 30,
+            "maxAttempts": 60,
+            "acceptors": [
+                {
+                    "matcher": "pathAll",
+                    "argument": "Clusters[].ClusterStatus",
+                    "expected": "paused",
+                    "state": "success"
+                },
+                {
+                    "expected": "ClusterNotFound",
+                    "matcher": "error",
+                    "state": "retry"
+                },
+                {
+                    "expected": "deleting",
+                    "matcher": "pathAny",
+                    "state": "failure",
+                    "argument": "Clusters[].ClusterStatus"
+                }
+            ]
+        }
+    }
+}
+```
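
Since the config is plain JSON, it can be sanity-checked with nothing but the standard library. The snippet below embeds a condensed copy of the config above; botocore's `WaiterModel` performs the real validation when the waiter is loaded:

```python
import json

# Parse a condensed copy of the cluster_paused waiter config and check
# the pieces a custom waiter must define: the operation to poll and the
# list of acceptors that decide success, retry, or failure.
config = json.loads("""
{
    "version": 2,
    "waiters": {
        "cluster_paused": {
            "operation": "DescribeClusters",
            "delay": 30,
            "maxAttempts": 60,
            "acceptors": [
                {"matcher": "pathAll", "argument": "Clusters[].ClusterStatus",
                 "expected": "paused", "state": "success"},
                {"matcher": "error", "expected": "ClusterNotFound", "state": "retry"},
                {"matcher": "pathAny", "argument": "Clusters[].ClusterStatus",
                 "expected": "deleting", "state": "failure"}
            ]
        }
    }
}
""")
waiter = config["waiters"]["cluster_paused"]
print(waiter["operation"], [a["state"] for a in waiter["acceptors"]])
```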
+
+For more information about writing custom waiters, see the [README.md](https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/waiters/README.md) for custom waiters.
+
+In some cases, a built-in or custom waiter may not be able to solve the problem. In such cases, the asynchronous method used to poll the boto3 API would need to be defined in the hook of the service being used. This method is essentially the same as the synchronous version, except that it uses the aiobotocore client and is awaited. For the Redshift example, the async `describe_clusters` method would look as follows:
+
+```python
+async with self.async_conn as client:
+    response = await client.describe_clusters(ClusterIdentifier=self.cluster_identifier)
+```
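
When no waiter fits, the manual polling loop inside the trigger's `run` method can be sketched as below. This is a stdlib-only illustration: `make_fake_describe` is a hypothetical stand-in for the awaited aiobotocore call shown above, so the loop can run without AWS access:

```python
import asyncio


def make_fake_describe(statuses):
    """Return a coroutine function that replays the given cluster statuses in order."""
    it = iter(statuses)

    async def fake_describe_clusters(cluster_identifier: str) -> dict:
        await asyncio.sleep(0)  # stand-in for network latency
        return {"Clusters": [{"ClusterStatus": next(it)}]}

    return fake_describe_clusters


async def wait_for_available(describe, cluster_identifier, poll_interval, max_attempts):
    for _ in range(max_attempts):
        response = await describe(cluster_identifier)
        status = response["Clusters"][0]["ClusterStatus"]
        if status == "available":
            return status
        # asyncio.sleep yields control to the event loop; time.sleep would
        # block every other trigger running on the same triggerer process.
        await asyncio.sleep(poll_interval)
    raise TimeoutError(f"{cluster_identifier} never became available")


describe = make_fake_describe(["creating", "creating", "available"])
print(asyncio.run(wait_for_available(describe, "my-cluster", 0.01, 10)))
```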
+
+This async method can be used in the Trigger to poll the boto3 API. The polling logic will need to be implemented manually, taking care to use `asyncio.sleep()` rather than `time.sleep()`.

Review Comment:
   ```suggestion
   This async method can be used in the Trigger to poll the boto3 API. The polling logic will need to be implemented manually, taking care to use `asyncio.sleep()` rather than `time.sleep()`.
   ```



##########
airflow/providers/amazon/aws/triggers/README.md:
##########
@@ -0,0 +1,153 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+# Writing Deferrable Operators for Amazon Provider Package
+
+
+Before writing deferrable operators, it is strongly recommended to read and familiarize yourself with the official [documentation](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html) of Deferrable Operators.
+The purpose of this guide is to provide a standardized way to convert existing Amazon Provider Package (AMPP) operators to deferrable operators. Due to the varied complexities of available operators, it is impossible to define one method that will work for every operator.
+The method described in this guide should work for many of the AMPP operators, but it is important to study each operator before determining whether the steps outlined below are applicable.
+
+Although it varies from operator to operator, a typical AMPP operator has 3 stages:
+
+1. A pre-processing stage, where information is looked up via boto3 API calls, parameters are formatted, etc. The complexity of this stage depends on the complexity of the task the operator is attempting to do. Some operators (e.g. Sagemaker) have a lot of pre-processing, whereas others require little to none.
+2. The "main" call to the boto3 API to start an operation. This is the task that the operator is attempting to complete. It could be a request to provision a resource, to change the state of a resource, to start a job on a resource, etc. Regardless of the operation, the boto3 API returns almost instantly (ignoring network delays) with a response detailing the results of the request. For example, in the case of a resource provisioning request, although the resource can take significant time to be allocated, the boto3 API returns a response to the caller without waiting for the operation to be completed.
+3. The last, often optional, stage is to wait for the operation initiated in stage 2 to be completed. This usually involves polling the boto3 API at set intervals, and waiting for certain criteria to be met.
+
+In general, it is the last polling stage where we can defer the operator to a trigger which can handle the polling operation. The botocore library defines waiters for certain services, which are built-in functions that poll a service and wait for a given criterion to be met.
+As part of our work on writing deferrable operators, we have extended the built-in waiters to allow custom waiters, which follow the same logic, but for services not covered by the botocore library.
+We can use these custom waiters, along with the built-in waiters, to implement the polling logic of deferrable operators.
+
+The first step to making an existing operator deferrable is to add `deferrable` as a parameter to the operator, and initialize it in the constructor of the operator.
+The next step is to determine where the operator should be deferred. This will be dependent on what the operator does, and how it is written. Although every operator is different, there are a few guidelines to determine the best place to defer an operator.
+
+1. If the operator has a `wait_for_completion` parameter, the `self.defer` method should be called right before the check for `wait_for_completion`.
+2. If there is no `wait_for_completion`, look for the "main" task that the operator does. Operators will often make various describe calls to the boto3 API to verify certain conditions, or look up some information, before performing their "main" task. Right after the "main" call to the boto3 API is usually a good place to call `self.defer`.
+
+
+Once the location to defer is decided in the operator, call the `self.defer` method if the `deferrable` flag is `True`. The `self.defer` method takes in several parameters, listed below:
+
+1. `trigger`: This is the trigger which you want to pass the execution to. We will write this trigger in just a moment.
+2. `method_name`: This specifies the name of the method you want to execute once the trigger completes its execution. The trigger cannot pass execution back to the `execute` method of the operator. By convention, the name for this method is `execute_complete`.
+3. `timeout`: An optional parameter that controls the length of time the Trigger can execute for before timing out. This defaults to `None`, meaning no timeout.
+4. `kwargs`: Additional keyword arguments to pass to `method_name`. Default is `{}`.
+
+Triggers are the main component of deferrable operators. They must be placed in the `airflow/providers/amazon/aws/triggers/` folder. All Triggers must implement the following 3 methods:
+
+1. `__init__`: the constructor, which receives parameters from the operator. These must be JSON serializable.
+2. `serialize`: a method that returns the classpath, as well as the keyword arguments to the `__init__` method, as a tuple.
+3. `run`: the asynchronous method that is responsible for awaiting the asynchronous operations.
+
+Ideally, when the operator has deferred itself, it has already initiated the "main" task of the operator, and is now waiting for a certain resource to reach a certain state.
+As mentioned earlier, the botocore library defines a `Waiter` class for many services, which implements a `wait` method that can be configured via a config file to poll the boto3 API at set intervals, and returns once the success criterion is met.
+The aiobotocore library, which is used to make asynchronous botocore calls, defines an `AIOWaiter` class, which also implements a `wait` method that behaves identically to the botocore one, except that it works asynchronously.
+Therefore, any botocore waiter is available as an aiobotocore waiter, and can be used to asynchronously poll a service until the desired criteria is met.
+
+To call the asynchronous `wait` function, first create a hook for the particular service. For example, for a Redshift hook, it would look like this:
+
+```python
+self.redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+```
+
+With this hook, we can use the `async_conn` property to get access to the aiobotocore client:
+
+```python
+async with self.redshift_hook.async_conn as client:
+    await client.get_waiter("cluster_available").wait(
+        ClusterIdentifier=self.cluster_identifier,
+        WaiterConfig={
+            "Delay": int(self.poll_interval),
+            "MaxAttempts": int(self.max_attempt),
+        },
+    )
+```
+
+In this case, we are using the built-in `cluster_available` waiter. If we wanted to use a custom waiter, we would change the code slightly to use the `get_waiter` function from the hook, rather than the aiobotocore client:
+
+```python
+async with self.redshift_hook.async_conn as client:
+    waiter = self.redshift_hook.get_waiter("cluster_paused", deferrable=True, client=client)
+    await waiter.wait(
+        ClusterIdentifier=self.cluster_identifier,
+        WaiterConfig={
+            "Delay": int(self.poll_interval),
+            "MaxAttempts": int(self.max_attempt),
+        },
+    )
+```
+
+Here, we are calling the `get_waiter` function defined in `base_aws.py`, which takes an optional `deferrable` argument (set to `True`) and the aiobotocore client. `cluster_paused` is a custom boto waiter defined in `redshift.json` in the `airflow/providers/amazon/aws/waiters` folder. In general, the config file for a custom waiter should be named `<service_name>.json`. The config for `cluster_paused` is shown below:

Review Comment:
   ```suggestion
   Here, we are calling the `get_waiter` function defined in `base_aws.py` which takes an optional argument of `deferrable` (set to `True`), and the `aiobotocore` client. `cluster_paused` is a custom boto waiter defined in `redshift.json`  in the `airflow/providers/amazon/aws/waiters folder`. In general, the config file for a custom waiter should be named as <service_name>.json. The config for `cluster_paused` is shown below:
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] syedahsn commented on pull request #30032: Add support for deferrable operators in AMPP

Posted by "syedahsn (via GitHub)" <gi...@apache.org>.
syedahsn commented on PR #30032:
URL: https://github.com/apache/airflow/pull/30032#issuecomment-1507915614

   I've resolved all the failing tests. Please have another look and let me know what you think @potiuk @Taragolis @pankajastro 




[GitHub] [airflow] vandonr-amz commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "vandonr-amz (via GitHub)" <gi...@apache.org>.
vandonr-amz commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1170312053


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -684,7 +684,6 @@ def conn(self) -> BaseAwsConnection:
         else:
             return self.get_resource_type(region_name=self.region_name)
 
-    @cached_property

Review Comment:
   nit: you could have kept it as a (non-cached) property so that it can be used like `.conn`, without parentheses





[GitHub] [airflow] Taragolis commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1152510795


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -48,6 +48,7 @@
 from botocore.credentials import ReadOnlyCredentials
 from botocore.waiter import Waiter, WaiterModel
 from dateutil.tz import tzlocal
+from pytest import importorskip

Review Comment:
   Pytest is not a core dependency of Airflow, only a development dependency.
   And I'm not sure it is a good idea to use a testing framework outside of the tests





[GitHub] [airflow] potiuk merged pull request #30032: Add support for deferrable operators in AMPP

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk merged PR #30032:
URL: https://github.com/apache/airflow/pull/30032




[GitHub] [airflow] eladkal commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1132995115


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -42,6 +42,7 @@
 import botocore.session
 import requests
 import tenacity
+from aiobotocore.session import AioSession, get_session as async_get_session

Review Comment:
   Having a separate base hook for async will not solve the first point you mentioned, as even if we have 2 hooks they must eventually agree on the boto3 version.
   
   This is one of the cases where the shared governance model of providers is helpful. Let's hear the AWS folks' perspective and whether they considered the issues you raised.
   
   cc @syedahsn @vincbeck @ferruzzi @o-nikolas  @shubham22
   Please review @Taragolis' good points about the release cycle of aiobotocore and the effect it will have on limiting the boto3 version for a long period of time





[GitHub] [airflow] syedahsn commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "syedahsn (via GitHub)" <gi...@apache.org>.
syedahsn commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1132992805


##########
airflow/providers/amazon/provider.yaml:
##########
@@ -67,6 +66,7 @@ dependencies:
   - mypy-boto3-rds>=1.24.0
   - mypy-boto3-redshift-data>=1.24.0
   - mypy-boto3-appflow>=1.24.0
+  - aiobotocore[boto3]

Review Comment:
   The boto3 version comes out to be 1.26.76, with the constraints I had.
   ```
   root@fba2b84bfa3f:/opt/airflow# pip show aiobotocore
   Name: aiobotocore
   Version: 2.5.0
   Summary: Async client for aws services using botocore and aiohttp
   Home-page: https://github.com/aio-libs/aiobotocore
   Author: Nikolay Novik
   Author-email: nickolainovik@gmail.com
   License: Apache License 2.0
   Location: /usr/local/lib/python3.7/site-packages
   Requires: aiohttp, aioitertools, botocore, wrapt
   Required-by: 
   root@fba2b84bfa3f:/opt/airflow# pip show botocore
   Name: botocore
   Version: 1.29.76
   Summary: Low-level, data-driven core of boto 3.
   Home-page: https://github.com/boto/botocore
   Author: Amazon Web Services
   Author-email: 
   License: Apache License 2.0
   Location: /usr/local/lib/python3.7/site-packages
   Requires: jmespath, python-dateutil, urllib3
   Required-by: aiobotocore, aws-xray-sdk, boto3, moto, redshift-connector, s3transfer
   root@fba2b84bfa3f:/opt/airflow# pip show boto3
   Name: boto3
   Version: 1.26.76
   Summary: The AWS SDK for Python
   Home-page: https://github.com/boto/boto3
   Author: Amazon Web Services
   Author-email: 
   License: Apache License 2.0
   Location: /usr/local/lib/python3.7/site-packages
   Requires: botocore, jmespath, s3transfer
   Required-by: aws-sam-translator, moto, redshift-connector, watchtower
   root@fba2b84bfa3f:/opt/airflow# 
   ```





[GitHub] [airflow] eladkal commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1132995115


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -42,6 +42,7 @@
 import botocore.session
 import requests
 import tenacity
+from aiobotocore.session import AioSession, get_session as async_get_session

Review Comment:
   Having a separate base hook for async will not solve the first point you mentioned, as even if we have 2 hooks they must eventually agree on the boto3 version.
   
   This is one of the cases where the shared governance model of providers is helpful. Let's hear the AWS folks' perspective and whether they considered the issues you raised.
   @Taragolis makes a good point about the release cycle of aiobotocore and the effect it will have on limiting the boto3 version for a long period of time
   
   cc @syedahsn @vincbeck @ferruzzi @o-nikolas  @shubham22





[GitHub] [airflow] Taragolis commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1152485974


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -72,7 +74,7 @@
 
 class BaseSessionFactory(LoggingMixin):
     """
-    Base AWS Session Factory class to handle boto3 session creation.
+    Base AWS Session Factory class to handle synchronous and async boto session creation.

Review Comment:
   This is not technically correct; not all `async` implementations are `asyncio`. Asyncio stands for asynchronous IO.
   
   And the problem is still the same: a mixup of blocking io and asyncio implementations, see: https://github.com/apache/airflow/pull/30032#discussion_r1136239727, and as a result
   
   > This PR only uses a blocking io implementation, which means the event loop of the triggerer would be blocked.





[GitHub] [airflow] syedahsn commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "syedahsn (via GitHub)" <gi...@apache.org>.
syedahsn commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1153657308


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -72,7 +74,7 @@
 
 class BaseSessionFactory(LoggingMixin):
     """
-    Base AWS Session Factory class to handle boto3 session creation.
+    Base AWS Session Factory class to handle synchronous and async boto session creation.

Review Comment:
   I can work on the wording here. The idea is that the hook will start supporting async operations. It doesn't necessarily need to support all the features that the synchronous session supports, and we can incrementally add additional things as required.
   
   > And the problem still the same: mixup blocking io and asyncio implementation,
   
   Can you provide some more information about this? I'm not sure what you mean. There is a [watchdog mechanism](https://github.com/apache/airflow/blob/main/airflow/jobs/triggerer_job.py#L547) in Airflow which prints out a warning if the async thread is blocked for too long.
   
   For this PR, the trigger, which contains all the async code, only creates the async client, which is a necessary task, and makes the network call, which is async. The client creation only happens once per execution of a Trigger.





[GitHub] [airflow] Taragolis commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1154835772


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -72,7 +74,7 @@
 
 class BaseSessionFactory(LoggingMixin):
     """
-    Base AWS Session Factory class to handle boto3 session creation.
+    Base AWS Session Factory class to handle synchronous and async boto session creation.

Review Comment:
   > For this PR, the trigger, which contains all the async code, is only creating the async client, which is a necessary task, and making the network call which is async. The client creation only happens once per execution of a Trigger.
   
   Unfortunately this is not how it actually works. First of all, `BaseSessionFactory` requires an Airflow connection in most cases (except cases where `aws_conn_id = None`), and obtaining an Airflow connection does not support asyncio.
   
   In addition, there are all the cases where AWS credentials are not directly required, such as AssumeRole, which should be quite popular if Airflow is deployed in an AWS environment:
   
   https://github.com/apache/airflow/blob/21bacb578a9bdb9a91a297f1ae019402be1c1368/airflow/providers/amazon/aws/hooks/base_aws.py#L190-L195
   
   This is a good example of mixing `botocore`, `boto3` and `aiobotocore`, which I don't think works well at all. Apart from unit tests, we do not have anything to test this part. Also, to obtain the initial credentials it uses an STS client (from boto3/botocore) which also doesn't use an asyncio implementation.
   
   In addition, AFAIK, some of the parts are not supported by `aiobotocore` at all, such as Google Federation, SPNEGO/Kerberos and EKS IRSA
   
   As a result, this call in deferrable mode calls a lot of **non-asyncio** code which blocks io:
   
   https://github.com/apache/airflow/blob/21bacb578a9bdb9a91a297f1ae019402be1c1368/airflow/providers/amazon/aws/triggers/redshift_cluster.py#L121
   
   > Can you provide some more information about this? I'm not sure what you mean. There is a [watchdog mechanism](https://github.com/apache/airflow/blob/main/airflow/jobs/triggerer_job.py#L547) in Airflow which prints out a warning if the async thread is blocked for too long.
   
   Personally, I have found that this part prints a lot of false-positive notifications, even when no deferrable operators are running at the moment.
   
   ---
   
   Just for the record, I'm not sure it is possible to easily create an asyncio implementation for AWS that supports most of the popular options the regular implementation supports, due to several limitations:
   1. Airflow's core lacks support for asyncio connections.
   2. aiobotocore is not a full replacement for botocore, and especially not for boto3.
   
   Anyway, these are just my thoughts, written down to try to avoid adding a feature which might make someone's life harder (especially for contributors).
   
   A potentially better idea is to extend `BaseAsyncSessionFactory` rather than `BaseSessionFactory`, and to describe in the docs which auth options are supported right now and which are not; that should make users' lives easier. BTW, right now users can create their own class based on `BaseSessionFactory`, but not on `BaseAsyncSessionFactory`.
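   
   A rough sketch of what such a separate factory could look like (the class name follows the suggestion above; the auth-type names and the guard are hypothetical, not the provider's actual API):
   
   ```python
   class BaseAsyncSessionFactory:
       """Hypothetical async-only session factory, kept separate from
       BaseSessionFactory so the supported auth options can be documented
       and the unsupported ones fail fast instead of blocking the loop."""

       # Auth flows aiobotocore does not cover (per the discussion above).
       UNSUPPORTED_AUTH = {"google_federation", "spnego_kerberos", "eks_irsa"}

       def __init__(self, conn=None, auth_type="default"):
           self.conn = conn
           self.auth_type = auth_type

       def create_session(self):
           if self.auth_type in self.UNSUPPORTED_AUTH:
               raise NotImplementedError(
                   f"auth type {self.auth_type!r} is not supported by the async session"
               )
           # Deferred import keeps aiobotocore an optional dependency.
           from aiobotocore.session import get_session

           return get_session()
   ```
   
   Listing `UNSUPPORTED_AUTH` in the provider docs would then give users a clear picture of what the asyncio path can and cannot do.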



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] syedahsn commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "syedahsn (via GitHub)" <gi...@apache.org>.
syedahsn commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1171282677


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -684,7 +684,6 @@ def conn(self) -> BaseAwsConnection:
         else:
             return self.get_resource_type(region_name=self.region_name)
 
-    @cached_property

Review Comment:
   Good point.





[GitHub] [airflow] potiuk commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1136167424


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -42,6 +42,7 @@
 import botocore.session
 import requests
 import tenacity
+from aiobotocore.session import AioSession, get_session as async_get_session

Review Comment:
   > But what about boto3 version pin?
   
   My view: assuming that aiobotocore is released semi-frequently and catches up with a boto3/botocore version that is a few months old, the worst that can happen is that some new features/fixes will only be available with a few months' delay (when a new version of aiobotocore is released and we release the next version of the provider). That is not far from users' expectations, IMHO. Also, if - for example - the AWS team sees the need to accelerate an upgrade, there is always the possibility of not only opening an issue with the aiobotocore team, but also making a PR to update to the latest API/features (though that would likely require more involvement and learning).
   
   Users will always have the option to install a newer boto3 version in a virtualenv/externally or to use DockerOperator - following https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#handling-conflicting-complex-python-dependencies (and we could add docs explaining this).
   
   Also, Airflow itself and even the provider will not pin boto3. The constraints are a "golden" set of dependencies, and if someone would like to use a newer boto3 without the aiobotocore extra, it will be possible for them to upgrade boto3 (and we can document this too). There will be no "check" that the newer version works - as we will not run tests for it - but users will be able to do their own tests and use the newer boto3. We will not prevent that.
   
   I don't think we can solve all issues here - it's impossible. We have to sacrifice something. This approach is not perfect, but at least we might make steady progress adding new deferrable operators for AWS.





[GitHub] [airflow] potiuk commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1136237987


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -42,6 +42,7 @@
 import botocore.session
 import requests
 import tenacity
+from aiobotocore.session import AioSession, get_session as async_get_session

Review Comment:
   > Till native and conflictless approach introduced, maybe async operators should be external in their own dedicated package and to be installed by users who seek to use triggers? 
   
   That is also a possibility, of course. The end result, though, is that if you would like to use some new feature in the "sync" package, you technically should depend on ">=version_when_it_was_added".
   
   But you really can't add such a limit - if you add it in the "sync" package, then you won't be able to install the two packages together (which I guess is out of the question). Effectively this means that installing the "async" package would break some "sync" package features. Not to mention installing an "older" version of the "async" package with a newer version of the "sync" one - that would break even more things, and quite unpredictably. I can already imagine the issues raised by users :).
   
   It will also cause quite a lot of code duplication, since some code would necessarily have to live in both packages. I think sharing code between two packages like that will be quite complex. IMHO it's far more complex to maintain and fix problems in, keeping some level of compatibility, and handling errors will be a nightmare.
   
   On the other hand, I am worried we may underestimate the complexities of maintaining such a split :). Effectively we put the burden of handling all the complexity on the maintainers/AWS team, rather than leaving it up to users to slightly complicate their workflows if they want to be on the "bleeding edge".
   
   But if the AWS team is OK with maintaining such a split - I am OK with this scenario as well; it is certainly doable. Adding a new `asyncaws` provider could be done. Maybe even extracting `common.aws` if we would like to share the code, provided those who do it are not afraid of handling cases like we had with common.sql - I just believe it will be super complex to maintain. You've been warned :).







[GitHub] [airflow] eladkal commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1132975173


##########
airflow/providers/amazon/provider.yaml:
##########
@@ -67,6 +66,7 @@ dependencies:
   - mypy-boto3-rds>=1.24.0
   - mypy-boto3-redshift-data>=1.24.0
   - mypy-boto3-appflow>=1.24.0
+  - aiobotocore[boto3]

Review Comment:
   ```suggestion
     - aiobotocore[boto3]>=2.2.0
   ```
   We had `boto3>=1.24.0` for a reason; we should make sure that we don't allow a regression of the boto3 version:
   https://github.com/aio-libs/aiobotocore/blob/master/CHANGES.rst#220-2022-03-16





[GitHub] [airflow] potiuk commented on pull request #30032: Add support for deferrable operators in AMPP

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #30032:
URL: https://github.com/apache/airflow/pull/30032#issuecomment-1484285584

   You need to skip your aiobotocore tests conditionally - see the other deferrable tests in AWS.
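   
   A minimal sketch of such a conditional guard (the pytest marker shown in the comment is illustrative; only the stdlib check runs here):
   
   ```python
   import importlib.util

   # Evaluate the optional dependency once; tests that exercise the async
   # hooks can then be skipped when aiobotocore is absent instead of
   # failing at import time.
   HAS_AIOBOTOCORE = importlib.util.find_spec("aiobotocore") is not None

   # With pytest this typically becomes a module-level marker, e.g.:
   #   pytestmark = pytest.mark.skipif(
   #       not HAS_AIOBOTOCORE, reason="aiobotocore is not installed"
   #   )
   print(isinstance(HAS_AIOBOTOCORE, bool))  # True
   ```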




[GitHub] [airflow] syedahsn commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "syedahsn (via GitHub)" <gi...@apache.org>.
syedahsn commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1153657308


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -72,7 +74,7 @@
 
 class BaseSessionFactory(LoggingMixin):
     """
-    Base AWS Session Factory class to handle boto3 session creation.
+    Base AWS Session Factory class to handle synchronous and async boto session creation.

Review Comment:
   I can work on the wording here. The idea is that the hook will start supporting async operations. It doesn't necessarily need to support all the features the synchronous session supports, and we can incrementally add more as required.
   > And the problem is still the same: mixing blocking IO and an asyncio implementation,
   
   Can you provide some more information about this? I'm not sure what you mean. There is a [watchdog mechanism](https://github.com/apache/airflow/blob/main/airflow/jobs/triggerer_job.py#L547) in Airflow which prints a warning if the async thread is blocked for too long.
   
   For this PR, the trigger, which contains all the async code, only creates the async client (a necessary task) and makes the network call, which is async. The client creation happens only once per execution of a Trigger.
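   
   A stubbed sketch of that pattern follows (the stub classes stand in for the real hook and the aiobotocore waiter, which are not reproduced here):
   
   ```python
   import asyncio


   class StubWaiter:
       # Stand-in for the async waiter aiobotocore would return.
       async def wait(self, **kwargs):
           await asyncio.sleep(0)  # one non-blocking poll cycle


   class StubAsyncConn:
       # Stand-in for the client context manager returned by async_conn.
       async def __aenter__(self):
           return object()

       async def __aexit__(self, *exc):
           return False


   class StubHook:
       # Stand-in for the AWS hook: async_conn plus get_waiter(client=...).
       @property
       def async_conn(self):
           return StubAsyncConn()

       def get_waiter(self, waiter_name, client=None):
           return StubWaiter()


   async def run_trigger(hook):
       # The client is created once per trigger execution; only the
       # network wait is awaited repeatedly.
       async with hook.async_conn as client:
           waiter = hook.get_waiter("cluster_available", client=client)
           await waiter.wait(ClusterIdentifier="demo")
           return {"status": "success"}


   event = asyncio.run(run_trigger(StubHook()))
   print(event["status"])  # success
   ```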





[GitHub] [airflow] syedahsn commented on a diff in pull request #30032: Add support for deferrable operators in AMPP

Posted by "syedahsn (via GitHub)" <gi...@apache.org>.
syedahsn commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1158554262


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -48,6 +48,7 @@
 from botocore.credentials import ReadOnlyCredentials
 from botocore.waiter import Waiter, WaiterModel
 from dateutil.tz import tzlocal
+from pytest import importorskip

Review Comment:
   This was something I was testing with - I've removed it now. I'm still dealing with issues regarding aiobotocore imports, but I've been able to reduce them as much as possible.


