You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@airflow.apache.org by el...@apache.org on 2022/07/15 18:04:34 UTC

[airflow] branch main updated: Migrate lambda sample dag to system test (AIP-47) (#24355)

This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f611b1feff Migrate lambda sample dag to system test (AIP-47) (#24355)
f611b1feff is described below

commit f611b1feffc3188cdc47fad55785fbd5ccbf8fdb
Author: Vincent <97...@users.noreply.github.com>
AuthorDate: Fri Jul 15 14:04:26 2022 -0400

    Migrate lambda sample dag to system test (AIP-47) (#24355)
    
    * Migrate lambda sample dag to system test
    
    * Use SystemTestContextBuilder to pass along data
---
 .../amazon/aws/example_dags/example_lambda.py      |  45 --------
 airflow/providers/amazon/aws/operators/s3.py       |   1 -
 .../operators/lambda.rst                           |   2 +-
 .../system/providers/amazon/aws/example_lambda.py  | 124 +++++++++++++++++++++
 4 files changed, 125 insertions(+), 47 deletions(-)

diff --git a/airflow/providers/amazon/aws/example_dags/example_lambda.py b/airflow/providers/amazon/aws/example_dags/example_lambda.py
deleted file mode 100644
index 3b87c3aa3c..0000000000
--- a/airflow/providers/amazon/aws/example_dags/example_lambda.py
+++ /dev/null
@@ -1,45 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import json
-from datetime import datetime, timedelta
-from os import getenv
-
-from airflow import DAG
-from airflow.providers.amazon.aws.operators.aws_lambda import AwsLambdaInvokeFunctionOperator
-
-# [START howto_operator_lambda_env_variables]
-LAMBDA_FUNCTION_NAME = getenv("LAMBDA_FUNCTION_NAME", "test-function")
-# [END howto_operator_lambda_env_variables]
-
-SAMPLE_EVENT = json.dumps({"SampleEvent": {"SampleData": {"Name": "XYZ", "DoB": "1993-01-01"}}})
-
-with DAG(
-    dag_id='example_lambda',
-    schedule_interval=None,
-    start_date=datetime(2021, 1, 1),
-    dagrun_timeout=timedelta(minutes=60),
-    tags=['example'],
-    catchup=False,
-) as dag:
-    # [START howto_operator_lambda]
-    invoke_lambda_function = AwsLambdaInvokeFunctionOperator(
-        task_id='setup__invoke_lambda_function',
-        function_name=LAMBDA_FUNCTION_NAME,
-        payload=SAMPLE_EVENT,
-    )
-    # [END howto_operator_lambda]
diff --git a/airflow/providers/amazon/aws/operators/s3.py b/airflow/providers/amazon/aws/operators/s3.py
index 7fbef6629c..af45eb9a4c 100644
--- a/airflow/providers/amazon/aws/operators/s3.py
+++ b/airflow/providers/amazon/aws/operators/s3.py
@@ -65,7 +65,6 @@ class S3CreateBucketOperator(BaseOperator):
         self.bucket_name = bucket_name
         self.region_name = region_name
         self.aws_conn_id = aws_conn_id
-        self.region_name = region_name
 
     def execute(self, context: 'Context'):
         s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
diff --git a/docs/apache-airflow-providers-amazon/operators/lambda.rst b/docs/apache-airflow-providers-amazon/operators/lambda.rst
index e4516a170b..53bbeaec34 100644
--- a/docs/apache-airflow-providers-amazon/operators/lambda.rst
+++ b/docs/apache-airflow-providers-amazon/operators/lambda.rst
@@ -42,7 +42,7 @@ To invoke an AWS lambda function you can use
 :class:`~airflow.providers.amazon.aws.operators.aws_lambda.AwsLambdaInvokeFunctionOperator`.
 
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_lambda.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_lambda.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_lambda]
diff --git a/tests/system/providers/amazon/aws/example_lambda.py b/tests/system/providers/amazon/aws/example_lambda.py
new file mode 100644
index 0000000000..6944e7c0e0
--- /dev/null
+++ b/tests/system/providers/amazon/aws/example_lambda.py
@@ -0,0 +1,124 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import io
+import json
+import zipfile
+from datetime import datetime
+
+import boto3
+
+from airflow import models
+from airflow.decorators import task
+from airflow.models.baseoperator import chain
+from airflow.providers.amazon.aws.operators.aws_lambda import AwsLambdaInvokeFunctionOperator
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
+
+DAG_ID = 'example_lambda'
+
+# Externally fetched variables:
+ROLE_ARN_KEY = 'ROLE_ARN'
+
+sys_test_context_task = SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).build()
+
+CODE_CONTENT = """
+def test(*args):
+    print('Hello')
+"""
+
+
+# Create a zip file containing one file "lambda_function.py" to deploy to the lambda function
+def create_zip(content):
+    zip_output = io.BytesIO()
+    with zipfile.ZipFile(zip_output, "w", zipfile.ZIP_DEFLATED) as zip_file:
+        info = zipfile.ZipInfo("lambda_function.py")
+        info.external_attr = 0o777 << 16
+        zip_file.writestr(info, content)
+    zip_output.seek(0)
+    return zip_output.read()
+
+
+@task
+def create_lambda(function_name, role_arn):
+    client = boto3.client('lambda')
+    client.create_function(
+        FunctionName=function_name,
+        Runtime='python3.9',
+        Role=role_arn,
+        Handler='lambda_function.test',
+        Code={
+            'ZipFile': create_zip(CODE_CONTENT),
+        },
+        Description='Function used for system tests',
+    )
+
+
+@task
+def await_lambda(function_name):
+    client = boto3.client('lambda')
+    waiter = client.get_waiter('function_active_v2')
+    waiter.wait(FunctionName=function_name)
+
+
+@task(trigger_rule=TriggerRule.ALL_DONE)
+def delete_lambda(function_name):
+    client = boto3.client('lambda')
+    client.delete_function(
+        FunctionName=function_name,
+    )
+
+
+with models.DAG(
+    DAG_ID,
+    schedule_interval='@once',
+    start_date=datetime(2021, 1, 1),
+    tags=['example'],
+    catchup=False,
+) as dag:
+    test_context = sys_test_context_task()
+
+    lambda_function_name: str = f'{test_context[ENV_ID_KEY]}-function'
+
+    # [START howto_operator_lambda]
+    invoke_lambda_function = AwsLambdaInvokeFunctionOperator(
+        task_id='invoke_lambda_function',
+        function_name=lambda_function_name,
+        payload=json.dumps({"SampleEvent": {"SampleData": {"Name": "XYZ", "DoB": "1993-01-01"}}}),
+    )
+    # [END howto_operator_lambda]
+
+    chain(
+        # TEST SETUP
+        test_context,
+        create_lambda(lambda_function_name, test_context[ROLE_ARN_KEY]),
+        await_lambda(lambda_function_name),
+        # TEST BODY
+        invoke_lambda_function,
+        # TEST TEARDOWN
+        delete_lambda(lambda_function_name),
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)