Posted to commits@airflow.apache.org by po...@apache.org on 2022/09/18 23:33:55 UTC

[airflow] branch main updated: Convert `example_eks_with_nodegroup_in_one_step` sample DAG to system test (AIP-47) (#26410)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 13892883cf Convert `example_eks_with_nodegroup_in_one_step` sample DAG to system test (AIP-47) (#26410)
13892883cf is described below

commit 13892883cf368f19b1a0b06c03891e0b3ba09ecb
Author: D. Ferruzzi <fe...@amazon.com>
AuthorDate: Sun Sep 18 16:33:43 2022 -0700

    Convert `example_eks_with_nodegroup_in_one_step` sample DAG to system test (AIP-47) (#26410)
    
    * move file
    
    * Convert EKS with Nodegroups in One Step sample DAG to system test
---
 .../operators/eks.rst                              |   4 +-
 .../aws}/example_eks_with_nodegroup_in_one_step.py | 100 +++++++++++++--------
 2 files changed, 63 insertions(+), 41 deletions(-)

diff --git a/docs/apache-airflow-providers-amazon/operators/eks.rst b/docs/apache-airflow-providers-amazon/operators/eks.rst
index ee38f3d31f..dadfd32a73 100644
--- a/docs/apache-airflow-providers-amazon/operators/eks.rst
+++ b/docs/apache-airflow-providers-amazon/operators/eks.rst
@@ -65,7 +65,7 @@ Note: An AWS IAM role with the following permissions is required:
   ``AmazonEKSClusterPolicy`` IAM Policy must be attached
   ``AmazonEKSWorkerNodePolicy`` IAM Policy must be attached
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroup_in_one_step.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_eks_with_nodegroup_in_one_step.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_eks_create_cluster_with_nodegroup]
@@ -108,7 +108,7 @@ Note: If the cluster has any attached resources, such as an Amazon EKS Nodegroup
   Fargate profile, the cluster can not be deleted.  Using the ``force`` parameter will
   attempt to delete any attached resources first.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroup_in_one_step.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_eks_with_nodegroup_in_one_step.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_eks_force_delete_cluster]
diff --git a/airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroup_in_one_step.py b/tests/system/providers/amazon/aws/example_eks_with_nodegroup_in_one_step.py
similarity index 53%
rename from airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroup_in_one_step.py
rename to tests/system/providers/amazon/aws/example_eks_with_nodegroup_in_one_step.py
index ce6032c37c..6da3aee9d5 100644
--- a/airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroup_in_one_step.py
+++ b/tests/system/providers/amazon/aws/example_eks_with_nodegroup_in_one_step.py
@@ -17,8 +17,8 @@
 from __future__ import annotations
 
 from datetime import datetime
-from os import environ
 
+from airflow.models.baseoperator import chain
 from airflow.models.dag import DAG
 from airflow.providers.amazon.aws.hooks.eks import ClusterStates, NodegroupStates
 from airflow.providers.amazon.aws.operators.eks import (
@@ -27,58 +27,63 @@ from airflow.providers.amazon.aws.operators.eks import (
     EksPodOperator,
 )
 from airflow.providers.amazon.aws.sensors.eks import EksClusterStateSensor, EksNodegroupStateSensor
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
 
-# Ignore missing args provided by default_args
-# type: ignore[call-arg]
+DAG_ID = 'example_eks_with_nodegroup_in_one_step'
 
-CLUSTER_NAME = environ.get('EKS_CLUSTER_NAME', 'eks-demo')
-NODEGROUP_NAME = f'{CLUSTER_NAME}-nodegroup'
-ROLE_ARN = environ.get('EKS_DEMO_ROLE_ARN', 'arn:aws:iam::123456789012:role/role_name')
-SUBNETS = environ.get('EKS_DEMO_SUBNETS', 'subnet-12345ab subnet-67890cd').split(' ')
-VPC_CONFIG = {
-    'subnetIds': SUBNETS,
-    'endpointPublicAccess': True,
-    'endpointPrivateAccess': False,
-}
+# Externally fetched variables:
+ROLE_ARN_KEY = 'ROLE_ARN'
+SUBNETS_KEY = 'SUBNETS'
+
+sys_test_context_task = (
+    SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).add_variable(SUBNETS_KEY, split_string=True).build()
+)
 
 
 with DAG(
-    dag_id='example_eks_with_nodegroup_in_one_step',
+    dag_id=DAG_ID,
+    schedule='@once',
     start_date=datetime(2021, 1, 1),
     tags=['example'],
     catchup=False,
 ) as dag:
+    test_context = sys_test_context_task()
+    env_id = test_context[ENV_ID_KEY]
+
+    cluster_name = f'{env_id}-cluster'
+    nodegroup_name = f'{env_id}-nodegroup'
 
     # [START howto_operator_eks_create_cluster_with_nodegroup]
     # Create an Amazon EKS cluster control plane and an EKS nodegroup compute platform in one step.
     create_cluster_and_nodegroup = EksCreateClusterOperator(
-        task_id='create_eks_cluster_and_nodegroup',
-        cluster_name=CLUSTER_NAME,
-        nodegroup_name=NODEGROUP_NAME,
-        cluster_role_arn=ROLE_ARN,
-        nodegroup_role_arn=ROLE_ARN,
+        task_id='create_cluster_and_nodegroup',
+        cluster_name=cluster_name,
+        nodegroup_name=nodegroup_name,
+        cluster_role_arn=test_context[ROLE_ARN_KEY],
         # Opting to use the same ARN for the cluster and the nodegroup here,
         # but a different ARN could be configured and passed if desired.
-        resources_vpc_config=VPC_CONFIG,
-        # Compute defaults to 'nodegroup' but is called out here for the purposed of the example.
+        nodegroup_role_arn=test_context[ROLE_ARN_KEY],
+        resources_vpc_config={'subnetIds': test_context[SUBNETS_KEY]},
+        # ``compute='nodegroup'`` is the default, explicitly set here for demo purposes.
         compute='nodegroup',
     )
     # [END howto_operator_eks_create_cluster_with_nodegroup]
 
     await_create_nodegroup = EksNodegroupStateSensor(
-        task_id='wait_for_create_nodegroup',
-        cluster_name=CLUSTER_NAME,
-        nodegroup_name=NODEGROUP_NAME,
+        task_id='await_create_nodegroup',
+        cluster_name=cluster_name,
+        nodegroup_name=nodegroup_name,
         target_state=NodegroupStates.ACTIVE,
     )
 
     start_pod = EksPodOperator(
-        task_id="run_pod",
-        cluster_name=CLUSTER_NAME,
-        pod_name="run_pod",
-        image="amazon/aws-cli:latest",
-        cmds=["sh", "-c", "echo Test Airflow; date"],
-        labels={"demo": "hello_world"},
+        task_id='start_pod',
+        pod_name='test_pod',
+        cluster_name=cluster_name,
+        image='amazon/aws-cli:latest',
+        cmds=['sh', '-c', 'echo Test Airflow; date'],
+        labels={'demo': 'hello_world'},
         get_logs=True,
         # Delete the pod when it reaches its final state, or the execution is interrupted.
         is_delete_operator_pod=True,
@@ -87,23 +92,40 @@ with DAG(
     # [START howto_operator_eks_force_delete_cluster]
     # An Amazon EKS cluster can not be deleted with attached resources such as nodegroups or Fargate profiles.
     # Setting the `force` to `True` will delete any attached resources before deleting the cluster.
-    delete_all = EksDeleteClusterOperator(
+    delete_nodegroup_and_cluster = EksDeleteClusterOperator(
         task_id='delete_nodegroup_and_cluster',
-        cluster_name=CLUSTER_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+        cluster_name=cluster_name,
         force_delete_compute=True,
     )
     # [END howto_operator_eks_force_delete_cluster]
 
     await_delete_cluster = EksClusterStateSensor(
-        task_id='wait_for_delete_cluster',
-        cluster_name=CLUSTER_NAME,
+        task_id='await_delete_cluster',
+        trigger_rule=TriggerRule.ALL_DONE,
+        cluster_name=cluster_name,
         target_state=ClusterStates.NONEXISTENT,
     )
 
-    (
-        create_cluster_and_nodegroup
-        >> await_create_nodegroup
-        >> start_pod
-        >> delete_all
-        >> await_delete_cluster
+    chain(
+        # TEST SETUP
+        test_context,
+        # TEST BODY
+        create_cluster_and_nodegroup,
+        await_create_nodegroup,
+        start_pod,
+        delete_nodegroup_and_cluster,
+        await_delete_cluster,
     )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
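
For readers skimming the patch: the rewiring at the bottom replaces the `>>` dependency arrows with the `chain()` helper, and the two cleanup tasks now carry `trigger_rule=TriggerRule.ALL_DONE` so the EKS teardown runs even when an upstream task fails. A minimal, self-contained sketch of that setup/body/teardown pattern (hypothetical EmptyOperator tasks, not part of this commit):

    from datetime import datetime

    from airflow.models.baseoperator import chain
    from airflow.models.dag import DAG
    from airflow.operators.empty import EmptyOperator
    from airflow.utils.trigger_rule import TriggerRule

    # Hypothetical three-task DAG, for illustration only.
    with DAG(
        dag_id='teardown_pattern_sketch',
        schedule='@once',
        start_date=datetime(2021, 1, 1),
        catchup=False,
    ):
        setup = EmptyOperator(task_id='setup')
        body = EmptyOperator(task_id='body')
        # ALL_DONE runs the teardown even if an upstream task failed,
        # mirroring delete_nodegroup_and_cluster / await_delete_cluster above.
        teardown = EmptyOperator(task_id='teardown', trigger_rule=TriggerRule.ALL_DONE)

        # chain() sets the same dependencies as: setup >> body >> teardown
        chain(setup, body, teardown)

As the file's trailing comment notes, the converted test can then be run end to end with pytest (see tests/system/README.md#run_via_pytest).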