You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/09/21 10:04:01 UTC

[airflow] branch main updated: Convert EC2 sample DAG to system test (#26540)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 156fbd0c67 Convert EC2 sample DAG to system test (#26540)
156fbd0c67 is described below

commit 156fbd0c67a97379107aab2bae69432a40827cc4
Author: D. Ferruzzi <fe...@amazon.com>
AuthorDate: Wed Sep 21 03:03:53 2022 -0700

    Convert EC2 sample DAG to system test (#26540)
---
 .../amazon/aws/example_dags/example_ec2.py         |  57 --------
 .../operators/ec2.rst                              |   6 +-
 tests/system/providers/amazon/aws/example_ec2.py   | 145 +++++++++++++++++++++
 3 files changed, 148 insertions(+), 60 deletions(-)

diff --git a/airflow/providers/amazon/aws/example_dags/example_ec2.py b/airflow/providers/amazon/aws/example_dags/example_ec2.py
deleted file mode 100644
index 7f523a1283..0000000000
--- a/airflow/providers/amazon/aws/example_dags/example_ec2.py
+++ /dev/null
@@ -1,57 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import os
-from datetime import datetime
-
-from airflow import DAG
-from airflow.models.baseoperator import chain
-from airflow.providers.amazon.aws.operators.ec2 import EC2StartInstanceOperator, EC2StopInstanceOperator
-from airflow.providers.amazon.aws.sensors.ec2 import EC2InstanceStateSensor
-
-INSTANCE_ID = os.getenv("INSTANCE_ID", "instance-id")
-
-with DAG(
-    dag_id='example_ec2',
-    start_date=datetime(2021, 1, 1),
-    tags=['example'],
-    catchup=False,
-) as dag:
-    # [START howto_operator_ec2_start_instance]
-    start_instance = EC2StartInstanceOperator(
-        task_id="ec2_start_instance",
-        instance_id=INSTANCE_ID,
-    )
-    # [END howto_operator_ec2_start_instance]
-
-    # [START howto_sensor_ec2_instance_state]
-    instance_state = EC2InstanceStateSensor(
-        task_id="ec2_instance_state",
-        instance_id=INSTANCE_ID,
-        target_state="running",
-    )
-    # [END howto_sensor_ec2_instance_state]
-
-    # [START howto_operator_ec2_stop_instance]
-    stop_instance = EC2StopInstanceOperator(
-        task_id="ec2_stop_instance",
-        instance_id=INSTANCE_ID,
-    )
-    # [END howto_operator_ec2_stop_instance]
-
-    chain(start_instance, instance_state, stop_instance)
diff --git a/docs/apache-airflow-providers-amazon/operators/ec2.rst b/docs/apache-airflow-providers-amazon/operators/ec2.rst
index 86807341a0..37e5d804ee 100644
--- a/docs/apache-airflow-providers-amazon/operators/ec2.rst
+++ b/docs/apache-airflow-providers-amazon/operators/ec2.rst
@@ -38,7 +38,7 @@ Start an Amazon EC2 instance
 To start an Amazon EC2 instance you can use
 :class:`~airflow.providers.amazon.aws.operators.ec2.EC2StartInstanceOperator`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_ec2.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_ec2.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_ec2_start_instance]
@@ -52,7 +52,7 @@ Stop an Amazon EC2 instance
 To stop an Amazon EC2 instance you can use
 :class:`~airflow.providers.amazon.aws.operators.ec2.EC2StopInstanceOperator`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_ec2.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_ec2.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_ec2_stop_instance]
@@ -69,7 +69,7 @@ Wait on an Amazon EC2 instance state
 To check the state of an Amazon EC2 instance and wait until it reaches the target state you can use
 :class:`~airflow.providers.amazon.aws.sensors.ec2.EC2InstanceStateSensor`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_ec2.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_ec2.py
     :language: python
     :dedent: 4
     :start-after: [START howto_sensor_ec2_instance_state]
diff --git a/tests/system/providers/amazon/aws/example_ec2.py b/tests/system/providers/amazon/aws/example_ec2.py
new file mode 100644
index 0000000000..5b326e9d19
--- /dev/null
+++ b/tests/system/providers/amazon/aws/example_ec2.py
@@ -0,0 +1,145 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+from operator import itemgetter
+
+import boto3
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models.baseoperator import chain
+from airflow.providers.amazon.aws.operators.ec2 import EC2StartInstanceOperator, EC2StopInstanceOperator
+from airflow.providers.amazon.aws.sensors.ec2 import EC2InstanceStateSensor
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
+
+DAG_ID = 'example_ec2'
+
+sys_test_context_task = SystemTestContextBuilder().build()
+
+
+def _get_latest_ami_id():
+    """Returns the AMI ID of the most recently-created Amazon Linux image"""
+
+    # Amazon is retiring AL2 in 2023 and replacing it with Amazon Linux 2022.
+    # This image prefix should be futureproof, but may need adjusting depending
+    # on how they name the new images.  This page should have AL2022 info when
+    # it comes available: https://aws.amazon.com/linux/amazon-linux-2022/faqs/
+    image_prefix = 'Amazon Linux*'
+
+    images = boto3.client('ec2').describe_images(
+        Filters=[{'Name': 'description', 'Values': [image_prefix]}], Owners=['amazon']
+    )
+    # Sort on CreationDate
+    sorted_images = sorted(images['Images'], key=itemgetter('CreationDate'), reverse=True)
+    return sorted_images[0]['ImageId']
+
+
+@task
+def create_key_pair(key_name: str):
+    client = boto3.client('ec2')
+
+    key_pair_id = client.create_key_pair(KeyName=key_name)['KeyName']
+    # Creating the key takes a very short but measurable time, preventing race condition:
+    client.get_waiter('key_pair_exists').wait(KeyNames=[key_pair_id])
+
+    return key_pair_id
+
+
+@task
+def create_instance(instance_name: str, key_pair_id: str):
+    return boto3.client('ec2').run_instances(
+        ImageId=_get_latest_ami_id(),
+        MinCount=1,
+        MaxCount=1,
+        InstanceType='t2.micro',
+        KeyName=key_pair_id,
+        TagSpecifications=[{'ResourceType': 'instance', 'Tags': [{'Key': 'Name', 'Value': instance_name}]}],
+    )['Instances'][0]['InstanceId']
+
+
+@task(trigger_rule=TriggerRule.ALL_DONE)
+def terminate_instance(instance: str):
+    boto3.client('ec2').terminate_instances(InstanceIds=[instance])
+
+
+@task(trigger_rule=TriggerRule.ALL_DONE)
+def delete_key_pair(key_pair_id: str):
+    boto3.client('ec2').delete_key_pair(KeyName=key_pair_id)
+
+
+with DAG(
+    dag_id=DAG_ID,
+    schedule='@once',
+    start_date=datetime(2021, 1, 1),
+    tags=['example'],
+    catchup=False,
+) as dag:
+    test_context = sys_test_context_task()
+    env_id = test_context[ENV_ID_KEY]
+
+    key_name = create_key_pair(key_name=f'{env_id}_key_pair')
+    instance_id = create_instance(instance_name=f'{env_id}-instance', key_pair_id=key_name)
+    # [START howto_operator_ec2_start_instance]
+    start_instance = EC2StartInstanceOperator(
+        task_id="start_instance",
+        instance_id=instance_id,
+    )
+    # [END howto_operator_ec2_start_instance]
+
+    # [START howto_sensor_ec2_instance_state]
+    await_instance = EC2InstanceStateSensor(
+        task_id="await_instance",
+        instance_id=instance_id,
+        target_state="running",
+    )
+    # [END howto_sensor_ec2_instance_state]
+
+    # [START howto_operator_ec2_stop_instance]
+    stop_instance = EC2StopInstanceOperator(
+        task_id="stop_instance",
+        instance_id=instance_id,
+    )
+    # [END howto_operator_ec2_stop_instance]
+    stop_instance.trigger_rule = TriggerRule.ALL_DONE
+
+    chain(
+        # TEST SETUP
+        test_context,
+        key_name,
+        instance_id,
+        # TEST BODY
+        start_instance,
+        await_instance,
+        stop_instance,
+        # TEST TEARDOWN
+        terminate_instance(instance_id),
+        delete_key_pair(key_name),
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)