You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by el...@apache.org on 2022/07/15 18:04:34 UTC
[airflow] branch main updated: Migrate lambda sample dag to system test (AIP-47) (#24355)
This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f611b1feff Migrate lambda sample dag to system test (AIP-47) (#24355)
f611b1feff is described below
commit f611b1feffc3188cdc47fad55785fbd5ccbf8fdb
Author: Vincent <97...@users.noreply.github.com>
AuthorDate: Fri Jul 15 14:04:26 2022 -0400
Migrate lambda sample dag to system test (AIP-47) (#24355)
* Migrate lambda sample dag to system test
* Use SystemTestContextBuilder to pass along data
---
.../amazon/aws/example_dags/example_lambda.py | 45 --------
airflow/providers/amazon/aws/operators/s3.py | 1 -
.../operators/lambda.rst | 2 +-
.../system/providers/amazon/aws/example_lambda.py | 124 +++++++++++++++++++++
4 files changed, 125 insertions(+), 47 deletions(-)
diff --git a/airflow/providers/amazon/aws/example_dags/example_lambda.py b/airflow/providers/amazon/aws/example_dags/example_lambda.py
deleted file mode 100644
index 3b87c3aa3c..0000000000
--- a/airflow/providers/amazon/aws/example_dags/example_lambda.py
+++ /dev/null
@@ -1,45 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import json
-from datetime import datetime, timedelta
-from os import getenv
-
-from airflow import DAG
-from airflow.providers.amazon.aws.operators.aws_lambda import AwsLambdaInvokeFunctionOperator
-
-# [START howto_operator_lambda_env_variables]
-LAMBDA_FUNCTION_NAME = getenv("LAMBDA_FUNCTION_NAME", "test-function")
-# [END howto_operator_lambda_env_variables]
-
-SAMPLE_EVENT = json.dumps({"SampleEvent": {"SampleData": {"Name": "XYZ", "DoB": "1993-01-01"}}})
-
-with DAG(
- dag_id='example_lambda',
- schedule_interval=None,
- start_date=datetime(2021, 1, 1),
- dagrun_timeout=timedelta(minutes=60),
- tags=['example'],
- catchup=False,
-) as dag:
- # [START howto_operator_lambda]
- invoke_lambda_function = AwsLambdaInvokeFunctionOperator(
- task_id='setup__invoke_lambda_function',
- function_name=LAMBDA_FUNCTION_NAME,
- payload=SAMPLE_EVENT,
- )
- # [END howto_operator_lambda]
diff --git a/airflow/providers/amazon/aws/operators/s3.py b/airflow/providers/amazon/aws/operators/s3.py
index 7fbef6629c..af45eb9a4c 100644
--- a/airflow/providers/amazon/aws/operators/s3.py
+++ b/airflow/providers/amazon/aws/operators/s3.py
@@ -65,7 +65,6 @@ class S3CreateBucketOperator(BaseOperator):
self.bucket_name = bucket_name
self.region_name = region_name
self.aws_conn_id = aws_conn_id
- self.region_name = region_name
def execute(self, context: 'Context'):
s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
diff --git a/docs/apache-airflow-providers-amazon/operators/lambda.rst b/docs/apache-airflow-providers-amazon/operators/lambda.rst
index e4516a170b..53bbeaec34 100644
--- a/docs/apache-airflow-providers-amazon/operators/lambda.rst
+++ b/docs/apache-airflow-providers-amazon/operators/lambda.rst
@@ -42,7 +42,7 @@ To invoke an AWS lambda function you can use
:class:`~airflow.providers.amazon.aws.operators.aws_lambda.AwsLambdaInvokeFunctionOperator`.
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_lambda.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_lambda.py
:language: python
:dedent: 4
:start-after: [START howto_operator_lambda]
diff --git a/tests/system/providers/amazon/aws/example_lambda.py b/tests/system/providers/amazon/aws/example_lambda.py
new file mode 100644
index 0000000000..6944e7c0e0
--- /dev/null
+++ b/tests/system/providers/amazon/aws/example_lambda.py
@@ -0,0 +1,124 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import io
+import json
+import zipfile
+from datetime import datetime
+
+import boto3
+
+from airflow import models
+from airflow.decorators import task
+from airflow.models.baseoperator import chain
+from airflow.providers.amazon.aws.operators.aws_lambda import AwsLambdaInvokeFunctionOperator
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
+
+DAG_ID = 'example_lambda'
+
+# Externally fetched variables:
+ROLE_ARN_KEY = 'ROLE_ARN'
+
+sys_test_context_task = SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).build()
+
+CODE_CONTENT = """
+def test(*args):
+ print('Hello')
+"""
+
+
+# Create a zip file containing one file "lambda_function.py" to deploy to the lambda function
+def create_zip(content):
+ zip_output = io.BytesIO()
+ with zipfile.ZipFile(zip_output, "w", zipfile.ZIP_DEFLATED) as zip_file:
+ info = zipfile.ZipInfo("lambda_function.py")
+ info.external_attr = 0o777 << 16
+ zip_file.writestr(info, content)
+ zip_output.seek(0)
+ return zip_output.read()
+
+
+@task
+def create_lambda(function_name, role_arn):
+ client = boto3.client('lambda')
+ client.create_function(
+ FunctionName=function_name,
+ Runtime='python3.9',
+ Role=role_arn,
+ Handler='lambda_function.test',
+ Code={
+ 'ZipFile': create_zip(CODE_CONTENT),
+ },
+ Description='Function used for system tests',
+ )
+
+
+@task
+def await_lambda(function_name):
+ client = boto3.client('lambda')
+ waiter = client.get_waiter('function_active_v2')
+ waiter.wait(FunctionName=function_name)
+
+
+@task(trigger_rule=TriggerRule.ALL_DONE)
+def delete_lambda(function_name):
+ client = boto3.client('lambda')
+ client.delete_function(
+ FunctionName=function_name,
+ )
+
+
+with models.DAG(
+ DAG_ID,
+ schedule_interval='@once',
+ start_date=datetime(2021, 1, 1),
+ tags=['example'],
+ catchup=False,
+) as dag:
+ test_context = sys_test_context_task()
+
+ lambda_function_name: str = f'{test_context[ENV_ID_KEY]}-function'
+
+ # [START howto_operator_lambda]
+ invoke_lambda_function = AwsLambdaInvokeFunctionOperator(
+ task_id='invoke_lambda_function',
+ function_name=lambda_function_name,
+ payload=json.dumps({"SampleEvent": {"SampleData": {"Name": "XYZ", "DoB": "1993-01-01"}}}),
+ )
+ # [END howto_operator_lambda]
+
+ chain(
+ # TEST SETUP
+ test_context,
+ create_lambda(lambda_function_name, test_context[ROLE_ARN_KEY]),
+ await_lambda(lambda_function_name),
+ # TEST BODY
+ invoke_lambda_function,
+ # TEST TEARDOWN
+ delete_lambda(lambda_function_name),
+ )
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)