Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2023/01/11 10:51:50 UTC

[GitHub] [airflow] Taragolis commented on a diff in pull request #28850: Add deferrable mode in RedshiftPauseClusterOperator

Taragolis commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1066843322


##########
airflow/providers/amazon/provider.yaml:
##########
@@ -52,6 +52,7 @@ dependencies:
   - apache-airflow>=2.3.0
   - apache-airflow-providers-common-sql>=1.3.1
   - boto3>=1.24.0
+  - aiobotocore>=2.1.1

Review Comment:
   I think adding this as a core dependency of the Amazon provider could break some existing functionality,
   at least until `aiobotocore` resolves https://github.com/aio-libs/aiobotocore/issues/976 and removes its upper bound on [`botocore`](https://github.com/aio-libs/aiobotocore/blob/14b2f5fe0033a1f4bbc456f10633c2aeea74f57a/setup.py#L10).
   
   `botocore` is the main package behind `boto3`, and if we can't upgrade both of them we can't add new features that were introduced in `botocore` 1.28.0+.
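
To make the concern concrete, here is a minimal sketch of how an upper bound on `botocore` blocks newer releases once it becomes a hard dependency. The bounds in `HYPOTHETICAL_BOTOCORE_RANGE` are illustrative assumptions, not the actual pin from the linked `setup.py`, and the helper names are made up for this example.

```python
from __future__ import annotations


def parse_version(text: str) -> tuple[int, ...]:
    """Turn a plain 'X.Y.Z' string into a comparable tuple of ints."""
    return tuple(int(part) for part in text.split("."))


def satisfies(version: str, lower: str, upper_exclusive: str) -> bool:
    """Return True if lower <= version < upper_exclusive."""
    return parse_version(lower) <= parse_version(version) < parse_version(upper_exclusive)


# Assumption for illustration only: a cap just below botocore 1.28.0.
HYPOTHETICAL_BOTOCORE_RANGE = ("1.27.0", "1.28.0")


def installable(botocore_version: str) -> bool:
    """Would this botocore version fit under the hypothetical cap?"""
    lower, upper = HYPOTHETICAL_BOTOCORE_RANGE
    return satisfies(botocore_version, lower, upper)


if __name__ == "__main__":
    for candidate in ("1.27.59", "1.28.0", "1.29.5"):
        print(candidate, "allowed" if installable(candidate) else "blocked by the cap")
```

Under a cap like this, any feature that needs `botocore` 1.28.0 or later cannot be installed alongside the provider, which is the situation described above.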



##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -843,3 +848,82 @@ def _parse_s3_config(config_file_name: str, config_format: str | None = "boto",
         config_format=config_format,
         profile=profile,
     )
+
+
+class AwsBaseHookAsync(AwsBaseHook):
+    """
+    Interacts with AWS using aiobotocore asynchronously.
+
+    .. note::
+        AwsBaseHookAsync uses aiobotocore to create asynchronous S3 hooks. Hence, AwsBaseHookAsync
+        only supports the authentication mechanism that aiobotocore supports. Currently, AwsBaseHookAsync supports
+        only AWS STS client method ``assume_role`` provided in the Airflow connection extra args via aiobotocore.
+
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param verify: Whether or not to verify SSL certificates.
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+    :param client_type: boto3.client client_type. Eg 's3', 'emr' etc
+    :param resource_type: boto3.resource resource_type. Eg 'dynamodb' etc
+    :param config: Configuration for botocore client.
+    .. seealso::
+        `AWS API <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html>`_
+    """
+
+    async def get_client_async(self) -> AioBaseClient:
+        """Create an Async Client object to communicate with AWS services."""
+
+        connection_object = await sync_to_async(self.get_connection)(self.aws_conn_id)
+
+        conn_config = AwsConnectionWrapper(
+            conn=connection_object,
+            region_name=self.region_name,
+            botocore_config=self.config,
+            verify=self.verify,
+        )
+
+        async_connection = get_session()
+        session_token = conn_config.aws_session_token
+        aws_secret = conn_config.aws_secret_access_key
+        aws_access = conn_config.aws_access_key_id
+        if conn_config.role_arn:
+            credentials = await self.get_role_credentials(
+                async_session=async_connection, conn_config=conn_config
+            )
+            if credentials:
+                session_token = credentials["SessionToken"]
+                aws_access = credentials["AccessKeyId"]
+                aws_secret = credentials["SecretAccessKey"]
+        return async_connection.create_client(
+            service_name=self.client_type,
+            region_name=conn_config.region_name,
+            aws_secret_access_key=aws_secret,
+            aws_access_key_id=aws_access,
+            aws_session_token=session_token,
+            verify=self.verify,
+            config=self.config,
+            endpoint_url=conn_config.endpoint_url,
+        )
+
+    @staticmethod
+    async def get_role_credentials(
+        async_session: AioSession, conn_config: AwsConnectionWrapper
+    ) -> Optional[Dict[str, str]]:
+        """Get the role_arn, method credentials from connection details and get the role credentials detail"""
+        async with async_session.create_client(
+            "sts",
+            aws_access_key_id=conn_config.aws_access_key_id,
+            aws_secret_access_key=conn_config.aws_secret_access_key,
+        ) as client:
+            return_response = None
+            if conn_config.assume_role_method == "assume_role" or conn_config.assume_role_method is None:
+                response: Dict[str, Dict[str, str]] = await client.assume_role(
+                    RoleArn=conn_config.role_arn,
+                    RoleSessionName="RoleSession",
+                    **conn_config.assume_role_kwargs,
+                )
+                return_response = response["Credentials"]
+            return return_response

Review Comment:
   By using this we lose the ability to auto-renew expired credentials, as well as all the additional methods of obtaining credentials that the AWS Connection provides.
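
A rough sketch of the difference, under stated assumptions: the diff above reads the `assume_role` response once and passes the resulting keys to `create_client`, so the client keeps that snapshot after it expires. The classes below are hypothetical stand-ins written only for illustration (they are not botocore, aiobotocore, or Airflow APIs) and show what re-fetching shortly before expiry looks like.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Callable


@dataclass(frozen=True)
class TemporaryCredentials:
    """Hypothetical container mirroring the fields of an STS response."""

    access_key: str
    secret_key: str
    token: str
    expiry: datetime


class RefreshingCredentials:
    """Hypothetical holder that re-fetches credentials shortly before they expire."""

    def __init__(
        self,
        fetch: Callable[[], TemporaryCredentials],
        refresh_margin: timedelta = timedelta(minutes=5),
        now: Callable[[], datetime] = lambda: datetime.now(timezone.utc),
    ) -> None:
        self._fetch = fetch
        self._margin = refresh_margin
        self._now = now
        self._current = fetch()

    def get(self) -> TemporaryCredentials:
        # Refresh when the remaining lifetime drops to the margin or below.
        if self._current.expiry - self._now() <= self._margin:
            self._current = self._fetch()
        return self._current


if __name__ == "__main__":
    clock = [datetime(2023, 1, 11, 10, 0, tzinfo=timezone.utc)]
    issued = []

    def fake_assume_role() -> TemporaryCredentials:
        # Stand-in for an STS call; each call issues a token valid for one hour.
        issued.append(clock[0])
        return TemporaryCredentials("example-key", "example-secret", f"token-{len(issued)}", clock[0] + timedelta(hours=1))

    creds = RefreshingCredentials(fake_assume_role, now=lambda: clock[0])
    snapshot = creds.get()          # what a one-off read keeps forever
    clock[0] += timedelta(minutes=70)
    print(snapshot.token, "expired:", snapshot.expiry < clock[0])
    print(creds.get().token, "is the refreshed token")
```

A client built from the one-off snapshot has no way to pick up the second token, whereas a holder like this is consulted on each use.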


