You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/06/14 20:27:21 UTC
[airflow] branch main updated: Convert Cloudformation Sample DAG to System Test (#24447)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new cfbcd31b69 Convert Cloudformation Sample DAG to System Test (#24447)
cfbcd31b69 is described below
commit cfbcd31b69bb3d3b3b2c950d0c530593769462d4
Author: D. Ferruzzi <fe...@amazon.com>
AuthorDate: Tue Jun 14 13:26:51 2022 -0700
Convert Cloudformation Sample DAG to System Test (#24447)
---
.../amazon/aws/sensors/cloud_formation.py | 5 +--
.../operators/cloudformation.rst | 8 ++--
.../amazon/aws}/example_cloudformation.py | 48 +++++++++++++++++-----
3 files changed, 43 insertions(+), 18 deletions(-)
diff --git a/airflow/providers/amazon/aws/sensors/cloud_formation.py b/airflow/providers/amazon/aws/sensors/cloud_formation.py
index 972dbace37..17d42099a7 100644
--- a/airflow/providers/amazon/aws/sensors/cloud_formation.py
+++ b/airflow/providers/amazon/aws/sensors/cloud_formation.py
@@ -39,7 +39,6 @@ class CloudFormationCreateStackSensor(BaseSensorOperator):
For more information on how to use this sensor, take a look at the guide:
:ref:`howto/sensor:CloudFormationCreateStackSensor`
-
:param stack_name: The name of the stack to wait for (templated)
:param aws_conn_id: ID of the Airflow connection where credentials and extra configuration are
stored
@@ -65,7 +64,7 @@ class CloudFormationCreateStackSensor(BaseSensorOperator):
@cached_property
def hook(self) -> CloudFormationHook:
- """Create and return an CloudFormationHook"""
+ """Create and return a CloudFormationHook"""
return CloudFormationHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
@@ -109,5 +108,5 @@ class CloudFormationDeleteStackSensor(BaseSensorOperator):
@cached_property
def hook(self) -> CloudFormationHook:
- """Create and return an CloudFormationHook"""
+ """Create and return a CloudFormationHook"""
return CloudFormationHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
diff --git a/docs/apache-airflow-providers-amazon/operators/cloudformation.rst b/docs/apache-airflow-providers-amazon/operators/cloudformation.rst
index f83a2861ab..6b2730c348 100644
--- a/docs/apache-airflow-providers-amazon/operators/cloudformation.rst
+++ b/docs/apache-airflow-providers-amazon/operators/cloudformation.rst
@@ -42,7 +42,7 @@ Create an AWS CloudFormation stack
To create a new AWS CloudFormation stack use
:class:`~airflow.providers.amazon.aws.operators.cloud_formation.CloudFormationCreateStackOperator`.
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_cloudformation.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_cloudformation.py
:language: python
:dedent: 4
:start-after: [START howto_operator_cloudformation_create_stack]
@@ -56,7 +56,7 @@ Delete an AWS CloudFormation stack
To delete an AWS CloudFormation stack you can use
:class:`~airflow.providers.amazon.aws.operators.cloud_formation.CloudFormationDeleteStackOperator`.
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_cloudformation.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_cloudformation.py
:language: python
:dedent: 4
:start-after: [START howto_operator_cloudformation_delete_stack]
@@ -73,7 +73,7 @@ Wait on an AWS CloudFormation stack creation state
To wait on the state of an AWS CloudFormation stack creation until it reaches a terminal state you can use
:class:`~airflow.providers.amazon.aws.sensors.cloud_formation.CloudFormationCreateStackSensor`
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_cloudformation.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_cloudformation.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_cloudformation_create_stack]
@@ -87,7 +87,7 @@ Wait on an AWS CloudFormation stack deletion state
To wait on the state of an AWS CloudFormation stack deletion until it reaches a terminal state you can use
use :class:`~airflow.providers.amazon.aws.sensors.cloud_formation.CloudFormationDeleteStackSensor`
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_cloudformation.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_cloudformation.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_cloudformation_delete_stack]
diff --git a/airflow/providers/amazon/aws/example_dags/example_cloudformation.py b/tests/system/providers/amazon/aws/example_cloudformation.py
similarity index 68%
rename from airflow/providers/amazon/aws/example_dags/example_cloudformation.py
rename to tests/system/providers/amazon/aws/example_cloudformation.py
index c31d3215ff..5a17e79246 100644
--- a/airflow/providers/amazon/aws/example_dags/example_cloudformation.py
+++ b/tests/system/providers/amazon/aws/example_cloudformation.py
@@ -14,8 +14,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import json
from datetime import datetime
-from json import dumps
from airflow import DAG
from airflow.models.baseoperator import chain
@@ -27,10 +27,14 @@ from airflow.providers.amazon.aws.sensors.cloud_formation import (
CloudFormationCreateStackSensor,
CloudFormationDeleteStackSensor,
)
+from tests.system.providers.amazon.aws.utils import set_env_id
-CLOUDFORMATION_STACK_NAME = 'example-stack-name'
-# The CloudFormation template must have at least one resource to be usable, this example uses SQS
-# as a free and serverless option.
+ENV_ID = set_env_id()
+DAG_ID = 'example_cloudformation'
+
+CLOUDFORMATION_STACK_NAME = f'{ENV_ID}-stack'
+# The CloudFormation template must have at least one resource to
+# be usable, this example uses SQS as a free and serverless option.
CLOUDFORMATION_TEMPLATE = {
'Description': 'Stack from Airflow CloudFormation example DAG',
'Resources': {
@@ -41,14 +45,14 @@ CLOUDFORMATION_TEMPLATE = {
}
CLOUDFORMATION_CREATE_PARAMETERS = {
'StackName': CLOUDFORMATION_STACK_NAME,
- 'TemplateBody': dumps(CLOUDFORMATION_TEMPLATE),
+ 'TemplateBody': json.dumps(CLOUDFORMATION_TEMPLATE),
'TimeoutInMinutes': 2,
'OnFailure': 'DELETE', # Don't leave stacks behind if creation fails.
}
with DAG(
- dag_id='example_cloudformation',
- schedule_interval=None,
+ dag_id=DAG_ID,
+ schedule_interval='@once',
start_date=datetime(2021, 1, 1),
tags=['example'],
catchup=False,
@@ -64,20 +68,42 @@ with DAG(
# [START howto_sensor_cloudformation_create_stack]
wait_for_stack_create = CloudFormationCreateStackSensor(
- task_id='wait_for_stack_creation', stack_name=CLOUDFORMATION_STACK_NAME
+ task_id='wait_for_stack_create',
+ stack_name=CLOUDFORMATION_STACK_NAME,
)
# [END howto_sensor_cloudformation_create_stack]
# [START howto_operator_cloudformation_delete_stack]
delete_stack = CloudFormationDeleteStackOperator(
- task_id='delete_stack', stack_name=CLOUDFORMATION_STACK_NAME
+ task_id='delete_stack',
+ trigger_rule='all_done',
+ stack_name=CLOUDFORMATION_STACK_NAME,
)
# [END howto_operator_cloudformation_delete_stack]
# [START howto_sensor_cloudformation_delete_stack]
wait_for_stack_delete = CloudFormationDeleteStackSensor(
- task_id='wait_for_stack_deletion', trigger_rule='all_done', stack_name=CLOUDFORMATION_STACK_NAME
+ task_id='wait_for_stack_delete',
+ trigger_rule='all_done',
+ stack_name=CLOUDFORMATION_STACK_NAME,
)
# [END howto_sensor_cloudformation_delete_stack]
- chain(create_stack, wait_for_stack_create, delete_stack, wait_for_stack_delete)
+ chain(
+ create_stack,
+ wait_for_stack_create,
+ delete_stack,
+ wait_for_stack_delete,
+ )
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)