Posted to commits@airflow.apache.org by po...@apache.org on 2022/10/10 06:09:30 UTC
[airflow] branch main updated: Convert emr_eks example dag to system test (#26723)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 62d5bab3b4 Convert emr_eks example dag to system test (#26723)
62d5bab3b4 is described below
commit 62d5bab3b4cdb423c668b0f2e29d5c52b8ca0ca4
Author: Vincent <97...@users.noreply.github.com>
AuthorDate: Mon Oct 10 02:09:22 2022 -0400
Convert emr_eks example dag to system test (#26723)
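For context: DAGs under tests/system/ are complete, runnable end-to-end tests
rather than inline documentation examples. Assuming AWS credentials and the
externally fetched variables used below (ROLE_ARN, JOB_ROLE_ARN, SUBNETS) are
available, the converted test can be driven from Python with a plain pytest
invocation; a minimal sketch, not part of this commit:

    # Executes the system test DAG end to end
    # (see tests/system/README.md#run_via_pytest).
    import pytest

    pytest.main(["tests/system/providers/amazon/aws/example_emr_eks.py", "-s"])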
---
.../amazon/aws/example_dags/example_emr_eks.py | 91 -------
.../operators/emr_eks.rst | 9 +-
docs/spelling_wordlist.txt | 1 +
.../system/providers/amazon/aws/example_emr_eks.py | 282 +++++++++++++++++++++
4 files changed, 288 insertions(+), 95 deletions(-)
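The new test resolves its configuration at runtime through
SystemTestContextBuilder instead of hard-coded os.getenv defaults. A
simplified, hypothetical sketch of the lookup such a builder performs for
each key (the real helper lives in tests/system/providers/amazon/aws/utils;
the exact fallback order is an assumption here):

    import os

    import boto3

    def fetch_variable(key: str) -> str:
        # Illustrative only: prefer a local environment variable,
        # then fall back to an SSM parameter of the same name.
        if key in os.environ:
            return os.environ[key]
        return boto3.client("ssm").get_parameter(Name=key)["Parameter"]["Value"]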
diff --git a/airflow/providers/amazon/aws/example_dags/example_emr_eks.py b/airflow/providers/amazon/aws/example_dags/example_emr_eks.py
deleted file mode 100644
index d827938985..0000000000
--- a/airflow/providers/amazon/aws/example_dags/example_emr_eks.py
+++ /dev/null
@@ -1,91 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import os
-from datetime import datetime
-
-from airflow import DAG
-from airflow.models.baseoperator import chain
-from airflow.providers.amazon.aws.operators.emr import EmrContainerOperator, EmrEksCreateClusterOperator
-from airflow.providers.amazon.aws.sensors.emr import EmrContainerSensor
-
-JOB_ROLE_ARN = os.getenv("JOB_ROLE_ARN", "arn:aws:iam::012345678912:role/emr_eks_default_role")
-
-# [START howto_operator_emr_eks_config]
-JOB_DRIVER_ARG = {
- "sparkSubmitJobDriver": {
- "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py",
- "sparkSubmitParameters": "--conf spark.executors.instances=2 --conf spark.executors.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1", # noqa: E501
- }
-}
-
-CONFIGURATION_OVERRIDES_ARG = {
- "applicationConfiguration": [
- {
- "classification": "spark-defaults",
- "properties": {
- "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory", # noqa: E501
- },
- }
- ],
- "monitoringConfiguration": {
- "cloudWatchMonitoringConfiguration": {
- "logGroupName": "/aws/emr-eks-spark",
- "logStreamNamePrefix": "airflow",
- }
- },
-}
-# [END howto_operator_emr_eks_config]
-
-with DAG(
- dag_id='example_emr_eks',
- start_date=datetime(2021, 1, 1),
- tags=['example'],
- catchup=False,
-) as dag:
- # [START howto_operator_emr_eks_create_cluster]
- create_emr_eks_cluster = EmrEksCreateClusterOperator(
- task_id="create_emr_eks_cluster",
- virtual_cluster_name="emr_eks_virtual_cluster",
- eks_cluster_name="eks_cluster",
- eks_namespace="eks_namespace",
- )
- # [END howto_operator_emr_eks_create_cluster]
-
- # [START howto_operator_emr_container]
- job_starter = EmrContainerOperator(
- task_id="start_job",
- virtual_cluster_id=str(create_emr_eks_cluster.output),
- execution_role_arn=JOB_ROLE_ARN,
- release_label="emr-6.3.0-latest",
- job_driver=JOB_DRIVER_ARG,
- configuration_overrides=CONFIGURATION_OVERRIDES_ARG,
- name="pi.py",
- wait_for_completion=False,
- )
- # [END howto_operator_emr_container]
-
- # [START howto_sensor_emr_container]
- job_waiter = EmrContainerSensor(
- task_id="job_waiter",
- virtual_cluster_id=str(create_emr_eks_cluster.output),
- job_id=str(job_starter.output),
- )
- # [END howto_sensor_emr_container]
-
- chain(job_starter, job_waiter)
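Both the deleted DAG above and the system test below pass
str(create_emr_eks_cluster.output) to downstream tasks. The str() cast is
deliberate: an operator's .output attribute is an XComArg, and rendering it
as a string yields the Jinja expression that pulls the XCom value at runtime,
which templated fields such as virtual_cluster_id accept. A minimal sketch of
the mechanism:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.empty import EmptyOperator

    with DAG(dag_id="xcomarg_demo", start_date=datetime(2021, 1, 1)) as dag:
        some_task = EmptyOperator(task_id="some_task")
        # Prints roughly:
        # {{ task_instance.xcom_pull(task_ids='some_task',
        #    dag_id='xcomarg_demo', key='return_value') }}
        print(str(some_task.output))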
diff --git a/docs/apache-airflow-providers-amazon/operators/emr_eks.rst b/docs/apache-airflow-providers-amazon/operators/emr_eks.rst
index 1116aa6be6..ee2ee50ec1 100644
--- a/docs/apache-airflow-providers-amazon/operators/emr_eks.rst
+++ b/docs/apache-airflow-providers-amazon/operators/emr_eks.rst
@@ -48,7 +48,7 @@ the eks cluster that you would like to use, and an eks namespace.
Refer to the `EMR on EKS Development guide <https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/virtual-cluster.html>`__
for more details.
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_eks.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr_eks.py
:language: python
:start-after: [START howto_operator_emr_eks_create_cluster]
:end-before: [END howto_operator_emr_eks_create_cluster]
@@ -81,8 +81,9 @@ and ``monitoringConfiguration`` to send logs to the ``/aws/emr-eks-spark`` log g
Refer to the `EMR on EKS guide <https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/emr-eks-jobs-CLI.html#emr-eks-jobs-parameters>`__
for more details on job configuration.
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_eks.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr_eks.py
:language: python
+ :dedent: 4
:start-after: [START howto_operator_emr_eks_config]
:end-before: [END howto_operator_emr_eks_config]
@@ -91,7 +92,7 @@ can store them in a connection or provide them in the DAG. Your AWS region shoul
in the ``aws_default`` connection as ``{"region_name": "us-east-1"}`` or a custom connection name
that gets passed to the operator with the ``aws_conn_id`` parameter. The operator returns the Job ID of the job run.
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_eks.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr_eks.py
:language: python
:dedent: 4
:start-after: [START howto_operator_emr_container]
@@ -109,7 +110,7 @@ Wait on an Amazon EMR virtual cluster job
To wait on the status of an Amazon EMR virtual cluster job to reach a terminal state, you can use
:class:`~airflow.providers.amazon.aws.sensors.emr.EmrContainerSensor`
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_eks.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr_eks.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_emr_container]
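The added ":dedent: 4" option reflects where the included snippets now live:
the old example defined its config dicts at module level, while the system
test defines them inside the "with DAG(...)" block, so every included line
carries four spaces of indentation that the directive strips before
rendering. Schematically (marker name here is hypothetical):

    from datetime import datetime

    from airflow import DAG

    with DAG(dag_id="dedent_demo", start_date=datetime(2021, 1, 1)) as dag:
        # [START hypothetical_marker]  <- included lines sit four spaces deep,
        job_driver_arg = {"sparkSubmitJobDriver": {"entryPoint": "s3://bucket/pi.py"}}
        # [END hypothetical_marker]    <- hence ":dedent: 4" in the directive.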
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 06ec96fd35..8d8ba0149f 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -956,6 +956,7 @@ nosasl
NotFound
notificationChannels
npm
+ns
ntlm
ntpd
Nullable
diff --git a/tests/system/providers/amazon/aws/example_emr_eks.py b/tests/system/providers/amazon/aws/example_emr_eks.py
new file mode 100644
index 0000000000..b10dce7c2c
--- /dev/null
+++ b/tests/system/providers/amazon/aws/example_emr_eks.py
@@ -0,0 +1,282 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import subprocess
+from datetime import datetime
+
+import boto3
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models.baseoperator import chain
+from airflow.providers.amazon.aws.hooks.eks import ClusterStates, NodegroupStates
+from airflow.providers.amazon.aws.operators.eks import EksCreateClusterOperator, EksDeleteClusterOperator
+from airflow.providers.amazon.aws.operators.emr import EmrContainerOperator, EmrEksCreateClusterOperator
+from airflow.providers.amazon.aws.operators.s3 import (
+ S3CreateBucketOperator,
+ S3CreateObjectOperator,
+ S3DeleteBucketOperator,
+)
+from airflow.providers.amazon.aws.sensors.eks import EksClusterStateSensor, EksNodegroupStateSensor
+from airflow.providers.amazon.aws.sensors.emr import EmrContainerSensor
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
+
+DAG_ID = 'example_emr_eks'
+
+# Externally fetched variables
+ROLE_ARN_KEY = 'ROLE_ARN'
+JOB_ROLE_ARN_KEY = 'JOB_ROLE_ARN'
+SUBNETS_KEY = 'SUBNETS'
+
+sys_test_context_task = (
+ SystemTestContextBuilder()
+ .add_variable(ROLE_ARN_KEY)
+ .add_variable(JOB_ROLE_ARN_KEY)
+ .add_variable(SUBNETS_KEY, split_string=True)
+ .build()
+)
+
+S3_FILE_NAME = 'pi.py'
+S3_FILE_CONTENT = '''
+k = 1
+s = 0
+
+for i in range(1000000):
+ if i % 2 == 0:
+ s += 4/k
+ else:
+ s -= 4/k
+
+ k += 2
+
+print(s)
+'''
+
+
+@task
+def enable_access_emr_on_eks(cluster, ns):
+ # Install eksctl and enable access for EMR on EKS
+ # See https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/setting-up-cluster-access.html
+ file = 'https://github.com/weaveworks/eksctl/releases/latest/download/eksctl_$(uname -s)_amd64.tar.gz'
+ commands = f"""
+ curl --silent --location "{file}" | tar xz -C /tmp &&
+ sudo mv /tmp/eksctl /usr/local/bin &&
+ eksctl create iamidentitymapping --cluster {cluster} --namespace {ns} --service-name "emr-containers"
+ """
+
+ build = subprocess.Popen(
+ commands,
+ shell=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
+ _, err = build.communicate()
+
+ if build.returncode != 0:
+ raise RuntimeError(err)
+
+
+@task
+def get_execution_role_name() -> str:
+ return boto3.client('sts').get_caller_identity()['Arn'].split('/')[-1]
+
+
+@task
+def update_trust_policy_execution_role(cluster_name, cluster_namespace, role_name):
+ # See https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/setting-up-trust-policy.html
+ # The action "update-role-trust-policy" is not available in boto3, thus we need to do it using AWS CLI
+ commands = (
+ f'aws emr-containers update-role-trust-policy --cluster-name {cluster_name} '
+ f'--namespace {cluster_namespace} --role-name {role_name}'
+ )
+
+ build = subprocess.Popen(
+ commands,
+ shell=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
+ _, err = build.communicate()
+
+ if build.returncode != 0:
+ raise RuntimeError(err)
+
+
+@task(trigger_rule=TriggerRule.ALL_DONE)
+def delete_virtual_cluster(virtual_cluster_id):
+ boto3.client('emr-containers').delete_virtual_cluster(
+ id=virtual_cluster_id,
+ )
+
+
+with DAG(
+ dag_id=DAG_ID,
+ schedule='@once',
+ start_date=datetime(2021, 1, 1),
+ tags=['example'],
+ catchup=False,
+) as dag:
+ test_context = sys_test_context_task()
+ env_id = test_context[ENV_ID_KEY]
+ role_arn = test_context[ROLE_ARN_KEY]
+ subnets = test_context[SUBNETS_KEY]
+ job_role_arn = test_context[JOB_ROLE_ARN_KEY]
+
+ s3_bucket_name = f'{env_id}-bucket'
+ eks_cluster_name = f'{env_id}-cluster'
+ virtual_cluster_name = f'{env_id}-virtual-cluster'
+ nodegroup_name = f'{env_id}-nodegroup'
+ eks_namespace = 'default'
+
+ # [START howto_operator_emr_eks_config]
+ job_driver_arg = {
+ 'sparkSubmitJobDriver': {
+ 'entryPoint': f's3://{s3_bucket_name}/{S3_FILE_NAME}',
+ 'sparkSubmitParameters': '--conf spark.executors.instances=2 --conf spark.executors.memory=2G '
+ '--conf spark.executor.cores=2 --conf spark.driver.cores=1',
+ }
+ }
+
+ configuration_overrides_arg = {
+ 'monitoringConfiguration': {
+ 'cloudWatchMonitoringConfiguration': {
+ 'logGroupName': '/emr-eks-jobs',
+ 'logStreamNamePrefix': 'airflow',
+ }
+ },
+ }
+ # [END howto_operator_emr_eks_config]
+
+ create_bucket = S3CreateBucketOperator(
+ task_id='create_bucket',
+ bucket_name=s3_bucket_name,
+ )
+
+ upload_s3_file = S3CreateObjectOperator(
+ task_id='upload_s3_file',
+ s3_bucket=s3_bucket_name,
+ s3_key=S3_FILE_NAME,
+ data=S3_FILE_CONTENT,
+ )
+
+ create_cluster_and_nodegroup = EksCreateClusterOperator(
+ task_id='create_cluster_and_nodegroup',
+ cluster_name=eks_cluster_name,
+ nodegroup_name=nodegroup_name,
+ cluster_role_arn=role_arn,
+ # Opting to use the same ARN for the cluster and the nodegroup here,
+ # but a different ARN could be configured and passed if desired.
+ nodegroup_role_arn=role_arn,
+ resources_vpc_config={'subnetIds': subnets},
+ )
+
+ await_create_nodegroup = EksNodegroupStateSensor(
+ task_id='await_create_nodegroup',
+ cluster_name=eks_cluster_name,
+ nodegroup_name=nodegroup_name,
+ target_state=NodegroupStates.ACTIVE,
+ )
+
+ emr_access_on_eks = enable_access_emr_on_eks(eks_cluster_name, eks_namespace)
+
+ # [START howto_operator_emr_eks_create_cluster]
+ create_emr_eks_cluster = EmrEksCreateClusterOperator(
+ task_id='create_emr_eks_cluster',
+ virtual_cluster_name=virtual_cluster_name,
+ eks_cluster_name=eks_cluster_name,
+ eks_namespace=eks_namespace,
+ )
+ # [END howto_operator_emr_eks_create_cluster]
+
+ trust_policy_update = update_trust_policy_execution_role(
+ eks_cluster_name, eks_namespace, get_execution_role_name()
+ )
+
+ # [START howto_operator_emr_container]
+ job_starter = EmrContainerOperator(
+ task_id='start_job',
+ virtual_cluster_id=str(create_emr_eks_cluster.output),
+ execution_role_arn=job_role_arn,
+ release_label='emr-6.3.0-latest',
+ job_driver=job_driver_arg,
+ configuration_overrides=configuration_overrides_arg,
+ name='pi.py',
+ )
+ # [END howto_operator_emr_container]
+ job_starter.wait_for_completion = False
+
+ # [START howto_sensor_emr_container]
+ job_waiter = EmrContainerSensor(
+ task_id='job_waiter',
+ virtual_cluster_id=str(create_emr_eks_cluster.output),
+ job_id=str(job_starter.output),
+ )
+ # [END howto_sensor_emr_container]
+
+ delete_eks_cluster = EksDeleteClusterOperator(
+ task_id='delete_eks_cluster',
+ cluster_name=eks_cluster_name,
+ force_delete_compute=True,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+
+ await_delete_eks_cluster = EksClusterStateSensor(
+ task_id='await_delete_eks_cluster',
+ cluster_name=eks_cluster_name,
+ target_state=ClusterStates.NONEXISTENT,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+
+ delete_bucket = S3DeleteBucketOperator(
+ task_id='delete_bucket',
+ bucket_name=s3_bucket_name,
+ force_delete=True,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+
+ chain(
+ # TEST SETUP
+ test_context,
+ create_bucket,
+ upload_s3_file,
+ create_cluster_and_nodegroup,
+ await_create_nodegroup,
+ emr_access_on_eks,
+ trust_policy_update,
+ # TEST BODY
+ create_emr_eks_cluster,
+ job_starter,
+ job_waiter,
+ # TEST TEARDOWN
+ delete_virtual_cluster(str(create_emr_eks_cluster.output)),
+ delete_eks_cluster,
+ await_delete_eks_cluster,
+ delete_bucket,
+ )
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
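A closing note on the uploaded script: S3_FILE_CONTENT implements the Leibniz
series 4/1 - 4/3 + 4/5 - ..., which converges to pi, so a successful run of
the Spark job should log a value near 3.14159. A condensed, locally runnable
equivalent of what the job computes:

    import math

    k, s = 1, 0.0
    for i in range(1_000_000):
        s += 4 / k if i % 2 == 0 else -4 / k
        k += 2

    # For an alternating series the truncation error stays below the first
    # omitted term, 4 / (2 * 10**6 + 1), so s matches pi to ~6 decimals.
    assert abs(s - math.pi) < 1e-5
    print(s)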