Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2023/01/11 08:56:37 UTC

[GitHub] [airflow] pankajastro opened a new pull request, #28850: Add deferrable mode in RedshiftPauseClusterOperator

pankajastro opened a new pull request, #28850:
URL: https://github.com/apache/airflow/pull/28850

   Add deferrable mode in RedshiftPauseClusterOperator
   
   This PR donates the `RedshiftPauseClusterOperatorAsync` operator developed in the [astronomer-providers](https://github.com/astronomer/astronomer-providers) repo to Apache Airflow.
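   
   For context, a minimal sketch of how the new flag is meant to be used in a DAG (the operator import path and parameters match this PR; the DAG around them is illustrative only):
   
   ```python
   from datetime import datetime
   
   from airflow import DAG
   from airflow.providers.amazon.aws.operators.redshift_cluster import RedshiftPauseClusterOperator
   
   with DAG("redshift_pause_example", start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
       pause_cluster = RedshiftPauseClusterOperator(
           task_id="pause_redshift_cluster",
           cluster_identifier="my-cluster",  # placeholder identifier
           deferrable=True,  # hand the wait off to the triggerer instead of blocking a worker slot
           poll_interval=10,
       )
   ```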
   
   
   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of an existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code changes, an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in [newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] Taragolis commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1066843322


##########
airflow/providers/amazon/provider.yaml:
##########
@@ -52,6 +52,7 @@ dependencies:
   - apache-airflow>=2.3.0
   - apache-airflow-providers-common-sql>=1.3.1
   - boto3>=1.24.0
+  - aiobotocore>=2.1.1

Review Comment:
   I think adding this as a core dependency for the Amazon provider could break some existing functionality.
   At least until `aiobotocore` resolves this issue https://github.com/aio-libs/aiobotocore/issues/976 and removes the upper bound on [`botocore`](https://github.com/aio-libs/aiobotocore/blob/14b2f5fe0033a1f4bbc456f10633c2aeea74f57a/setup.py#L10).
   
   Because `botocore` is the main package behind `boto3`, if we can't upgrade both of them we can't add new features introduced since `botocore` 1.28.0+



##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -843,3 +848,82 @@ def _parse_s3_config(config_file_name: str, config_format: str | None = "boto",
         config_format=config_format,
         profile=profile,
     )
+
+
+class AwsBaseHookAsync(AwsBaseHook):
+    """
+    Interacts with AWS using aiobotocore asynchronously.
+
+    .. note::
+        AwsBaseHookAsync uses aiobotocore to create asynchronous S3 hooks. Hence, AwsBaseHookAsync
+        only supports the authentication mechanism that aiobotocore supports. Currently, AwsBaseHookAsync supports
+        only AWS STS client method ``assume_role`` provided in the Airflow connection extra args via aiobotocore.
+
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param verify: Whether or not to verify SSL certificates.
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+    :param client_type: boto3.client client_type. Eg 's3', 'emr' etc
+    :param resource_type: boto3.resource resource_type. Eg 'dynamodb' etc
+    :param config: Configuration for botocore client.
+    .. seealso::
+        `AWS API <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html>`_
+    """
+
+    async def get_client_async(self) -> AioBaseClient:
+        """Create an Async Client object to communicate with AWS services."""
+
+        connection_object = await sync_to_async(self.get_connection)(self.aws_conn_id)
+
+        conn_config = AwsConnectionWrapper(
+            conn=connection_object,
+            region_name=self.region_name,
+            botocore_config=self.config,
+            verify=self.verify,
+        )
+
+        async_connection = get_session()
+        session_token = conn_config.aws_session_token
+        aws_secret = conn_config.aws_secret_access_key
+        aws_access = conn_config.aws_access_key_id
+        if conn_config.role_arn:
+            credentials = await self.get_role_credentials(
+                async_session=async_connection, conn_config=conn_config
+            )
+            if credentials:
+                session_token = credentials["SessionToken"]
+                aws_access = credentials["AccessKeyId"]
+                aws_secret = credentials["SecretAccessKey"]
+        return async_connection.create_client(
+            service_name=self.client_type,
+            region_name=conn_config.region_name,
+            aws_secret_access_key=aws_secret,
+            aws_access_key_id=aws_access,
+            aws_session_token=session_token,
+            verify=self.verify,
+            config=self.config,
+            endpoint_url=conn_config.endpoint_url,
+        )
+
+    @staticmethod
+    async def get_role_credentials(
+        async_session: AioSession, conn_config: AwsConnectionWrapper
+    ) -> Optional[Dict[str, str]]:
+        """Get the role_arn, method credentials from connection details and get the role credentials detail"""
+        async with async_session.create_client(
+            "sts",
+            aws_access_key_id=conn_config.aws_access_key_id,
+            aws_secret_access_key=conn_config.aws_secret_access_key,
+        ) as client:
+            return_response = None
+            if conn_config.assume_role_method == "assume_role" or conn_config.assume_role_method is None:
+                response: Dict[str, Dict[str, str]] = await client.assume_role(
+                    RoleArn=conn_config.role_arn,
+                    RoleSessionName="RoleSession",
+                    **conn_config.assume_role_kwargs,
+                )
+                return_response = response["Credentials"]
+            return return_response

Review Comment:
   By using this we lose the ability to auto-renew expired credentials, as well as all the additional methods of obtaining credentials that the AWS Connection provides.
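   
   For contrast, a minimal sketch of what auto-renewal could look like with aiobotocore's refreshable credentials (helper names and wiring here are assumptions, not this PR's code):
   
   ```python
   from aiobotocore.credentials import AioRefreshableCredentials
   from aiobotocore.session import get_session
   
   async def _refresh() -> dict:
       # Hypothetical helper: re-run assume_role and map the STS response
       # into the metadata shape that refreshable credentials expect.
       creds = await assume_role_once()
       return {
           "access_key": creds["AccessKeyId"],
           "secret_key": creds["SecretAccessKey"],
           "token": creds["SessionToken"],
           "expiry_time": creds["Expiration"].isoformat(),
       }
   
   async def refreshable_session():
       session = get_session()
       # Credentials are re-fetched via _refresh() whenever they near expiry,
       # instead of failing once the first STS token expires.
       session._credentials = AioRefreshableCredentials.create_from_metadata(
           metadata=await _refresh(),
           refresh_using=_refresh,
           method="sts-assume-role",
       )
       return session
   ```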





[GitHub] [airflow] Taragolis commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1122802101


##########
airflow/providers/amazon/provider.yaml:
##########
@@ -596,3 +597,4 @@ additional-extras:
   - name: pandas
     dependencies:
       - pandas>=0.17.1
+      - aiobotocore>=2.1.1

Review Comment:
   Not only in CI; we should also mention this in the documentation, otherwise we will have tons of issues/discussions opened by users who want to use deferrable AWS operators and end up with a potentially broken system.





[GitHub] [airflow] uranusjr commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1066844414


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -35,7 +35,12 @@
 from functools import wraps
 from os import PathLike
 from pathlib import Path
-from typing import TYPE_CHECKING, Any, Callable, Generic, TypeVar, Union
+from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Optional, TypeVar, Union
+
+from asgiref.sync import sync_to_async

Review Comment:
   We should explicitly add asgiref to dependencies





[GitHub] [airflow] ferruzzi commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "ferruzzi (via GitHub)" <gi...@apache.org>.
ferruzzi commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1119400678


##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -458,11 +460,15 @@ def __init__(
         *,
         cluster_identifier: str,
         aws_conn_id: str = "aws_default",
+        deferrable: bool = False,
+        poll_interval: int = 10,

Review Comment:
   I can't speak for the botocore team on that; I don't have any contact with them, so I wouldn't know any better than you on that one. I'm not aware of anything in that regard, but I wouldn't really know. Syed is researching deferrable operators to start making the full suite of AWS operators deferrable though, so out of our team I'd say he's the closest to an expert on aiobotocore and related stuff.





[GitHub] [airflow] Taragolis commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1122799083


##########
.github/workflows/ci.yml:
##########
@@ -801,6 +801,30 @@ jobs:
         run: breeze ci fix-ownership
         if: always()
 
+  tests-aws-async-provider:
+    timeout-minutes: 50
+    name: "Pytest for AWS Async Provider"
+    runs-on: "${{needs.build-info.outputs.runs-on}}"
+    needs: [build-info, wait-for-ci-images]
+    if: needs.build-info.outputs.run-tests == 'true'
+    steps:
+      - name: Cleanup repo
+        shell: bash
+        run: docker run -v "${GITHUB_WORKSPACE}:/workspace" -u 0:0 bash -c "rm -rf /workspace/*"
+      - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
+        uses: actions/checkout@v3
+        with:
+          persist-credentials: false
+      - name: "Prepare breeze & CI image"
+        uses: ./.github/actions/prepare_breeze_and_image
+      - name: "Run AWS Async Test"
+        run: "breeze shell \
+        'pip install aiobotocore>=2.1.1 && pytest /opt/airflow/tests/providers/amazon/aws/deferrable'"

Review Comment:
   I guess this will be solved by the time this PR is ready to merge?





[GitHub] [airflow] potiuk commented on pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #28850:
URL: https://github.com/apache/airflow/pull/28850#issuecomment-1471982103

   Yep. Looks better:
   
   <img width="739" alt="Screenshot 2023-03-16 at 14 33 45" src="https://user-images.githubusercontent.com/595491/225633395-0460e2eb-125a-42d8-96df-f99938ac8e9e.png">
   
   Also as expected botocore/boto were downgraded in the process automatically in constraints:
   
   https://github.com/apache/airflow/actions/runs/4436088571/jobs/7784082644?pr=30127
   
   <img width="878" alt="Screenshot 2023-03-16 at 14 35 16" src="https://user-images.githubusercontent.com/595491/225633758-826b0a99-a001-4953-a47a-e99a1ea9271b.png">
   
   As unfortunate as it is, it seems that currently we do not have any AWS providers that rely on features unavailable in this version, and all tests pass, so having it in constraints as the "reference" version is not a big issue.
   
   The real effect of doing it this way is simply a slower pace of upgrading botocore/boto in our tests. Which is not that bad, actually.
   
   I thought about it and we could also make another change after https://github.com/apache/airflow/pull/30127 is merged (@Taragolis @kaxil @o-nikolas): we could add a single extra job (a little more complex than the one initially added by @pankajastro) where we start the image, remove aiobotocore, upgrade botocore and boto to the latest compatible versions, and run all Amazon provider unit tests there (because of the lack of aiobotocore, the tests for deferred operators would be skipped automatically).
   
   This should be rather simple to set up, and it would give us much more confidence that the operators we have work not only with the aiobotocore-compatible version but also with the "latest" version.




[GitHub] [airflow] Taragolis commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1067135598


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -843,3 +848,82 @@ def _parse_s3_config(config_file_name: str, config_format: str | None = "boto",
         config_format=config_format,
         profile=profile,
     )
+
+
+class AwsBaseHookAsync(AwsBaseHook):

Review Comment:
   Is it necessary to base this class on AwsBaseHook? It doesn't seem to actually use any of its methods or properties.



##########
airflow/providers/amazon/aws/hooks/redshift_cluster.py:
##########
@@ -183,3 +187,86 @@ def get_cluster_snapshot_status(self, snapshot_identifier: str, cluster_identifi
             return snapshot_status
         except self.get_conn().exceptions.ClusterSnapshotNotFoundFault:
             return None
+
+
+class RedshiftHookAsync(AwsBaseHookAsync):
+    """Interact with AWS Redshift using aiobotocore library"""
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        kwargs["client_type"] = "redshift"
+        kwargs["resource_type"] = "redshift"

Review Comment:
   This is redundant because a `resource_type` can only be created by `boto3`.
   Also, `redshift` is not listed as a valid resource in `boto3`:
   
   ```python
   import boto3
   
   boto3.__version__
   # Out[3]: '1.26.43'
   
   session = boto3.session.Session(region_name="us-east-1")  # All features land in N.Virginia first
   session.get_available_resources()
   # Out[6]:
   # ['cloudformation',
   #  'cloudwatch',
   #  'dynamodb',
   #  'ec2',
   #  'glacier',
   #  'iam',
   #  'opsworks',
   #  's3',
   #  'sns',
   #  'sqs']
   ```



##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -843,3 +848,82 @@ def _parse_s3_config(config_file_name: str, config_format: str | None = "boto",
         config_format=config_format,
         profile=profile,
     )
+
+
+class AwsBaseHookAsync(AwsBaseHook):
+    """
+    Interacts with AWS using aiobotocore asynchronously.
+
+    .. note::
+        AwsBaseHookAsync uses aiobotocore to create asynchronous S3 hooks. Hence, AwsBaseHookAsync
+        only supports the authentication mechanism that aiobotocore supports. Currently, AwsBaseHookAsync supports
+        only AWS STS client method ``assume_role`` provided in the Airflow connection extra args via aiobotocore.
+
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param verify: Whether or not to verify SSL certificates.
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+    :param client_type: boto3.client client_type. Eg 's3', 'emr' etc
+    :param resource_type: boto3.resource resource_type. Eg 'dynamodb' etc
+    :param config: Configuration for botocore client.
+    .. seealso::
+        `AWS API <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html>`_
+    """
+
+    async def get_client_async(self) -> AioBaseClient:
+        """Create an Async Client object to communicate with AWS services."""
+
+        connection_object = await sync_to_async(self.get_connection)(self.aws_conn_id)
+
+        conn_config = AwsConnectionWrapper(
+            conn=connection_object,
+            region_name=self.region_name,
+            botocore_config=self.config,
+            verify=self.verify,
+        )
+
+        async_connection = get_session()
+        session_token = conn_config.aws_session_token
+        aws_secret = conn_config.aws_secret_access_key
+        aws_access = conn_config.aws_access_key_id
+        if conn_config.role_arn:
+            credentials = await self.get_role_credentials(
+                async_session=async_connection, conn_config=conn_config
+            )
+            if credentials:
+                session_token = credentials["SessionToken"]
+                aws_access = credentials["AccessKeyId"]
+                aws_secret = credentials["SecretAccessKey"]
+        return async_connection.create_client(
+            service_name=self.client_type,
+            region_name=conn_config.region_name,
+            aws_secret_access_key=aws_secret,
+            aws_access_key_id=aws_access,
+            aws_session_token=session_token,
+            verify=self.verify,
+            config=self.config,
+            endpoint_url=conn_config.endpoint_url,
+        )
+
+    @staticmethod
+    async def get_role_credentials(
+        async_session: AioSession, conn_config: AwsConnectionWrapper
+    ) -> Optional[Dict[str, str]]:
+        """Get the role_arn, method credentials from connection details and get the role credentials detail"""
+        async with async_session.create_client(
+            "sts",
+            aws_access_key_id=conn_config.aws_access_key_id,
+            aws_secret_access_key=conn_config.aws_secret_access_key,
+        ) as client:

Review Comment:
   What if the user provides `profile_name` or `aws_session_token`?
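   
   A minimal sketch of forwarding both (assuming the wrapper exposes a `profile_name` attribute, which is hypothetical here, and that `AioSession` accepts the same `profile` argument as `botocore.session.Session`):
   
   ```python
   from aiobotocore.session import AioSession
   
   def sts_client(conn_config):
       session = AioSession(profile=conn_config.profile_name)  # honour a configured profile
       return session.create_client(
           "sts",
           region_name=conn_config.region_name,
           aws_access_key_id=conn_config.aws_access_key_id,
           aws_secret_access_key=conn_config.aws_secret_access_key,
           aws_session_token=conn_config.aws_session_token,  # forward the token instead of dropping it
       )
   ```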



##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -843,3 +848,82 @@ def _parse_s3_config(config_file_name: str, config_format: str | None = "boto",
         config_format=config_format,
         profile=profile,
     )
+
+
+class AwsBaseHookAsync(AwsBaseHook):
+    """
+    Interacts with AWS using aiobotocore asynchronously.
+
+    .. note::
+        AwsBaseHookAsync uses aiobotocore to create asynchronous S3 hooks. Hence, AwsBaseHookAsync
+        only supports the authentication mechanism that aiobotocore supports. Currently, AwsBaseHookAsync supports
+        only AWS STS client method ``assume_role`` provided in the Airflow connection extra args via aiobotocore.
+
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param verify: Whether or not to verify SSL certificates.
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+    :param client_type: boto3.client client_type. Eg 's3', 'emr' etc
+    :param resource_type: boto3.resource resource_type. Eg 'dynamodb' etc
+    :param config: Configuration for botocore client.
+    .. seealso::
+        `AWS API <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html>`_
+    """
+
+    async def get_client_async(self) -> AioBaseClient:
+        """Create an Async Client object to communicate with AWS services."""
+
+        connection_object = await sync_to_async(self.get_connection)(self.aws_conn_id)
+

Review Comment:
   This is inconsistent with `AwsBaseHook`:
   1. If the user provides None, the default `boto3` (to be honest, `botocore`) credentials strategy is used (env vars, EC2/ECS metadata, etc.)
   2. If the user provides an invalid `aws_conn_id`, it falls back to the default `boto3` credentials strategy
   
   https://github.com/apache/airflow/blob/352d492c66e69e816fb1547e46fc1e3b7ba32066/airflow/providers/amazon/aws/hooks/base_aws.py#L507-L526
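   
   A method-shaped sketch of that fallback, mirroring the linked `AwsBaseHook` code:
   
   ```python
   from __future__ import annotations
   
   from asgiref.sync import sync_to_async
   
   from airflow.exceptions import AirflowNotFoundException
   from airflow.models.connection import Connection
   
   async def _resolve_connection(self) -> Connection | None:
       """Return the Airflow connection, or None to use botocore's default strategy."""
       if not self.aws_conn_id:
           return None
       try:
           return await sync_to_async(self.get_connection)(self.aws_conn_id)
       except AirflowNotFoundException:
           # Same behaviour as the synchronous hook: warn, then fall back to the
           # default credentials chain (env vars, EC2/ECS metadata, ...).
           self.log.warning("Unable to find AWS Connection ID '%s', switching to empty.", self.aws_conn_id)
           return None
   ```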



##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -843,3 +848,82 @@ def _parse_s3_config(config_file_name: str, config_format: str | None = "boto",
         config_format=config_format,
         profile=profile,
     )
+
+
+class AwsBaseHookAsync(AwsBaseHook):
+    """
+    Interacts with AWS using aiobotocore asynchronously.
+
+    .. note::
+        AwsBaseHookAsync uses aiobotocore to create asynchronous S3 hooks. Hence, AwsBaseHookAsync
+        only supports the authentication mechanism that aiobotocore supports. Currently, AwsBaseHookAsync supports
+        only AWS STS client method ``assume_role`` provided in the Airflow connection extra args via aiobotocore.
+
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param verify: Whether or not to verify SSL certificates.
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+    :param client_type: boto3.client client_type. Eg 's3', 'emr' etc
+    :param resource_type: boto3.resource resource_type. Eg 'dynamodb' etc
+    :param config: Configuration for botocore client.
+    .. seealso::
+        `AWS API <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html>`_
+    """
+
+    async def get_client_async(self) -> AioBaseClient:
+        """Create an Async Client object to communicate with AWS services."""
+
+        connection_object = await sync_to_async(self.get_connection)(self.aws_conn_id)
+
+        conn_config = AwsConnectionWrapper(
+            conn=connection_object,
+            region_name=self.region_name,
+            botocore_config=self.config,
+            verify=self.verify,
+        )
+
+        async_connection = get_session()

Review Comment:
   I think we could use `AioSession()` here instead, and even provide some useful stuff on top of it.
   Or create an `AsyncBaseSessionFactory` class that takes care of everything, like `BaseSessionFactory` does.
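   
   A rough skeleton of that idea (the shape is an assumption, not the PR's final code):
   
   ```python
   from aiobotocore.session import AioSession
   
   from airflow.providers.amazon.aws.hooks.base_aws import BaseSessionFactory
   
   class AsyncBaseSessionFactory(BaseSessionFactory):
       """Mirror of BaseSessionFactory that produces aiobotocore sessions."""
   
       def create_session(self) -> AioSession:
           # Region, credentials and role assumption would be wired in here,
           # reusing the connection parsing BaseSessionFactory already does.
           return AioSession()
   ```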





[GitHub] [airflow] Taragolis commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1122798179


##########
.github/workflows/ci.yml:
##########
@@ -801,6 +801,30 @@ jobs:
         run: breeze ci fix-ownership
         if: always()
 
+  tests-aws-async-provider:
+    timeout-minutes: 50
+    name: "Pytest for AWS Async Provider"
+    runs-on: "${{needs.build-info.outputs.runs-on}}"
+    needs: [build-info, wait-for-ci-images]
+    if: needs.build-info.outputs.run-tests == 'true'

Review Comment:
   I think we should run this only on Amazon changes; I guess @potiuk might have suggestions, as the original author of the selective checks for providers.





[GitHub] [airflow] pankajastro commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1122787484


##########
airflow/providers/amazon/provider.yaml:
##########
@@ -52,6 +52,7 @@ dependencies:
   - apache-airflow>=2.3.0
   - apache-airflow-providers-common-sql>=1.3.1
   - boto3>=1.24.0
+  - aiobotocore>=2.1.1

Review Comment:
   I have excluded the tests from the regular provider CI job and added a CI job to run the aiobotocore-related tests. Can you please re-review it? Thanks.





[GitHub] [airflow] vincbeck commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
vincbeck commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1068349671


##########
airflow/providers/amazon/aws/triggers/redshift_cluster.py:
##########
@@ -0,0 +1,51 @@
+from typing import AsyncIterator, Any, Tuple, Dict
+
+from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHookAsync
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+
+class RedshiftClusterTrigger(BaseTrigger):
+    def __init__(
+        self,
+        task_id: str,
+        aws_conn_id: str,
+        cluster_identifier: str,
+        operation_type: str,
+        poll_interval: float = 5.0,
+    ):
+        super().__init__()
+        self.task_id = task_id
+        self.poll_interval = poll_interval
+        self.aws_conn_id = aws_conn_id
+        self.cluster_identifier = cluster_identifier
+        self.operation_type = operation_type
+
+    def serialize(self) -> Tuple[str, Dict[str, Any]]:
+        return (
+            "airflow.providers.amazon.aws.triggers.redshift_cluster.RedshiftClusterTrigger",
+            {
+                "task_id": self.task_id,
+                "poll_interval": self.poll_interval,
+                "aws_conn_id": self.aws_conn_id,
+                "cluster_identifier": self.cluster_identifier,
+                "operation_type": self.operation_type,
+            },
+        )
+
+    async def run(self) -> AsyncIterator["TriggerEvent"]:
+        hook = RedshiftHookAsync(aws_conn_id=self.aws_conn_id)
+        try:
+            if self.operation_type == "pause_cluster":

Review Comment:
   Should we throw an exception if `operation_type` is not "pause_cluster"?
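   
   For example (an assumption about the shape, not the PR's final code):
   
   ```python
   from airflow.exceptions import AirflowException
   
   if self.operation_type == "pause_cluster":
       ...  # existing pause-and-poll logic
   else:
       # Fail fast instead of silently completing for unknown operations.
       raise AirflowException(f"Unsupported operation_type: {self.operation_type}")
   ```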





[GitHub] [airflow] Taragolis commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1095915111


##########
airflow/providers/amazon/provider.yaml:
##########
@@ -52,6 +52,7 @@ dependencies:
   - apache-airflow>=2.3.0
   - apache-airflow-providers-common-sql>=1.3.1
   - boto3>=1.24.0
+  - aiobotocore>=2.1.1

Review Comment:
   @ferruzzi I don't think we can move it to a core dependency even if this is resolved now; there are about 3-5 new versions of `botocore` per week, while `aiobotocore` usually [bumps its botocore dependency](https://github.com/aio-libs/aiobotocore/commits/master/setup.py) once per 3-6 months.
   
   Unfortunately there's nothing we can do better until the boto team releases the next major version of botocore with asyncio support: https://github.com/boto/botocore/issues/458
   





[GitHub] [airflow] ferruzzi commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "ferruzzi (via GitHub)" <gi...@apache.org>.
ferruzzi commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1119258230


##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -458,11 +460,15 @@ def __init__(
         *,
         cluster_identifier: str,
         aws_conn_id: str = "aws_default",
+        deferrable: bool = False,
+        poll_interval: int = 10,

Review Comment:
   > Are there any plans to migrate other AWS Operators to use waiters?
   
   Personally, I'd like to see all operators that implement a `wait_for_completion` block use a waiter (either an official one or a custom one), but it's a matter of time. I am working on moving the EMR operators over as I get time, but it's not a huge priority (yet?), and maybe it's better suited to a community project with a discussion to track the progress, like we've done in the past?
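   
   Illustrative only (the waiter name is a real EMR waiter; the cluster id is a placeholder): what such a `wait_for_completion` block looks like with an official boto3 waiter:
   
   ```python
   import boto3
   
   client = boto3.client("emr", region_name="us-east-1")
   waiter = client.get_waiter("cluster_running")  # official EMR waiter
   waiter.wait(
       ClusterId="j-XXXXXXXXXXXXX",  # placeholder cluster id
       WaiterConfig={"Delay": 30, "MaxAttempts": 60},  # poll every 30s, up to 30 minutes
   )
   ```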





[GitHub] [airflow] pankajastro commented on pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on PR #28850:
URL: https://github.com/apache/airflow/pull/28850#issuecomment-1442883280

   Requesting re-review on this. Thank you!




[GitHub] [airflow] potiuk commented on pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #28850:
URL: https://github.com/apache/airflow/pull/28850#issuecomment-1467834268

   Hmm. Though I see (and remember now) there is this big issue with aiobotocore pinning botocore to a specific version https://github.com/apache/airflow/pull/30032#discussion_r1134674392 , so this is going to be more difficult. Stay tuned.




[GitHub] [airflow] pankajastro commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1095919359


##########
airflow/providers/amazon/provider.yaml:
##########
@@ -52,6 +52,7 @@ dependencies:
   - apache-airflow>=2.3.0
   - apache-airflow-providers-common-sql>=1.3.1
   - boto3>=1.24.0
+  - aiobotocore>=2.1.1

Review Comment:
   But @Taragolis, we could move it out of the core dependencies, and if someone wants to use the async operator they would have to install `aiobotocore` themselves?





[GitHub] [airflow] pankajastro commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1093697593


##########
airflow/providers/amazon/provider.yaml:
##########
@@ -52,6 +52,7 @@ dependencies:
   - apache-airflow>=2.3.0
   - apache-airflow-providers-common-sql>=1.3.1
   - boto3>=1.24.0
+  - aiobotocore>=2.1.1

Review Comment:
   hmm, how can we avoid this conflict?
   should we keep this optional and move the related imports into the blocks where they get used 🤷‍♂️ (roughly as sketched below)?
   once we conclude on this I'll address the other comments
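   
   The guarded-import pattern in question (this is essentially how it later landed in this PR):
   
   ```python
   try:
       from aiobotocore.session import AioSession, get_session
   except ImportError:
       # aiobotocore is an optional extra; without it the async hooks
       # (and their tests) are simply unavailable.
       pass
   ```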





[GitHub] [airflow] Taragolis commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1122804707


##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -458,11 +460,15 @@ def __init__(
         *,
         cluster_identifier: str,
         aws_conn_id: str = "aws_default",
+        deferrable: bool = False,
+        poll_interval: int = 10,

Review Comment:
   Yeah, I don't think we'll have native support any time soon.
   Just hope that's a pointer for someone on the inside 🤣





[GitHub] [airflow] pankajastro commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1112975396


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -843,3 +848,82 @@ def _parse_s3_config(config_file_name: str, config_format: str | None = "boto",
         config_format=config_format,
         profile=profile,
     )
+
+
+class AwsBaseHookAsync(AwsBaseHook):
+    """
+    Interacts with AWS using aiobotocore asynchronously.
+
+    .. note::
+        AwsBaseHookAsync uses aiobotocore to create asynchronous S3 hooks. Hence, AwsBaseHookAsync
+        only supports the authentication mechanism that aiobotocore supports. Currently, AwsBaseHookAsync supports
+        only AWS STS client method ``assume_role`` provided in the Airflow connection extra args via aiobotocore.
+
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param verify: Whether or not to verify SSL certificates.
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+    :param client_type: boto3.client client_type. Eg 's3', 'emr' etc
+    :param resource_type: boto3.resource resource_type. Eg 'dynamodb' etc
+    :param config: Configuration for botocore client.
+    .. seealso::
+        `AWS API <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html>`_
+    """
+
+    async def get_client_async(self) -> AioBaseClient:
+        """Create an Async Client object to communicate with AWS services."""
+
+        connection_object = await sync_to_async(self.get_connection)(self.aws_conn_id)
+
+        conn_config = AwsConnectionWrapper(
+            conn=connection_object,
+            region_name=self.region_name,
+            botocore_config=self.config,
+            verify=self.verify,
+        )
+
+        async_connection = get_session()

Review Comment:
   I have created a basic AsyncBaseSessionFactory which currently handles basic auth with Airflow connection credentials, env credentials, and the assume_role method; we can add the others in the future.





[GitHub] [airflow] ferruzzi commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "ferruzzi (via GitHub)" <gi...@apache.org>.
ferruzzi commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1119259388


##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -458,11 +460,15 @@ def __init__(
         *,
         cluster_identifier: str,
         aws_conn_id: str = "aws_default",
+        deferrable: bool = False,
+        poll_interval: int = 10,

Review Comment:
   For the first part, I don't know offhand but @syedahsn is working on deferrable stuff on our end and might have a better answer there.





[GitHub] [airflow] vincbeck commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
vincbeck commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1068343757


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -843,3 +848,82 @@ def _parse_s3_config(config_file_name: str, config_format: str | None = "boto",
         config_format=config_format,
         profile=profile,
     )
+
+
+class AwsBaseHookAsync(AwsBaseHook):

Review Comment:
   If you rename this class, please do not forget to update the comments as well. This class is referenced multiple times in comments.





[GitHub] [airflow] phanikumv commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1094124816


##########
airflow/providers/amazon/provider.yaml:
##########
@@ -52,6 +52,7 @@ dependencies:
   - apache-airflow>=2.3.0
   - apache-airflow-providers-common-sql>=1.3.1
   - boto3>=1.24.0
+  - aiobotocore>=2.1.1

Review Comment:
   @kaxil @uranusjr @ashb  any thoughts on this?





[GitHub] [airflow] phanikumv commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
phanikumv commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1066810753


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -843,3 +848,82 @@ def _parse_s3_config(config_file_name: str, config_format: str | None = "boto",
         config_format=config_format,
         profile=profile,
     )
+
+
+class AwsBaseHookAsync(AwsBaseHook):

Review Comment:
   Don't think the CI entity naming check would allow the class name `AwsBaseHookAsync`; rename it to `AwsBaseAsyncHook`



##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -843,3 +848,82 @@ def _parse_s3_config(config_file_name: str, config_format: str | None = "boto",
         config_format=config_format,
         profile=profile,
     )
+
+
+class AwsBaseHookAsync(AwsBaseHook):

Review Comment:
   ```suggestion
   class AwsBaseAsyncHook(AwsBaseHook):
   ```



##########
airflow/providers/amazon/aws/hooks/redshift_cluster.py:
##########
@@ -183,3 +187,86 @@ def get_cluster_snapshot_status(self, snapshot_identifier: str, cluster_identifi
             return snapshot_status
         except self.get_conn().exceptions.ClusterSnapshotNotFoundFault:
             return None
+
+
+class RedshiftHookAsync(AwsBaseHookAsync):

Review Comment:
   ```suggestion
   class RedshiftAsyncHook(AwsBaseHookAsync):
   ```





[GitHub] [airflow] Taragolis commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1066843322


##########
airflow/providers/amazon/provider.yaml:
##########
@@ -52,6 +52,7 @@ dependencies:
   - apache-airflow>=2.3.0
   - apache-airflow-providers-common-sql>=1.3.1
   - boto3>=1.24.0
+  - aiobotocore>=2.1.1

Review Comment:
   I think adding this as a core dependency for the Amazon provider could break some existing functionality.
   At least until `aiobotocore` resolves this issue https://github.com/aio-libs/aiobotocore/issues/976 and removes the upper bound on [`botocore`](https://github.com/aio-libs/aiobotocore/blob/14b2f5fe0033a1f4bbc456f10633c2aeea74f57a/setup.py#L10).
   
   Because `botocore` is the main package behind `boto3`, if we can't upgrade both of them we can't add new features introduced since `botocore` 1.28.0+
   
   Changelogs: 
   - [botocore](https://github.com/boto/botocore/blob/develop/CHANGELOG.rst)
   - [boto3](https://github.com/boto/boto3/blob/develop/CHANGELOG.rst)





[GitHub] [airflow] pankajastro commented on pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on PR #28850:
URL: https://github.com/apache/airflow/pull/28850#issuecomment-1446680989

   Hi @Taragolis can you please re-review it, thank you! 




[GitHub] [airflow] pankajastro commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1112816670


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -843,3 +848,82 @@ def _parse_s3_config(config_file_name: str, config_format: str | None = "boto",
         config_format=config_format,
         profile=profile,
     )
+
+
+class AwsBaseHookAsync(AwsBaseHook):
+    """
+    Interacts with AWS using aiobotocore asynchronously.
+
+    .. note::
+        AwsBaseHookAsync uses aiobotocore to create asynchronous S3 hooks. Hence, AwsBaseHookAsync
+        only supports the authentication mechanism that aiobotocore supports. Currently, AwsBaseHookAsync supports
+        only AWS STS client method ``assume_role`` provided in the Airflow connection extra args via aiobotocore.
+
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param verify: Whether or not to verify SSL certificates.
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+    :param client_type: boto3.client client_type. Eg 's3', 'emr' etc
+    :param resource_type: boto3.resource resource_type. Eg 'dynamodb' etc
+    :param config: Configuration for botocore client.
+    .. seealso::
+        `AWS API <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html>`_
+    """
+
+    async def get_client_async(self) -> AioBaseClient:
+        """Create an Async Client object to communicate with AWS services."""
+
+        connection_object = await sync_to_async(self.get_connection)(self.aws_conn_id)
+

Review Comment:
   yeah, fixed the behaviour





[GitHub] [airflow] pankajastro commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1112978007


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -843,3 +848,82 @@ def _parse_s3_config(config_file_name: str, config_format: str | None = "boto",
         config_format=config_format,
         profile=profile,
     )
+
+
+class AwsBaseHookAsync(AwsBaseHook):
+    """
+    Interacts with AWS using aiobotocore asynchronously.
+
+    .. note::
+        AwsBaseHookAsync uses aiobotocore to create asynchronous S3 hooks. Hence, AwsBaseHookAsync
+        only supports the authentication mechanism that aiobotocore supports. Currently, AwsBaseHookAsync supports
+        only AWS STS client method ``assume_role`` provided in the Airflow connection extra args via aiobotocore.
+
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param verify: Whether or not to verify SSL certificates.
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+    :param client_type: boto3.client client_type. Eg 's3', 'emr' etc
+    :param resource_type: boto3.resource resource_type. Eg 'dynamodb' etc
+    :param config: Configuration for botocore client.
+    .. seealso::
+        `AWS API <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html>`_
+    """
+
+    async def get_client_async(self) -> AioBaseClient:
+        """Create an Async Client object to communicate with AWS services."""
+
+        connection_object = await sync_to_async(self.get_connection)(self.aws_conn_id)
+
+        conn_config = AwsConnectionWrapper(
+            conn=connection_object,
+            region_name=self.region_name,
+            botocore_config=self.config,
+            verify=self.verify,
+        )
+
+        async_connection = get_session()
+        session_token = conn_config.aws_session_token
+        aws_secret = conn_config.aws_secret_access_key
+        aws_access = conn_config.aws_access_key_id
+        if conn_config.role_arn:
+            credentials = await self.get_role_credentials(
+                async_session=async_connection, conn_config=conn_config
+            )
+            if credentials:
+                session_token = credentials["SessionToken"]
+                aws_access = credentials["AccessKeyId"]
+                aws_secret = credentials["SecretAccessKey"]
+        return async_connection.create_client(
+            service_name=self.client_type,
+            region_name=conn_config.region_name,
+            aws_secret_access_key=aws_secret,
+            aws_access_key_id=aws_access,
+            aws_session_token=session_token,
+            verify=self.verify,
+            config=self.config,
+            endpoint_url=conn_config.endpoint_url,
+        )
+
+    @staticmethod
+    async def get_role_credentials(
+        async_session: AioSession, conn_config: AwsConnectionWrapper
+    ) -> Optional[Dict[str, str]]:
+        """Get the role_arn, method credentials from connection details and get the role credentials detail"""
+        async with async_session.create_client(
+            "sts",
+            aws_access_key_id=conn_config.aws_access_key_id,
+            aws_secret_access_key=conn_config.aws_secret_access_key,
+        ) as client:

Review Comment:
   I have fixed this, please take a look. These params are now set on the aio session, so from the Airflow side it's the same config as for the sync hook.





[GitHub] [airflow] syedahsn commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "syedahsn (via GitHub)" <gi...@apache.org>.
syedahsn commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1126950222


##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -458,11 +460,15 @@ def __init__(
         *,
         cluster_identifier: str,
         aws_conn_id: str = "aws_default",
+        deferrable: bool = False,
+        poll_interval: int = 10,

Review Comment:
   Our team is currently working on implementing deferrable operators as well, and we're taking a slightly different approach. Rather than create async counterparts for all AWS hooks, we are planning to create an `async_conn` property similar to the existing `conn` property. The `async_conn` property will return a `ClientCreatorContext` object that can be used to create an aiobotocore client that can make async calls to the boto3 API. With this approach, we reduce a lot of complexity and code duplication by not having separate async hooks for every service. Additionally, we are focusing on using waiters to perform the polling operations, so that most of the triggers will be in a standard format. The aiobotocore library has an `AIOWaiter` class that makes asynchronous calls to the boto3 API, which makes it very easy to implement most waiters. This also saves us from having to write custom describe calls for every service. We are currently reviewing the design, but will have a PR for the community shortly.
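   
   A rough sketch of the described approach (names and wiring are assumptions, not the final API):
   
   ```python
   from aiobotocore.session import get_session
   
   async def wait_for_cluster_available(cluster_id: str) -> None:
       session = get_session()
       async with session.create_client("redshift", region_name="us-east-1") as client:
           # aiobotocore's get_waiter() returns an AIOWaiter whose wait() is awaitable.
           waiter = client.get_waiter("cluster_available")
           await waiter.wait(ClusterIdentifier=cluster_id)
   ```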





[GitHub] [airflow] pankajastro commented on pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on PR #28850:
URL: https://github.com/apache/airflow/pull/28850#issuecomment-1467791465

   > > Merging this but will Jarek take the call on CI -- I don't fully like/agree with the CI steps but don't want to block this further
   > 
   > Yeah. I completely missed the call (when I was called 2 weeks ago) - sorry for that.
   > 
   > @pankajastro - can you please make a follow-up after this one (happy to help review and get it done).
   > 
   > I believe the right way to solve it will be to:
   > 
   > * add aiobotocore to setup.py in this list (with a comment in both places - the amazon provider.yaml and the list - explaining that the two should be synchronized. We
   > 
   > ```
   > # Dependencies needed for development only
   > devel_only = [
   > ```
   > 
   > (unless it causes some conflicts, and then we can think what to do)
   > 
   > * remove the dedicated job to run the tests (and check that the deferrable tests are executed in the "Providers" section)
   > 
   > Then it should work out-of-the-box and only when providers are modified.
   > 
   > @pankajastro - can you make a follow-up PR for that please?
   
   Thank you @potiuk for the feedback. I'll file a PR for this soon.




[GitHub] [airflow] pankajastro commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1098349606


##########
airflow/providers/amazon/provider.yaml:
##########
@@ -52,6 +52,7 @@ dependencies:
   - apache-airflow>=2.3.0
   - apache-airflow-providers-common-sql>=1.3.1
   - boto3>=1.24.0
+  - aiobotocore>=2.1.1

Review Comment:
   @potiuk WDYT ^^?





[GitHub] [airflow] pankajastro commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1122772845


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -865,3 +868,137 @@ def _parse_s3_config(config_file_name: str, config_format: str | None = "boto",
         config_format=config_format,
         profile=profile,
     )
+
+
+try:
+    import aiobotocore.credentials
+    from aiobotocore.session import AioSession, get_session
+except ImportError:
+    pass
+
+
+class BaseAsyncSessionFactory(BaseSessionFactory):
+    """
+    Base AWS Session Factory class to handle aiobotocore session creation.
+
+    It currently, handles ENV, AWS secret key and STS client method ``assume_role``
+    provided in Airflow connection
+    """
+
+    async def get_role_credentials(self) -> dict:
+        """Get the role_arn, method credentials from connection details and get the role credentials detail"""
+        async with self._basic_session.create_client("sts", region_name=self.region_name) as client:
+            response = await client.assume_role(
+                RoleArn=self.role_arn,
+                RoleSessionName=self._strip_invalid_session_name_characters(f"Airflow_{self.conn.conn_id}"),
+                **self.conn.assume_role_kwargs,
+            )
+            return response["Credentials"]
+
+    async def _get_refresh_credentials(self) -> dict[str, Any]:
+        self.log.debug("Refreshing credentials")
+        assume_role_method = self.conn.assume_role_method
+        if assume_role_method != "assume_role":
+            raise NotImplementedError(f"assume_role_method={assume_role_method} not expected")
+
+        credentials = await self.get_role_credentials()
+
+        expiry_time = credentials["Expiration"].isoformat()
+        self.log.debug("New credentials expiry_time: %s", expiry_time)
+        credentials = {
+            "access_key": credentials.get("AccessKeyId"),
+            "secret_key": credentials.get("SecretAccessKey"),
+            "token": credentials.get("SessionToken"),
+            "expiry_time": expiry_time,
+        }
+        return credentials
+
+    def _get_session_with_assume_role(self) -> AioSession:
+
+        assume_role_method = self.conn.assume_role_method
+        if assume_role_method != "assume_role":
+            raise NotImplementedError(f"assume_role_method={assume_role_method} not expected")
+
+        credentials = aiobotocore.credentials.AioRefreshableCredentials.create_from_metadata(
+            metadata=self._get_refresh_credentials(),
+            refresh_using=self._get_refresh_credentials,
+            method="sts-assume-role",
+        )
+
+        session = aiobotocore.session.get_session()
+        session._credentials = credentials
+        return session
+
+    @cached_property
+    def _basic_session(self) -> AioSession:
+        """Cached property with basic aiobotocore.session.AioSession."""
+        session_kwargs = self.conn.session_kwargs
+        aws_access_key_id = session_kwargs.get("aws_access_key_id")
+        aws_secret_access_key = session_kwargs.get("aws_secret_access_key")
+        aws_session_token = session_kwargs.get("aws_session_token")
+        region_name = session_kwargs.get("region_name")
+        profile_name = session_kwargs.get("profile_name")
+
+        aio_session = get_session()
+        if profile_name is not None:
+            aio_session.set_config_variable("profile", profile_name)
+        if aws_access_key_id or aws_secret_access_key or aws_session_token:
+            aio_session.set_credentials(
+                access_key=aws_access_key_id,
+                secret_key=aws_secret_access_key,
+                token=aws_session_token,
+            )
+        if region_name is not None:
+            aio_session.set_config_variable("region", region_name)
+        return aio_session
+
+    def create_session(self) -> AioSession:
+        """Create aiobotocore Session from connection and config."""
+        if not self._conn:
+            self.log.info("No connection ID provided. Fallback on boto3 credential strategy")
+            return get_session()
+        elif not self.role_arn:
+            return self._basic_session
+        return self._get_session_with_assume_role()
+
+
+class AwsBaseAsyncHook(AwsBaseHook):
+    """
+    Interacts with AWS using aiobotocore asynchronously.
+
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default botocore behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default botocore configuration would be used (and must be
+        maintained on each worker node).
+    :param verify: Whether to verify SSL certificates.
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+    :param client_type: boto3.client client_type. Eg 's3', 'emr' etc
+    :param resource_type: boto3.resource resource_type. Eg 'dynamodb' etc
+    :param config: Configuration for botocore client.
+    """
+
+    def __init__(self, **kwargs) -> None:
+        try:
+            pass
+        except ImportError:
+            raise AirflowOptionalProviderFeatureException(
+                "AWS deferrable operator feature is disable. To enable it please install aiobotocore>=2.1.1"
+            )

Review Comment:
   I wanted it to fail during DAG parsing only if aiobotocore is not installed, but it looks like pre-commit replaced the import with `pass`.
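
   For reference, a minimal sketch of what the guard was presumably meant to look like; marking the import with `# noqa: F401` is one way to stop pre-commit/linters from rewriting the "unused" import to `pass`:

   ```python
   from airflow.exceptions import AirflowOptionalProviderFeatureException


   class AwsBaseAsyncHookSketch:  # the real class subclasses AwsBaseHook
       def __init__(self, **kwargs) -> None:
           try:
               # ``# noqa: F401`` keeps the import in place, so the
               # ImportError branch below stays reachable at DAG-parse time.
               import aiobotocore  # noqa: F401
           except ImportError:
               raise AirflowOptionalProviderFeatureException(
                   "AWS deferrable operator feature is disabled. "
                   "To enable it please install aiobotocore>=2.1.1"
               )
           # The real hook would continue with: super().__init__(**kwargs)
   ```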



##########
airflow/providers/amazon/provider.yaml:
##########
@@ -596,3 +597,4 @@ additional-extras:
   - name: pandas
     dependencies:
       - pandas>=0.17.1
+      - aiobotocore>=2.1.1

Review Comment:
   yup





[GitHub] [airflow] pankajastro commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1122757853


##########
.github/workflows/ci.yml:
##########
@@ -801,6 +801,30 @@ jobs:
         run: breeze ci fix-ownership
         if: always()
 
+  tests-aws-async-provider:
+    timeout-minutes: 50
+    name: "Pytest for AWS Async Provider"
+    runs-on: "${{needs.build-info.outputs.runs-on}}"
+    needs: [build-info, wait-for-ci-images]
+    if: needs.build-info.outputs.run-tests == 'true'

Review Comment:
   yes, what should we do here?





[GitHub] [airflow] Taragolis commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1116721477


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -865,3 +868,137 @@ def _parse_s3_config(config_file_name: str, config_format: str | None = "boto",
         config_format=config_format,
         profile=profile,
     )
+
+
+try:
+    import aiobotocore.credentials
+    from aiobotocore.session import AioSession, get_session
+except ImportError:
+    pass
+
+
+class BaseAsyncSessionFactory(BaseSessionFactory):
+    """
+    Base AWS Session Factory class to handle aiobotocore session creation.
+
+    It currently, handles ENV, AWS secret key and STS client method ``assume_role``
+    provided in Airflow connection
+    """
+
+    async def get_role_credentials(self) -> dict:
+        """Get the role_arn, method credentials from connection details and get the role credentials detail"""
+        async with self._basic_session.create_client("sts", region_name=self.region_name) as client:
+            response = await client.assume_role(
+                RoleArn=self.role_arn,
+                RoleSessionName=self._strip_invalid_session_name_characters(f"Airflow_{self.conn.conn_id}"),
+                **self.conn.assume_role_kwargs,
+            )
+            return response["Credentials"]
+
+    async def _get_refresh_credentials(self) -> dict[str, Any]:
+        self.log.debug("Refreshing credentials")
+        assume_role_method = self.conn.assume_role_method
+        if assume_role_method != "assume_role":
+            raise NotImplementedError(f"assume_role_method={assume_role_method} not expected")
+
+        credentials = await self.get_role_credentials()
+
+        expiry_time = credentials["Expiration"].isoformat()
+        self.log.debug("New credentials expiry_time: %s", expiry_time)
+        credentials = {
+            "access_key": credentials.get("AccessKeyId"),
+            "secret_key": credentials.get("SecretAccessKey"),
+            "token": credentials.get("SessionToken"),
+            "expiry_time": expiry_time,
+        }
+        return credentials
+
+    def _get_session_with_assume_role(self) -> AioSession:
+
+        assume_role_method = self.conn.assume_role_method
+        if assume_role_method != "assume_role":
+            raise NotImplementedError(f"assume_role_method={assume_role_method} not expected")
+
+        credentials = aiobotocore.credentials.AioRefreshableCredentials.create_from_metadata(
+            metadata=self._get_refresh_credentials(),
+            refresh_using=self._get_refresh_credentials,
+            method="sts-assume-role",
+        )
+
+        session = aiobotocore.session.get_session()
+        session._credentials = credentials
+        return session
+
+    @cached_property
+    def _basic_session(self) -> AioSession:
+        """Cached property with basic aiobotocore.session.AioSession."""
+        session_kwargs = self.conn.session_kwargs
+        aws_access_key_id = session_kwargs.get("aws_access_key_id")
+        aws_secret_access_key = session_kwargs.get("aws_secret_access_key")
+        aws_session_token = session_kwargs.get("aws_session_token")
+        region_name = session_kwargs.get("region_name")
+        profile_name = session_kwargs.get("profile_name")
+
+        aio_session = get_session()
+        if profile_name is not None:
+            aio_session.set_config_variable("profile", profile_name)
+        if aws_access_key_id or aws_secret_access_key or aws_session_token:
+            aio_session.set_credentials(
+                access_key=aws_access_key_id,
+                secret_key=aws_secret_access_key,
+                token=aws_session_token,
+            )
+        if region_name is not None:
+            aio_session.set_config_variable("region", region_name)
+        return aio_session
+
+    def create_session(self) -> AioSession:
+        """Create aiobotocore Session from connection and config."""
+        if not self._conn:
+            self.log.info("No connection ID provided. Fallback on boto3 credential strategy")
+            return get_session()
+        elif not self.role_arn:
+            return self._basic_session
+        return self._get_session_with_assume_role()
+
+
+class AwsBaseAsyncHook(AwsBaseHook):
+    """
+    Interacts with AWS using aiobotocore asynchronously.
+
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default botocore behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default botocore configuration would be used (and must be
+        maintained on each worker node).
+    :param verify: Whether to verify SSL certificates.
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+    :param client_type: boto3.client client_type. Eg 's3', 'emr' etc
+    :param resource_type: boto3.resource resource_type. Eg 'dynamodb' etc
+    :param config: Configuration for botocore client.
+    """
+
+    def __init__(self, **kwargs) -> None:
+        try:
+            pass
+        except ImportError:
+            raise AirflowOptionalProviderFeatureException(
+                "AWS deferrable operator feature is disable. To enable it please install aiobotocore>=2.1.1"
+            )

Review Comment:
   Unfortunately this exception never happens.



##########
.github/workflows/ci.yml:
##########
@@ -801,6 +801,30 @@ jobs:
         run: breeze ci fix-ownership
         if: always()
 
+  tests-aws-async-provider:
+    timeout-minutes: 50
+    name: "Pytest for AWS Async Provider"
+    runs-on: "${{needs.build-info.outputs.runs-on}}"
+    needs: [build-info, wait-for-ci-images]
+    if: needs.build-info.outputs.run-tests == 'true'
+    steps:
+      - name: Cleanup repo
+        shell: bash
+        run: docker run -v "${GITHUB_WORKSPACE}:/workspace" -u 0:0 bash -c "rm -rf /workspace/*"
+      - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
+        uses: actions/checkout@v3
+        with:
+          persist-credentials: false
+      - name: "Prepare breeze & CI image"
+        uses: ./.github/actions/prepare_breeze_and_image
+      - name: "Run AWS Async Test"
+        run: "breeze shell \
+        'pip install aiobotocore>=2.1.1 && pytest /opt/airflow/tests/providers/amazon/aws/deferrable'"

Review Comment:
   None of the other settings would apply to this run, including collecting warnings.
   I do not know whether that is required right now or not.



##########
.github/workflows/ci.yml:
##########
@@ -801,6 +801,30 @@ jobs:
         run: breeze ci fix-ownership
         if: always()
 
+  tests-aws-async-provider:
+    timeout-minutes: 50
+    name: "Pytest for AWS Async Provider"
+    runs-on: "${{needs.build-info.outputs.runs-on}}"
+    needs: [build-info, wait-for-ci-images]
+    if: needs.build-info.outputs.run-tests == 'true'

Review Comment:
   I guess it will run even if there are no changes in the AWS provider.



##########
airflow/providers/amazon/provider.yaml:
##########
@@ -596,3 +597,4 @@ additional-extras:
   - name: pandas
     dependencies:
       - pandas>=0.17.1
+      - aiobotocore>=2.1.1

Review Comment:
   I guess it was added by mistake to the "pandas" extra.



##########
airflow/providers/amazon/provider.yaml:
##########
@@ -596,3 +597,4 @@ additional-extras:
   - name: pandas
     dependencies:
       - pandas>=0.17.1
+      - aiobotocore>=2.1.1

Review Comment:
   In addition, if a user post-installs this dependency then the steps should be:
   
   1. Remove `boto3` and `botocore`
   2. Install `aiobotocore` with the appropriate versions of `boto3` and `botocore`
   
   Otherwise this error/warning happens:
   
   ```console
   ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
   boto3 1.26.79 requires botocore<1.30.0,>=1.29.79, but you have botocore 1.27.59 which is incompatible.
   ```



##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -458,11 +460,15 @@ def __init__(
         *,
         cluster_identifier: str,
         aws_conn_id: str = "aws_default",
+        deferrable: bool = False,
+        poll_interval: int = 10,

Review Comment:
   Just wondering how we would resolve this if we want to use the more generic parameters `wait_for_completion` as well as `waiter_delay` and `waiter_max_attempts`.
   
   @ferruzzi @vincbeck Are there any plans to migrate other AWS operators to use waiters?
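
   For illustration, a sketch of how the constructor could look with the more generic parameter names mentioned above; this is an assumption about naming, not what this PR merged (the PR uses `deferrable` and `poll_interval`):

   ```python
   class RedshiftPauseClusterOperatorSketch:
       def __init__(
           self,
           *,
           cluster_identifier: str,
           aws_conn_id: str = "aws_default",
           wait_for_completion: bool = False,  # wait synchronously for the target state
           waiter_delay: int = 10,             # seconds between waiter attempts
           waiter_max_attempts: int = 30,      # upper bound on polling attempts
           deferrable: bool = False,
       ) -> None:
           self.cluster_identifier = cluster_identifier
           self.aws_conn_id = aws_conn_id
           self.wait_for_completion = wait_for_completion
           self.waiter_delay = waiter_delay
           self.waiter_max_attempts = waiter_max_attempts
           self.deferrable = deferrable
   ```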
   





[GitHub] [airflow] pankajastro commented on pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on PR #28850:
URL: https://github.com/apache/airflow/pull/28850#issuecomment-1471867961

   > I took a closer look at the problem and I think doing it the way I described in [#28850 (comment)](https://github.com/apache/airflow/pull/28850#issuecomment-1467791465) might be the right approach - let's see if others agree with it.
   
   Hi @potiuk, I have cleaned up the CI part as suggested in https://github.com/apache/airflow/pull/30127 - it now looks much better. Can you please take a look?




[GitHub] [airflow] ferruzzi commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "ferruzzi (via GitHub)" <gi...@apache.org>.
ferruzzi commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1095842584


##########
airflow/providers/amazon/provider.yaml:
##########
@@ -52,6 +52,7 @@ dependencies:
   - apache-airflow>=2.3.0
   - apache-airflow-providers-common-sql>=1.3.1
   - boto3>=1.24.0
+  - aiobotocore>=2.1.1

Review Comment:
   Maybe add a comment linking to that issue and a note to change it to a dependency once that is sorted?





[GitHub] [airflow] pankajastro commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1095879988


##########
airflow/providers/amazon/provider.yaml:
##########
@@ -52,6 +52,7 @@ dependencies:
   - apache-airflow>=2.3.0
   - apache-airflow-providers-common-sql>=1.3.1
   - boto3>=1.24.0
+  - aiobotocore>=2.1.1

Review Comment:
   Sure, thanks @ferruzzi @kaxil @phanikumv. I'll address the other feedback soon.





[GitHub] [airflow] kaxil commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "kaxil (via GitHub)" <gi...@apache.org>.
kaxil commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1094475545


##########
airflow/providers/amazon/provider.yaml:
##########
@@ -52,6 +52,7 @@ dependencies:
   - apache-airflow>=2.3.0
   - apache-airflow-providers-common-sql>=1.3.1
   - boto3>=1.24.0
+  - aiobotocore>=2.1.1

Review Comment:
   Make it an optional requirement





[GitHub] [airflow] pankajastro commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1112971388


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -843,3 +848,82 @@ def _parse_s3_config(config_file_name: str, config_format: str | None = "boto",
         config_format=config_format,
         profile=profile,
     )
+
+
+class AwsBaseHookAsync(AwsBaseHook):

Review Comment:
   I have refactored both AwsBaseHook and AwsBaseAsyncHook a bit and am now using some properties.





[GitHub] [airflow] phanikumv commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1116653627


##########
tests/providers/amazon/aws/utils/compat.py:
##########
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+__all__ = ["async_mock", "AsyncMock"]
+
+import sys
+
+if sys.version_info < (3, 8):
+    # For compatibility with Python 3.7
+    from asynctest import mock as async_mock
+
+    # ``asynctest.mock.CoroutineMock`` which provide compatibility not working well with autospec=True
+    # as result "TypeError: object MagicMock can't be used in 'await' expression" could be raised.
+    # Best solution in this case provide as spec actual awaitable object
+    # >>> from tests.providers.google.cloud.utils.compat import AsyncMock

Review Comment:
   ```suggestion
       # >>> from tests.providers.amazon.aws.utils.compat import AsyncMock
   ```





[GitHub] [airflow] pankajastro commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1122765848


##########
airflow/providers/amazon/provider.yaml:
##########
@@ -596,3 +597,4 @@ additional-extras:
   - name: pandas
     dependencies:
       - pandas>=0.17.1
+      - aiobotocore>=2.1.1

Review Comment:
   Do you mean that in CI we should first remove botocore/boto3 and then install aiobotocore?
   Yes, the above warning/error will appear, but since I'm running a single command to install just the one package (aiobotocore), it will still be installed. Let me know if you want me to add a step in CI to remove botocore/boto3 and then install aiobotocore.





[GitHub] [airflow] o-nikolas commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "o-nikolas (via GitHub)" <gi...@apache.org>.
o-nikolas commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1121011816


##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -458,11 +460,15 @@ def __init__(
         *,
         cluster_identifier: str,
         aws_conn_id: str = "aws_default",
+        deferrable: bool = False,
+        poll_interval: int = 10,

Review Comment:
   It's been requested for a looong time: https://github.com/boto/botocore/issues/458
   We reached out to them and they said Botocore will eventually support async by default, but it isn't coming any time soon.





[GitHub] [airflow] potiuk commented on pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #28850:
URL: https://github.com/apache/airflow/pull/28850#issuecomment-1468033661

   I took a closer look at the problem and I think doing it the way I described in https://github.com/apache/airflow/pull/28850#issuecomment-1467791465 might be the right approach - let's see if others agree with it.




[GitHub] [airflow] potiuk commented on pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #28850:
URL: https://github.com/apache/airflow/pull/28850#issuecomment-1467778448

   > Merging this, but Jarek will take the call on CI -- I don't fully like/agree with the CI steps but don't want to block this further.
   
   Yeah. I completely missed it when I was pinged 2 weeks ago - sorry for that.
   
   @pankajastro - can you please make a follow-up after this one (happy to help review and get it done).
   
   I believe the right way to solve it will be to:
   
   * add aiobotocore to this list in setup.py (with a comment in both places - the amazon provider.yaml and this list - explaining that the two should be kept synchronized):
   ```
   # Dependencies needed for development only
   devel_only = [
   ```
   (unless it causes some conflicts, and then we can think about what to do)
   
   * remove the dedicated job that runs the tests (and check that the deferrable tests are executed in the "Providers" section)
   
   Then it should work out-of-the-box and only when providers are modified.
   
   @pankajastro - can you make a follow-up PR for that please?
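
   For reference, a sketch of the suggested setup.py change; the surrounding entries are elided and the exact placement is an assumption:

   ```python
   # setup.py (sketch)
   # Dependencies needed for development only
   devel_only = [
       # ... existing development-only dependencies ...
       # NOTE: keep this pin in sync with the optional "aiobotocore" extra in
       # airflow/providers/amazon/provider.yaml
       "aiobotocore>=2.1.1",
   ]
   ```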
   
   




[GitHub] [airflow] Taragolis commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1067325568


##########
tests/providers/amazon/aws/triggers/test_redshift_cluster.py:
##########
@@ -0,0 +1,89 @@
+import asyncio
+from unittest import mock
+
+import pytest
+from airflow.triggers.base import TriggerEvent
+
+from airflow.providers.amazon.aws.triggers.redshift_cluster import (
+    RedshiftClusterTrigger,
+)
+
+TASK_ID = "redshift_trigger_check"
+POLLING_PERIOD_SECONDS = 1.0
+
+
+class TestRedshiftClusterTrigger:
+
+    def test_pause_serialization(self):
+        """
+        Asserts that the RedshiftClusterTrigger correctly serializes its arguments
+        and classpath.
+        """
+        trigger = RedshiftClusterTrigger(
+            task_id=TASK_ID,
+            poll_interval=POLLING_PERIOD_SECONDS,
+            aws_conn_id="test_redshift_conn_id",
+            cluster_identifier="mock_cluster_identifier",
+            operation_type="pause_cluster",
+        )
+        classpath, kwargs = trigger.serialize()
+        assert classpath == "airflow.providers.amazon.aws.triggers.redshift_cluster.RedshiftClusterTrigger"
+        assert kwargs == {
+            "task_id": TASK_ID,
+            "poll_interval": POLLING_PERIOD_SECONDS,
+            "aws_conn_id": "test_redshift_conn_id",
+            "cluster_identifier": "mock_cluster_identifier",
+            "operation_type": "pause_cluster",
+        }
+
+    @pytest.mark.asyncio
+    @pytest.mark.parametrize(
+        "operation_type,return_value,response",
+        [
+            (
+                "pause_cluster",
+                {"status": "error", "message": "test error"},
+                TriggerEvent({"status": "error", "message": "test error"}),
+            ),
+            (
+                "pause_cluster",
+                {"status": "success", "cluster_state": "paused"},
+                TriggerEvent({"status": "success", "cluster_state": "paused"}),
+            ),
+            ("pause_cluster", False, TriggerEvent({"status": "error", "message": f"{TASK_ID} failed"})),
+        ],
+    )
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHookAsync.pause_cluster")
+    async def test_pause_trigger_run(
+        self, mock_pause_cluster, operation_type, return_value, response
+    ):
+        """
+        Test trigger event for the pause_cluster response
+        """
+        mock_pause_cluster.return_value = return_value
+        trigger = RedshiftClusterTrigger(
+            task_id=TASK_ID,
+            poll_interval=POLLING_PERIOD_SECONDS,
+            aws_conn_id="test_redshift_conn_id",
+            cluster_identifier="mock_cluster_identifier",
+            operation_type=operation_type,
+        )
+        generator = trigger.run()
+        actual = await generator.asend(None)
+        assert response == actual
+
+    @pytest.mark.asyncio
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHookAsync.pause_cluster")

Review Comment:
   You can't use `unittest.mock.patch` in Python 3.7 for asyncio tests because it does not implement AsyncMagicMock and AsyncMock.
   
   This is the way it is implemented in the Google provider tests:
   
   https://github.com/apache/airflow/blob/877189916900c930b2c4b92101d46d2f98eb9077/tests/providers/google/cloud/utils/compat.py#L18-L37
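
   For illustration, a sketch of how a trigger test might use that compat shim so the same test body runs on Python 3.7 and 3.8+; the patched target follows the tests shown earlier in this thread:

   ```python
   import pytest

   from tests.providers.amazon.aws.utils.compat import async_mock


   @pytest.mark.asyncio
   @async_mock.patch(
       "airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHookAsync.pause_cluster"
   )
   async def test_pause_trigger_event(mock_pause_cluster):
       # The shim resolves to unittest.mock on 3.8+ and asynctest.mock on 3.7,
       # so the patched coroutine is awaitable on both.
       mock_pause_cluster.return_value = {"status": "success", "cluster_state": "paused"}
       ...  # build RedshiftClusterTrigger and assert on the emitted TriggerEvent
   ```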





[GitHub] [airflow] Taragolis commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1095934051


##########
airflow/providers/amazon/provider.yaml:
##########
@@ -52,6 +52,7 @@ dependencies:
   - apache-airflow>=2.3.0
   - apache-airflow-providers-common-sql>=1.3.1
   - boto3>=1.24.0
+  - aiobotocore>=2.1.1

Review Comment:
   Yeah, we could catch that the module is not installed and raise an appropriate exception, for example:
   
   https://github.com/apache/airflow/blob/a7e1cb2fbfc684508f4b832527ae2371f99ad37d/airflow/providers/slack/notifications/slack_notifier.py#L26-L31
   
   But we also need to somehow resolve additional issues:
   1. AFAIK only the core provider dependencies are installed in the CI image (I guess we have some workaround). That means we would have an error in the async Amazon provider tests because there is no `aiobotocore`.
   2. If we install `aiobotocore` in the CI image then we also freeze the versions of `botocore` and `boto3`. As a result we could miss that our tests do not work with a modern version of `boto3` (we have some private method usage in the Amazon provider).
   
   I'm not sure how to resolve this; maybe @potiuk could suggest something.





[GitHub] [airflow] pankajastro commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1122760684


##########
.github/workflows/ci.yml:
##########
@@ -801,6 +801,30 @@ jobs:
         run: breeze ci fix-ownership
         if: always()
 
+  tests-aws-async-provider:
+    timeout-minutes: 50
+    name: "Pytest for AWS Async Provider"
+    runs-on: "${{needs.build-info.outputs.runs-on}}"
+    needs: [build-info, wait-for-ci-images]
+    if: needs.build-info.outputs.run-tests == 'true'
+    steps:
+      - name: Cleanup repo
+        shell: bash
+        run: docker run -v "${GITHUB_WORKSPACE}:/workspace" -u 0:0 bash -c "rm -rf /workspace/*"
+      - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
+        uses: actions/checkout@v3
+        with:
+          persist-credentials: false
+      - name: "Prepare breeze & CI image"
+        uses: ./.github/actions/prepare_breeze_and_image
+      - name: "Run AWS Async Test"
+        run: "breeze shell \
+        'pip install aiobotocore>=2.1.1 && pytest /opt/airflow/tests/providers/amazon/aws/deferrable'"

Review Comment:
   Right, but I'm also not sure whether that is required right now.





[GitHub] [airflow] Taragolis commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1119285931


##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -458,11 +460,15 @@ def __init__(
         *,
         cluster_identifier: str,
         aws_conn_id: str = "aws_default",
+        deferrable: bool = False,
+        poll_interval: int = 10,

Review Comment:
   Any chance that `botocore` will have native asyncio support 🤔 Or is it just some kind of surprise 🤣





[GitHub] [airflow] kaxil commented on pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "kaxil (via GitHub)" <gi...@apache.org>.
kaxil commented on PR #28850:
URL: https://github.com/apache/airflow/pull/28850#issuecomment-1466875737

   Merging this, but Jarek will take the call on CI -- I don't fully like/agree with the CI steps but don't want to block this further.




[GitHub] [airflow] kaxil merged pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "kaxil (via GitHub)" <gi...@apache.org>.
kaxil merged PR #28850:
URL: https://github.com/apache/airflow/pull/28850




[GitHub] [airflow] potiuk commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1108517079


##########
airflow/providers/amazon/provider.yaml:
##########
@@ -52,6 +52,7 @@ dependencies:
   - apache-airflow>=2.3.0
   - apache-airflow-providers-common-sql>=1.3.1
   - boto3>=1.24.0
+  - aiobotocore>=2.1.1

Review Comment:
   > 1. AFAIK only the core provider dependencies are installed in the CI image (I guess we have some workaround). That means we would have an error in the async Amazon provider tests because there is no aiobotocore
   
   Yes. It is quite a bummer that `aiobotocore` limits `botocore` that much. Indeed, if `aiobotocore` becomes an additional extra, it will be missing from the image. We could add it, but as @Taragolis mentions, it would limit our botocore upgrades that happen regularly. A solution to that might be:
   
   1) add an exclusion in the tests for when `aiobotocore` is not installed
   2) implement an extra job in CI that installs `aiobotocore` and then runs only the aiobotocore tests.
   
   Should be easy to do.
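
   For illustration, one minimal way to implement the first option, assuming the deferrable tests live in their own modules; `pytest.importorskip` is standard pytest:

   ```python
   import pytest

   # Placed at the top of the deferrable test modules: the whole module is skipped
   # (not failed) when aiobotocore is missing or older than the pinned version.
   aiobotocore = pytest.importorskip("aiobotocore", minversion="2.1.1")
   ```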





[GitHub] [airflow] potiuk commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1108506329


##########
airflow/providers/amazon/provider.yaml:
##########
@@ -52,6 +52,7 @@ dependencies:
   - apache-airflow>=2.3.0
   - apache-airflow-providers-common-sql>=1.3.1
   - boto3>=1.24.0
+  - aiobotocore>=2.1.1

Review Comment:
   Sorry - I totally missed that. Yes, we have full support for that:
   1) raise AirflowOptionalProviderFeatureException as mentioned above
   2) add an "additional-extra" in the amazon provider - the same as the pandas extra we already have there: https://github.com/apache/airflow/blob/main/airflow/providers/amazon/provider.yaml#LL342-L597C23





[GitHub] [airflow] potiuk commented on pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #28850:
URL: https://github.com/apache/airflow/pull/28850#issuecomment-1472113460

   This one should do the trick (once it succeeds): https://github.com/apache/airflow/pull/30144
   
   @pankajastro @Taragolis @kaxil @ferruzzi @o-nikolas -> I think with the additional tests on the "latest" botocore/boto3, we should be in a really good position, because all Amazon-related PRs will also have to pass the non-deferrable tests on the latest botocore/boto3.




[GitHub] [airflow] pankajastro commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1112992742


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -843,3 +848,82 @@ def _parse_s3_config(config_file_name: str, config_format: str | None = "boto",
         config_format=config_format,
         profile=profile,
     )
+
+
+class AwsBaseHookAsync(AwsBaseHook):
+    """
+    Interacts with AWS using aiobotocore asynchronously.
+
+    .. note::
+        AwsBaseHookAsync uses aiobotocore to create asynchronous S3 hooks. Hence, AwsBaseHookAsync
+        only supports the authentication mechanism that aiobotocore supports. Currently, AwsBaseHookAsync supports
+        only AWS STS client method ``assume_role`` provided in the Airflow connection extra args via aiobotocore.
+
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param verify: Whether or not to verify SSL certificates.
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+    :param client_type: boto3.client client_type. Eg 's3', 'emr' etc
+    :param resource_type: boto3.resource resource_type. Eg 'dynamodb' etc
+    :param config: Configuration for botocore client.
+    .. seealso::
+        `AWS API <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html>`_
+    """
+
+    async def get_client_async(self) -> AioBaseClient:
+        """Create an Async Client object to communicate with AWS services."""
+
+        connection_object = await sync_to_async(self.get_connection)(self.aws_conn_id)
+
+        conn_config = AwsConnectionWrapper(
+            conn=connection_object,
+            region_name=self.region_name,
+            botocore_config=self.config,
+            verify=self.verify,
+        )
+
+        async_connection = get_session()
+        session_token = conn_config.aws_session_token
+        aws_secret = conn_config.aws_secret_access_key
+        aws_access = conn_config.aws_access_key_id
+        if conn_config.role_arn:
+            credentials = await self.get_role_credentials(
+                async_session=async_connection, conn_config=conn_config
+            )
+            if credentials:
+                session_token = credentials["SessionToken"]
+                aws_access = credentials["AccessKeyId"]
+                aws_secret = credentials["SecretAccessKey"]
+        return async_connection.create_client(
+            service_name=self.client_type,
+            region_name=conn_config.region_name,
+            aws_secret_access_key=aws_secret,
+            aws_access_key_id=aws_access,
+            aws_session_token=session_token,
+            verify=self.verify,
+            config=self.config,
+            endpoint_url=conn_config.endpoint_url,
+        )
+
+    @staticmethod
+    async def get_role_credentials(
+        async_session: AioSession, conn_config: AwsConnectionWrapper
+    ) -> Optional[Dict[str, str]]:
+        """Get the role_arn, method credentials from connection details and get the role credentials detail"""
+        async with async_session.create_client(
+            "sts",
+            aws_access_key_id=conn_config.aws_access_key_id,
+            aws_secret_access_key=conn_config.aws_secret_access_key,
+        ) as client:
+            return_response = None
+            if conn_config.assume_role_method == "assume_role" or conn_config.assume_role_method is None:
+                response: Dict[str, Dict[str, str]] = await client.assume_role(
+                    RoleArn=conn_config.role_arn,
+                    RoleSessionName="RoleSession",
+                    **conn_config.assume_role_kwargs,
+                )
+                return_response = response["Credentials"]
+            return return_response

Review Comment:
   Right, I have changed this interface to be similar to the sync session factory so that we can add the missing auth features with minimal changes.


