You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/10/10 04:27:16 UTC

[airflow] branch main updated: ECS System Test (#26808)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f17abcc931 ECS System Test (#26808)
f17abcc931 is described below

commit f17abcc931382b8f3d3b777359b3d92f019fda38
Author: D. Ferruzzi <fe...@amazon.com>
AuthorDate: Sun Oct 9 23:27:07 2022 -0500

    ECS System Test (#26808)
---
 .../operators/ecs.rst                              |  20 ++--
 .../system/providers/amazon/aws}/example_ecs.py    | 133 +++++++++++++--------
 2 files changed, 94 insertions(+), 59 deletions(-)

diff --git a/docs/apache-airflow-providers-amazon/operators/ecs.rst b/docs/apache-airflow-providers-amazon/operators/ecs.rst
index 8ea7bc6247..2ba584cb77 100644
--- a/docs/apache-airflow-providers-amazon/operators/ecs.rst
+++ b/docs/apache-airflow-providers-amazon/operators/ecs.rst
@@ -44,7 +44,7 @@ To create an Amazon ECS cluster you can use
 All optional parameters to be passed to the Create Cluster API should be
 passed in the 'create_cluster_kwargs' dict.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_ecs.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_ecs.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_ecs_create_cluster]
@@ -59,7 +59,7 @@ To delete an Amazon ECS cluster you can use
 :class:`~airflow.providers.amazon.aws.operators.ecs.EcsDeleteClusterOperator`.
 
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_ecs.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_ecs.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_ecs_delete_cluster]
@@ -77,7 +77,7 @@ All optional parameters to be passed to the Register Task Definition API should
 passed in the 'register_task_kwargs' dict.
 
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_ecs.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_ecs.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_ecs_register_task_definition]
@@ -92,7 +92,7 @@ To deregister a task definition you can use
 :class:`~airflow.providers.amazon.aws.operators.ecs.EcsDeregisterTaskDefinitionOperator`.
 
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_ecs.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_ecs.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_ecs_deregister_task_definition]
@@ -121,7 +121,7 @@ The parameters you need to configure for this Operator will depend upon which ``
 * If you are using EC2 as the compute resources in your ECS Cluster, set the parameter to EC2.
 * If you have integrated external resources in your ECS Cluster, for example using ECS Anywhere, and want to run your containers on those external resources, set the parameter to EXTERNAL.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_ecs.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_ecs.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_ecs_run_task]
@@ -141,9 +141,9 @@ To stream logs to AWS CloudWatch, you need to define the parameters below.
 Using the example above, we would add these additional parameters to enable logging to CloudWatch.
 You need to ensure that you have the appropriate level of permissions (see next section).
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_ecs.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_ecs.py
     :language: python
-    :dedent: 4
+    :dedent: 8
     :start-after: [START howto_awslogs_ecs]
     :end-before: [END howto_awslogs_ecs]
 
@@ -211,7 +211,7 @@ the failure reason if a failed state is provided and that state is reached
 before the target state.
 
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_ecs.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_ecs.py
     :language: python
     :dedent: 4
     :start-after: [START howto_sensor_ecs_cluster_state]
@@ -231,7 +231,7 @@ to change that.  Raises an AirflowException with the failure reason if the faile
 is reached before the target state.
 
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_ecs.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_ecs.py
     :language: python
     :dedent: 4
     :start-after: [START howto_sensor_ecs_task_definition_state]
@@ -250,7 +250,7 @@ both can be overridden with provided values.  Raises an AirflowException with
 the failure reason if a failed state is provided and that state is reached
 before the target state.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_ecs.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_ecs.py
     :language: python
     :dedent: 4
     :start-after: [START howto_sensor_ecs_task_state]
diff --git a/airflow/providers/amazon/aws/example_dags/example_ecs.py b/tests/system/providers/amazon/aws/example_ecs.py
similarity index 57%
rename from airflow/providers/amazon/aws/example_dags/example_ecs.py
rename to tests/system/providers/amazon/aws/example_ecs.py
index 4439ef0ee3..6c7ccf25e1 100644
--- a/airflow/providers/amazon/aws/example_dags/example_ecs.py
+++ b/tests/system/providers/amazon/aws/example_ecs.py
@@ -18,7 +18,10 @@ from __future__ import annotations
 
 from datetime import datetime
 
+import boto3
+
 from airflow import DAG
+from airflow.decorators import task
 from airflow.models.baseoperator import chain
 from airflow.providers.amazon.aws.hooks.ecs import EcsClusterStates, EcsTaskStates
 from airflow.providers.amazon.aws.operators.ecs import (
@@ -34,48 +37,69 @@ from airflow.providers.amazon.aws.sensors.ecs import (
     EcsTaskStateSensor,
 )
 from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
+
+DAG_ID = 'example_ecs'
+
+# Externally fetched variables:
+EXISTING_CLUSTER_NAME_KEY = 'CLUSTER_NAME'
+EXISTING_CLUSTER_SUBNETS_KEY = 'SUBNETS'
+
+sys_test_context_task = (
+    SystemTestContextBuilder()
+    # NOTE:  Creating a functional ECS Cluster which uses EC2 requires manually creating
+    # and configuring a number of resources such as autoscaling groups, networking
+    # etc. which is out of scope for this demo and time-consuming for a system test.
+    # To simplify this demo and make it run in a reasonable length of time as a
+    # system test, follow the steps below to create a new cluster on the AWS Console
+    # which handles all asset creation and configuration using default values:
+    # 1. https://us-east-1.console.aws.amazon.com/ecs/home?region=us-east-1#/clusters
+    # 2. Select "EC2 Linux + Networking" and hit "Next"
+    # 3. Name your cluster in the first field and click Create
+    .add_variable(EXISTING_CLUSTER_NAME_KEY)
+    .add_variable(EXISTING_CLUSTER_SUBNETS_KEY, split_string=True)
+    .build()
+)
 
-DAG_ID = 'new_ecs_refactor'
-ENV_ID = 'env1234why-2'
 
-# NOTE:  Creating a functional ECS Cluster which uses EC2 requires manually creating
-# and configuring a number of resources such as autoscaling groups, networking
-# etc which is out of scope for this demo and time-consuming for a system test
-# To simplify this demo and make it run in a reasonable length of time as a
-# system test, follow the steps below to create a new cluster on the AWS Console
-# which handles all asset creation and configuration using default values:
-# 1. https://us-east-1.console.aws.amazon.com/ecs/home?region=us-east-1#/clusters
-# 2. Select "EC2 Linux + Networking" and hit "Next"
-# 3. Name your cluster in the first field and click Create
-# 4. Enter the name you provided and the subnets that were generated below:
-EXISTING_CLUSTER_NAME = 'using-defaults'
-SUBNETS = ['subnet-08c6deb88019ef902']
+@task
+def get_region():
+    return boto3.session.Session().region_name
 
 
 with DAG(
     dag_id=DAG_ID,
+    schedule='@once',
     start_date=datetime(2021, 1, 1),
     tags=['example'],
     catchup=False,
 ) as dag:
-    env_id = ENV_ID
-    cluster_name = f'{env_id}-cluster'
+    test_context = sys_test_context_task()
+    env_id = test_context[ENV_ID_KEY]
+    existing_cluster_name = test_context[EXISTING_CLUSTER_NAME_KEY]
+    existing_cluster_subnets = test_context[EXISTING_CLUSTER_SUBNETS_KEY]
+
+    new_cluster_name = f'{env_id}-cluster'
     container_name = f'{env_id}-container'
     family_name = f'{env_id}-task-definition'
     asg_name = f'{env_id}-asg'
 
+    aws_region = get_region()
+
     # [START howto_operator_ecs_create_cluster]
     create_cluster = EcsCreateClusterOperator(
         task_id='create_cluster',
-        cluster_name=cluster_name,
-        wait_for_completion=False,
+        cluster_name=new_cluster_name,
     )
     # [END howto_operator_ecs_create_cluster]
 
+    # EcsCreateClusterOperator waits by default, setting as False to test the Sensor below.
+    create_cluster.wait_for_completion = False
+
     # [START howto_sensor_ecs_cluster_state]
     await_cluster = EcsClusterStateSensor(
         task_id='await_cluster',
-        cluster_name=cluster_name,
+        cluster_name=new_cluster_name,
     )
     # [END howto_sensor_ecs_cluster_state]
 
@@ -100,47 +124,40 @@ with DAG(
     )
     # [END howto_operator_ecs_register_task_definition]
 
-    registered_task_definition = register_task.output
-
     # [START howto_sensor_ecs_task_definition_state]
     await_task_definition = EcsTaskDefinitionStateSensor(
         task_id='await_task_definition',
-        task_definition=registered_task_definition,
+        task_definition=register_task.output,
     )
     # [END howto_sensor_ecs_task_definition_state]
 
     # [START howto_operator_ecs_run_task]
     run_task = EcsRunTaskOperator(
-        task_id="run_task",
-        cluster=EXISTING_CLUSTER_NAME,
-        task_definition=registered_task_definition,
-        launch_type="EC2",
+        task_id='run_task',
+        cluster=existing_cluster_name,
+        task_definition=register_task.output,
         overrides={
-            "containerOverrides": [
+            'containerOverrides': [
                 {
-                    "name": container_name,
-                    "command": ["echo", "hello", "world"],
+                    'name': container_name,
+                    'command': ['echo', 'hello', 'world'],
                 },
             ],
         },
-        network_configuration={'awsvpcConfiguration': {'subnets': SUBNETS}},
-        tags={
-            "Customer": "X",
-            "Project": "Y",
-            "Application": "Z",
-            "Version": "0.0.1",
-            "Environment": "Development",
-        },
+        network_configuration={'awsvpcConfiguration': {'subnets': existing_cluster_subnets}},
         # [START howto_awslogs_ecs]
-        awslogs_group="/ecs/hello-world",
-        awslogs_region='us-east-1',
-        awslogs_stream_prefix="ecs/hello-world-container",
+        awslogs_group='/ecs/hello-world',
+        awslogs_region=aws_region,
+        awslogs_stream_prefix='ecs/hello-world-container',
         # [END howto_awslogs_ecs]
-        # NOTE: You must set `reattach=True` in order to get ecs_task_arn if you plan to use a Sensor.
+        # You must set `reattach=True` in order to get ecs_task_arn if you plan to use a Sensor.
         reattach=True,
     )
     # [END howto_operator_ecs_run_task]
 
+    # EcsRunTaskOperator waits by default, setting as False to test the Sensor below.
+    run_task.wait_for_completion = False
+
     # [START howto_sensor_ecs_task_state]
     # By default, EcsTaskStateSensor waits until the task has started, but the
     # demo task runs so fast that the sensor misses it.  This sensor instead
@@ -148,8 +165,8 @@ with DAG(
     # the target_state and failure_states parameters.
     await_task_finish = EcsTaskStateSensor(
         task_id='await_task_finish',
-        cluster=EXISTING_CLUSTER_NAME,
-        task='{{ ti.xcom_pull(key="ecs_task_arn") }}',
+        cluster=existing_cluster_name,
+        task=run_task.output['ecs_task_arn'],
         target_state=EcsTaskStates.STOPPED,
         failure_states={EcsTaskStates.NONE},
     )
@@ -158,29 +175,35 @@ with DAG(
     # [START howto_operator_ecs_deregister_task_definition]
     deregister_task = EcsDeregisterTaskDefinitionOperator(
         task_id='deregister_task',
-        trigger_rule=TriggerRule.ALL_DONE,
-        task_definition=registered_task_definition,
+        task_definition=register_task.output,
     )
     # [END howto_operator_ecs_deregister_task_definition]
+    deregister_task.trigger_rule = TriggerRule.ALL_DONE
 
     # [START howto_operator_ecs_delete_cluster]
     delete_cluster = EcsDeleteClusterOperator(
         task_id='delete_cluster',
-        trigger_rule=TriggerRule.ALL_DONE,
-        cluster_name=cluster_name,
-        wait_for_completion=False,
+        cluster_name=new_cluster_name,
     )
     # [END howto_operator_ecs_delete_cluster]
+    delete_cluster.trigger_rule = TriggerRule.ALL_DONE
+
+    # EcsDeleteClusterOperator waits by default, setting as False to test the Sensor below.
+    delete_cluster.wait_for_completion = False
 
     # [START howto_operator_ecs_delete_cluster]
     await_delete_cluster = EcsClusterStateSensor(
         task_id='await_delete_cluster',
-        cluster_name=cluster_name,
+        cluster_name=new_cluster_name,
         target_state=EcsClusterStates.INACTIVE,
     )
     # [END howto_operator_ecs_delete_cluster]
 
     chain(
+        # TEST SETUP
+        test_context,
+        aws_region,
+        # TEST BODY
         create_cluster,
         await_cluster,
         register_task,
@@ -191,3 +214,15 @@ with DAG(
         delete_cluster,
         await_delete_cluster,
     )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs the watcher in order to properly mark success/failure
+    # when a "tearDown" task with a trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)