You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/09/18 23:33:55 UTC
[airflow] branch main updated: Convert `example_eks_with_nodegroup_in_one_step` sample DAG to system test (AIP-47) (#26410)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 13892883cf Convert `example_eks_with_nodegroup_in_one_step` sample DAG to system test (AIP-47) (#26410)
13892883cf is described below
commit 13892883cf368f19b1a0b06c03891e0b3ba09ecb
Author: D. Ferruzzi <fe...@amazon.com>
AuthorDate: Sun Sep 18 16:33:43 2022 -0700
Convert `example_eks_with_nodegroup_in_one_step` sample DAG to system test (AIP-47) (#26410)
* move file
* Convert EKS with Nodegroups in One Step sample DAG to system test
---
.../operators/eks.rst | 4 +-
.../aws}/example_eks_with_nodegroup_in_one_step.py | 100 +++++++++++++--------
2 files changed, 63 insertions(+), 41 deletions(-)
diff --git a/docs/apache-airflow-providers-amazon/operators/eks.rst b/docs/apache-airflow-providers-amazon/operators/eks.rst
index ee38f3d31f..dadfd32a73 100644
--- a/docs/apache-airflow-providers-amazon/operators/eks.rst
+++ b/docs/apache-airflow-providers-amazon/operators/eks.rst
@@ -65,7 +65,7 @@ Note: An AWS IAM role with the following permissions is required:
``AmazonEKSClusterPolicy`` IAM Policy must be attached
``AmazonEKSWorkerNodePolicy`` IAM Policy must be attached
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroup_in_one_step.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_eks_with_nodegroup_in_one_step.py
:language: python
:dedent: 4
:start-after: [START howto_operator_eks_create_cluster_with_nodegroup]
@@ -108,7 +108,7 @@ Note: If the cluster has any attached resources, such as an Amazon EKS Nodegroup
Fargate profile, the cluster can not be deleted. Using the ``force`` parameter will
attempt to delete any attached resources first.
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroup_in_one_step.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_eks_with_nodegroup_in_one_step.py
:language: python
:dedent: 4
:start-after: [START howto_operator_eks_force_delete_cluster]
diff --git a/airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroup_in_one_step.py b/tests/system/providers/amazon/aws/example_eks_with_nodegroup_in_one_step.py
similarity index 53%
rename from airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroup_in_one_step.py
rename to tests/system/providers/amazon/aws/example_eks_with_nodegroup_in_one_step.py
index ce6032c37c..6da3aee9d5 100644
--- a/airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroup_in_one_step.py
+++ b/tests/system/providers/amazon/aws/example_eks_with_nodegroup_in_one_step.py
@@ -17,8 +17,8 @@
from __future__ import annotations
from datetime import datetime
-from os import environ
+from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.providers.amazon.aws.hooks.eks import ClusterStates, NodegroupStates
from airflow.providers.amazon.aws.operators.eks import (
@@ -27,58 +27,63 @@ from airflow.providers.amazon.aws.operators.eks import (
EksPodOperator,
)
from airflow.providers.amazon.aws.sensors.eks import EksClusterStateSensor, EksNodegroupStateSensor
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
-# Ignore missing args provided by default_args
-# type: ignore[call-arg]
+DAG_ID = 'example_eks_with_nodegroup_in_one_step'
-CLUSTER_NAME = environ.get('EKS_CLUSTER_NAME', 'eks-demo')
-NODEGROUP_NAME = f'{CLUSTER_NAME}-nodegroup'
-ROLE_ARN = environ.get('EKS_DEMO_ROLE_ARN', 'arn:aws:iam::123456789012:role/role_name')
-SUBNETS = environ.get('EKS_DEMO_SUBNETS', 'subnet-12345ab subnet-67890cd').split(' ')
-VPC_CONFIG = {
- 'subnetIds': SUBNETS,
- 'endpointPublicAccess': True,
- 'endpointPrivateAccess': False,
-}
+# Externally fetched variables:
+ROLE_ARN_KEY = 'ROLE_ARN'
+SUBNETS_KEY = 'SUBNETS'
+
+sys_test_context_task = (
+ SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).add_variable(SUBNETS_KEY, split_string=True).build()
+)
with DAG(
- dag_id='example_eks_with_nodegroup_in_one_step',
+ dag_id=DAG_ID,
+ schedule='@once',
start_date=datetime(2021, 1, 1),
tags=['example'],
catchup=False,
) as dag:
+ test_context = sys_test_context_task()
+ env_id = test_context[ENV_ID_KEY]
+
+ cluster_name = f'{env_id}-cluster'
+ nodegroup_name = f'{env_id}-nodegroup'
# [START howto_operator_eks_create_cluster_with_nodegroup]
# Create an Amazon EKS cluster control plane and an EKS nodegroup compute platform in one step.
create_cluster_and_nodegroup = EksCreateClusterOperator(
- task_id='create_eks_cluster_and_nodegroup',
- cluster_name=CLUSTER_NAME,
- nodegroup_name=NODEGROUP_NAME,
- cluster_role_arn=ROLE_ARN,
- nodegroup_role_arn=ROLE_ARN,
+ task_id='create_cluster_and_nodegroup',
+ cluster_name=cluster_name,
+ nodegroup_name=nodegroup_name,
+ cluster_role_arn=test_context[ROLE_ARN_KEY],
# Opting to use the same ARN for the cluster and the nodegroup here,
# but a different ARN could be configured and passed if desired.
- resources_vpc_config=VPC_CONFIG,
- # Compute defaults to 'nodegroup' but is called out here for the purposed of the example.
+ nodegroup_role_arn=test_context[ROLE_ARN_KEY],
+ resources_vpc_config={'subnetIds': test_context[SUBNETS_KEY]},
+ # ``compute='nodegroup'`` is the default, explicitly set here for demo purposes.
compute='nodegroup',
)
# [END howto_operator_eks_create_cluster_with_nodegroup]
await_create_nodegroup = EksNodegroupStateSensor(
- task_id='wait_for_create_nodegroup',
- cluster_name=CLUSTER_NAME,
- nodegroup_name=NODEGROUP_NAME,
+ task_id='await_create_nodegroup',
+ cluster_name=cluster_name,
+ nodegroup_name=nodegroup_name,
target_state=NodegroupStates.ACTIVE,
)
start_pod = EksPodOperator(
- task_id="run_pod",
- cluster_name=CLUSTER_NAME,
- pod_name="run_pod",
- image="amazon/aws-cli:latest",
- cmds=["sh", "-c", "echo Test Airflow; date"],
- labels={"demo": "hello_world"},
+ task_id='start_pod',
+ pod_name='test_pod',
+ cluster_name=cluster_name,
+ image='amazon/aws-cli:latest',
+ cmds=['sh', '-c', 'echo Test Airflow; date'],
+ labels={'demo': 'hello_world'},
get_logs=True,
# Delete the pod when it reaches its final state, or the execution is interrupted.
is_delete_operator_pod=True,
@@ -87,23 +92,40 @@ with DAG(
# [START howto_operator_eks_force_delete_cluster]
# An Amazon EKS cluster can not be deleted with attached resources such as nodegroups or Fargate profiles.
# Setting the `force` to `True` will delete any attached resources before deleting the cluster.
- delete_all = EksDeleteClusterOperator(
+ delete_nodegroup_and_cluster = EksDeleteClusterOperator(
task_id='delete_nodegroup_and_cluster',
- cluster_name=CLUSTER_NAME,
+ trigger_rule=TriggerRule.ALL_DONE,
+ cluster_name=cluster_name,
force_delete_compute=True,
)
# [END howto_operator_eks_force_delete_cluster]
await_delete_cluster = EksClusterStateSensor(
- task_id='wait_for_delete_cluster',
- cluster_name=CLUSTER_NAME,
+ task_id='await_delete_cluster',
+ trigger_rule=TriggerRule.ALL_DONE,
+ cluster_name=cluster_name,
target_state=ClusterStates.NONEXISTENT,
)
- (
- create_cluster_and_nodegroup
- >> await_create_nodegroup
- >> start_pod
- >> delete_all
- >> await_delete_cluster
+ chain(
+ # TEST SETUP
+ test_context,
+ # TEST BODY
+ create_cluster_and_nodegroup,
+ await_create_nodegroup,
+ start_pod,
+ delete_nodegroup_and_cluster,
+ await_delete_cluster,
)
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)