You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/19 10:35:45 UTC

[airflow] branch main updated: Standardize AwsLambda (#25100)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d41067cf7 Standardize AwsLambda (#25100)
6d41067cf7 is described below

commit 6d41067cf7b0fcab20c00c94b6a96cb1babae1bc
Author: eladkal <45...@users.noreply.github.com>
AuthorDate: Tue Jul 19 13:35:35 2022 +0300

    Standardize AwsLambda (#25100)
    
    * Standardize AwsLambda
    The `aws_lambda.py` file name does not follow the standard naming convention; the `aws_` prefix is not needed.
    The new file name, `lambda_function.py`, matches the name of the corresponding hook module.
    
    * fix deprecated_classes.py
    
    * update verify_providers.py
---
 .../providers/amazon/aws/operators/aws_lambda.py   | 90 ++--------------------
 .../{aws_lambda.py => lambda_function.py}          |  0
 airflow/providers/amazon/provider.yaml             |  1 +
 scripts/in_container/verify_providers.py           |  1 +
 tests/deprecated_classes.py                        |  4 +
 .../providers/amazon/aws/operators/test_lambda.py  |  2 +-
 .../system/providers/amazon/aws/example_lambda.py  |  2 +-
 7 files changed, 16 insertions(+), 84 deletions(-)

diff --git a/airflow/providers/amazon/aws/operators/aws_lambda.py b/airflow/providers/amazon/aws/operators/aws_lambda.py
index c2d9d022fb..f7c6ab710c 100644
--- a/airflow/providers/amazon/aws/operators/aws_lambda.py
+++ b/airflow/providers/amazon/aws/operators/aws_lambda.py
@@ -15,88 +15,14 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+"""This module is deprecated. Please use :mod:`airflow.providers.amazon.aws.operators.lambda_function`."""
 
-import json
-from typing import TYPE_CHECKING, Optional, Sequence
+import warnings
 
-from airflow.models import BaseOperator
-from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook
+from airflow.providers.amazon.aws.operators.lambda_function import AwsLambdaInvokeFunctionOperator  # noqa
 
-if TYPE_CHECKING:
-    from airflow.utils.context import Context
-
-
-class AwsLambdaInvokeFunctionOperator(BaseOperator):
-    """
-    Invokes an AWS Lambda function.
-    You can invoke a function synchronously (and wait for the response),
-    or asynchronously.
-    To invoke a function asynchronously,
-    set `invocation_type` to `Event`. For more details,
-    review the boto3 Lambda invoke docs.
-
-    :param function_name: The name of the AWS Lambda function, version, or alias.
-    :param payload: The JSON string that you want to provide to your Lambda function as input.
-    :param log_type: Set to Tail to include the execution log in the response. Otherwise, set to "None".
-    :param qualifier: Specify a version or alias to invoke a published version of the function.
-    :param aws_conn_id: The AWS connection ID to use
-
-    .. seealso::
-        For more information on how to use this operator, take a look at the guide:
-        :ref:`howto/operator:AwsLambdaInvokeFunctionOperator`
-
-    """
-
-    template_fields: Sequence[str] = ('function_name', 'payload', 'qualifier', 'invocation_type')
-    ui_color = '#ff7300'
-
-    def __init__(
-        self,
-        *,
-        function_name: str,
-        log_type: Optional[str] = None,
-        qualifier: Optional[str] = None,
-        invocation_type: Optional[str] = None,
-        client_context: Optional[str] = None,
-        payload: Optional[str] = None,
-        aws_conn_id: str = 'aws_default',
-        **kwargs,
-    ):
-        super().__init__(**kwargs)
-        self.function_name = function_name
-        self.payload = payload
-        self.log_type = log_type
-        self.qualifier = qualifier
-        self.invocation_type = invocation_type
-        self.client_context = client_context
-        self.aws_conn_id = aws_conn_id
-
-    def execute(self, context: 'Context'):
-        """
-        Invokes the target AWS Lambda function from Airflow.
-
-        :return: The response payload from the function, or an error object.
-        """
-        hook = LambdaHook(aws_conn_id=self.aws_conn_id)
-        success_status_codes = [200, 202, 204]
-        self.log.info("Invoking AWS Lambda function: %s with payload: %s", self.function_name, self.payload)
-        response = hook.invoke_lambda(
-            function_name=self.function_name,
-            invocation_type=self.invocation_type,
-            log_type=self.log_type,
-            client_context=self.client_context,
-            payload=self.payload,
-            qualifier=self.qualifier,
-        )
-        self.log.info("Lambda response metadata: %r", response.get("ResponseMetadata"))
-        if response.get("StatusCode") not in success_status_codes:
-            raise ValueError('Lambda function did not execute', json.dumps(response.get("ResponseMetadata")))
-        payload_stream = response.get("Payload")
-        payload = payload_stream.read().decode()
-        if "FunctionError" in response:
-            raise ValueError(
-                'Lambda function execution resulted in error',
-                {"ResponseMetadata": response.get("ResponseMetadata"), "Payload": payload},
-            )
-        self.log.info('Lambda function invocation succeeded: %r', response.get("ResponseMetadata"))
-        return payload
+warnings.warn(
+    "This module is deprecated. Please use `airflow.providers.amazon.aws.operators.lambda_function`.",
+    DeprecationWarning,
+    stacklevel=2,
+)
diff --git a/airflow/providers/amazon/aws/operators/aws_lambda.py b/airflow/providers/amazon/aws/operators/lambda_function.py
similarity index 100%
copy from airflow/providers/amazon/aws/operators/aws_lambda.py
copy to airflow/providers/amazon/aws/operators/lambda_function.py
diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml
index 451ca10206..29921cc2d2 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -288,6 +288,7 @@ operators:
   - integration-name: AWS Lambda
     python-modules:
       - airflow.providers.amazon.aws.operators.aws_lambda
+      - airflow.providers.amazon.aws.operators.lambda_function
   - integration-name: Amazon Simple Storage Service (S3)
     python-modules:
       - airflow.providers.amazon.aws.operators.s3_bucket
diff --git a/scripts/in_container/verify_providers.py b/scripts/in_container/verify_providers.py
index e3d0220178..e0b7468b8c 100755
--- a/scripts/in_container/verify_providers.py
+++ b/scripts/in_container/verify_providers.py
@@ -247,6 +247,7 @@ KNOWN_DEPRECATED_DIRECT_IMPORTS: Set[str] = {
     'This module is deprecated. Please use `airflow.providers.amazon.aws.sensors.redshift_cluster`.',
     "This module is deprecated. Please use airflow.providers.amazon.aws.transfers.sql_to_s3`.",
     "This module is deprecated. Please use `airflow.providers.tableau.sensors.tableau`.",
+    "This module is deprecated. Please use `airflow.providers.amazon.aws.operators.lambda_function`.",
 }
 
 
diff --git a/tests/deprecated_classes.py b/tests/deprecated_classes.py
index 522e62da72..5b76af905d 100644
--- a/tests/deprecated_classes.py
+++ b/tests/deprecated_classes.py
@@ -1131,6 +1131,10 @@ OPERATORS = [
         'airflow.providers.amazon.aws.operators.s3.S3FileTransformOperator',
         'airflow.operators.s3_file_transform_operator.S3FileTransformOperator',
     ),
+    (
+        "airflow.providers.amazon.aws.operators.lambda_function.AwsLambdaInvokeFunctionOperator",
+        "airflow.providers.amazon.aws.operators.aws_lambda.AwsLambdaInvokeFunctionOperator",
+    ),
     (
         'airflow.providers.amazon.aws.operators.sagemaker.SageMakerBaseOperator',
         'airflow.providers.amazon.aws.operators.sagemaker_base.SageMakerBaseOperator',
diff --git a/tests/providers/amazon/aws/operators/test_lambda.py b/tests/providers/amazon/aws/operators/test_lambda.py
index b566f0cfb7..f3199eab7b 100644
--- a/tests/providers/amazon/aws/operators/test_lambda.py
+++ b/tests/providers/amazon/aws/operators/test_lambda.py
@@ -26,7 +26,7 @@ from moto import mock_iam, mock_lambda, mock_sts
 
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
 from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook
-from airflow.providers.amazon.aws.operators.aws_lambda import AwsLambdaInvokeFunctionOperator
+from airflow.providers.amazon.aws.operators.lambda_function import AwsLambdaInvokeFunctionOperator
 
 
 @mock_lambda
diff --git a/tests/system/providers/amazon/aws/example_lambda.py b/tests/system/providers/amazon/aws/example_lambda.py
index 6944e7c0e0..a9e5e37b6c 100644
--- a/tests/system/providers/amazon/aws/example_lambda.py
+++ b/tests/system/providers/amazon/aws/example_lambda.py
@@ -24,7 +24,7 @@ import boto3
 from airflow import models
 from airflow.decorators import task
 from airflow.models.baseoperator import chain
-from airflow.providers.amazon.aws.operators.aws_lambda import AwsLambdaInvokeFunctionOperator
+from airflow.providers.amazon.aws.operators.lambda_function import AwsLambdaInvokeFunctionOperator
 from airflow.utils.trigger_rule import TriggerRule
 from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder