You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/06/14 20:27:21 UTC

[airflow] branch main updated: Convert Cloudformation Sample DAG to System Test (#24447)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new cfbcd31b69 Convert Cloudformation Sample DAG to System Test (#24447)
cfbcd31b69 is described below

commit cfbcd31b69bb3d3b3b2c950d0c530593769462d4
Author: D. Ferruzzi <fe...@amazon.com>
AuthorDate: Tue Jun 14 13:26:51 2022 -0700

    Convert Cloudformation Sample DAG to System Test (#24447)
---
 .../amazon/aws/sensors/cloud_formation.py          |  5 +--
 .../operators/cloudformation.rst                   |  8 ++--
 .../amazon/aws}/example_cloudformation.py          | 48 +++++++++++++++++-----
 3 files changed, 43 insertions(+), 18 deletions(-)

diff --git a/airflow/providers/amazon/aws/sensors/cloud_formation.py b/airflow/providers/amazon/aws/sensors/cloud_formation.py
index 972dbace37..17d42099a7 100644
--- a/airflow/providers/amazon/aws/sensors/cloud_formation.py
+++ b/airflow/providers/amazon/aws/sensors/cloud_formation.py
@@ -39,7 +39,6 @@ class CloudFormationCreateStackSensor(BaseSensorOperator):
         For more information on how to use this sensor, take a look at the guide:
         :ref:`howto/sensor:CloudFormationCreateStackSensor`
 
-
     :param stack_name: The name of the stack to wait for (templated)
     :param aws_conn_id: ID of the Airflow connection where credentials and extra configuration are
         stored
@@ -65,7 +64,7 @@ class CloudFormationCreateStackSensor(BaseSensorOperator):
 
     @cached_property
     def hook(self) -> CloudFormationHook:
-        """Create and return an CloudFormationHook"""
+        """Create and return a CloudFormationHook"""
         return CloudFormationHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
 
 
@@ -109,5 +108,5 @@ class CloudFormationDeleteStackSensor(BaseSensorOperator):
 
     @cached_property
     def hook(self) -> CloudFormationHook:
-        """Create and return an CloudFormationHook"""
+        """Create and return a CloudFormationHook"""
         return CloudFormationHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
diff --git a/docs/apache-airflow-providers-amazon/operators/cloudformation.rst b/docs/apache-airflow-providers-amazon/operators/cloudformation.rst
index f83a2861ab..6b2730c348 100644
--- a/docs/apache-airflow-providers-amazon/operators/cloudformation.rst
+++ b/docs/apache-airflow-providers-amazon/operators/cloudformation.rst
@@ -42,7 +42,7 @@ Create an AWS CloudFormation stack
 To create a new AWS CloudFormation stack use
 :class:`~airflow.providers.amazon.aws.operators.cloud_formation.CloudFormationCreateStackOperator`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_cloudformation.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_cloudformation.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_cloudformation_create_stack]
@@ -56,7 +56,7 @@ Delete an AWS CloudFormation stack
 To delete an AWS CloudFormation stack you can use
 :class:`~airflow.providers.amazon.aws.operators.cloud_formation.CloudFormationDeleteStackOperator`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_cloudformation.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_cloudformation.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_cloudformation_delete_stack]
@@ -73,7 +73,7 @@ Wait on an AWS CloudFormation stack creation state
 To wait on the state of an AWS CloudFormation stack creation until it reaches a terminal state you can use
 :class:`~airflow.providers.amazon.aws.sensors.cloud_formation.CloudFormationCreateStackSensor`
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_cloudformation.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_cloudformation.py
     :language: python
     :dedent: 4
     :start-after: [START howto_sensor_cloudformation_create_stack]
@@ -87,7 +87,7 @@ Wait on an AWS CloudFormation stack deletion state
 To wait on the state of an AWS CloudFormation stack deletion until it reaches a terminal state you can use
 use :class:`~airflow.providers.amazon.aws.sensors.cloud_formation.CloudFormationDeleteStackSensor`
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_cloudformation.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_cloudformation.py
     :language: python
     :dedent: 4
     :start-after: [START howto_sensor_cloudformation_delete_stack]
diff --git a/airflow/providers/amazon/aws/example_dags/example_cloudformation.py b/tests/system/providers/amazon/aws/example_cloudformation.py
similarity index 68%
rename from airflow/providers/amazon/aws/example_dags/example_cloudformation.py
rename to tests/system/providers/amazon/aws/example_cloudformation.py
index c31d3215ff..5a17e79246 100644
--- a/airflow/providers/amazon/aws/example_dags/example_cloudformation.py
+++ b/tests/system/providers/amazon/aws/example_cloudformation.py
@@ -14,8 +14,8 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import json
 from datetime import datetime
-from json import dumps
 
 from airflow import DAG
 from airflow.models.baseoperator import chain
@@ -27,10 +27,14 @@ from airflow.providers.amazon.aws.sensors.cloud_formation import (
     CloudFormationCreateStackSensor,
     CloudFormationDeleteStackSensor,
 )
+from tests.system.providers.amazon.aws.utils import set_env_id
 
-CLOUDFORMATION_STACK_NAME = 'example-stack-name'
-# The CloudFormation template must have at least one resource to be usable, this example uses SQS
-# as a free and serverless option.
+ENV_ID = set_env_id()
+DAG_ID = 'example_cloudformation'
+
+CLOUDFORMATION_STACK_NAME = f'{ENV_ID}-stack'
+# The CloudFormation template must have at least one resource to
+# be usable, this example uses SQS as a free and serverless option.
 CLOUDFORMATION_TEMPLATE = {
     'Description': 'Stack from Airflow CloudFormation example DAG',
     'Resources': {
@@ -41,14 +45,14 @@ CLOUDFORMATION_TEMPLATE = {
 }
 CLOUDFORMATION_CREATE_PARAMETERS = {
     'StackName': CLOUDFORMATION_STACK_NAME,
-    'TemplateBody': dumps(CLOUDFORMATION_TEMPLATE),
+    'TemplateBody': json.dumps(CLOUDFORMATION_TEMPLATE),
     'TimeoutInMinutes': 2,
     'OnFailure': 'DELETE',  # Don't leave stacks behind if creation fails.
 }
 
 with DAG(
-    dag_id='example_cloudformation',
-    schedule_interval=None,
+    dag_id=DAG_ID,
+    schedule_interval='@once',
     start_date=datetime(2021, 1, 1),
     tags=['example'],
     catchup=False,
@@ -64,20 +68,42 @@ with DAG(
 
     # [START howto_sensor_cloudformation_create_stack]
     wait_for_stack_create = CloudFormationCreateStackSensor(
-        task_id='wait_for_stack_creation', stack_name=CLOUDFORMATION_STACK_NAME
+        task_id='wait_for_stack_create',
+        stack_name=CLOUDFORMATION_STACK_NAME,
     )
     # [END howto_sensor_cloudformation_create_stack]
 
     # [START howto_operator_cloudformation_delete_stack]
     delete_stack = CloudFormationDeleteStackOperator(
-        task_id='delete_stack', stack_name=CLOUDFORMATION_STACK_NAME
+        task_id='delete_stack',
+        trigger_rule='all_done',
+        stack_name=CLOUDFORMATION_STACK_NAME,
     )
     # [END howto_operator_cloudformation_delete_stack]
 
     # [START howto_sensor_cloudformation_delete_stack]
     wait_for_stack_delete = CloudFormationDeleteStackSensor(
-        task_id='wait_for_stack_deletion', trigger_rule='all_done', stack_name=CLOUDFORMATION_STACK_NAME
+        task_id='wait_for_stack_delete',
+        trigger_rule='all_done',
+        stack_name=CLOUDFORMATION_STACK_NAME,
     )
     # [END howto_sensor_cloudformation_delete_stack]
 
-    chain(create_stack, wait_for_stack_create, delete_stack, wait_for_stack_delete)
+    chain(
+        create_stack,
+        wait_for_stack_create,
+        delete_stack,
+        wait_for_stack_delete,
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)