Posted to commits@airflow.apache.org by po...@apache.org on 2022/10/10 06:09:30 UTC

[airflow] branch main updated: Convert emr_eks example dag to system test (#26723)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 62d5bab3b4 Convert emr_eks example dag to system test (#26723)
62d5bab3b4 is described below

commit 62d5bab3b4cdb423c668b0f2e29d5c52b8ca0ca4
Author: Vincent <97...@users.noreply.github.com>
AuthorDate: Mon Oct 10 02:09:22 2022 -0400

    Convert emr_eks example dag to system test (#26723)
---
 .../amazon/aws/example_dags/example_emr_eks.py     |  91 -------
 .../operators/emr_eks.rst                          |   9 +-
 docs/spelling_wordlist.txt                         |   1 +
 .../system/providers/amazon/aws/example_emr_eks.py | 282 +++++++++++++++++++++
 4 files changed, 288 insertions(+), 95 deletions(-)

diff --git a/airflow/providers/amazon/aws/example_dags/example_emr_eks.py b/airflow/providers/amazon/aws/example_dags/example_emr_eks.py
deleted file mode 100644
index d827938985..0000000000
--- a/airflow/providers/amazon/aws/example_dags/example_emr_eks.py
+++ /dev/null
@@ -1,91 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import os
-from datetime import datetime
-
-from airflow import DAG
-from airflow.models.baseoperator import chain
-from airflow.providers.amazon.aws.operators.emr import EmrContainerOperator, EmrEksCreateClusterOperator
-from airflow.providers.amazon.aws.sensors.emr import EmrContainerSensor
-
-JOB_ROLE_ARN = os.getenv("JOB_ROLE_ARN", "arn:aws:iam::012345678912:role/emr_eks_default_role")
-
-# [START howto_operator_emr_eks_config]
-JOB_DRIVER_ARG = {
-    "sparkSubmitJobDriver": {
-        "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py",
-        "sparkSubmitParameters": "--conf spark.executors.instances=2 --conf spark.executors.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1",  # noqa: E501
-    }
-}
-
-CONFIGURATION_OVERRIDES_ARG = {
-    "applicationConfiguration": [
-        {
-            "classification": "spark-defaults",
-            "properties": {
-                "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",  # noqa: E501
-            },
-        }
-    ],
-    "monitoringConfiguration": {
-        "cloudWatchMonitoringConfiguration": {
-            "logGroupName": "/aws/emr-eks-spark",
-            "logStreamNamePrefix": "airflow",
-        }
-    },
-}
-# [END howto_operator_emr_eks_config]
-
-with DAG(
-    dag_id='example_emr_eks',
-    start_date=datetime(2021, 1, 1),
-    tags=['example'],
-    catchup=False,
-) as dag:
-    # [START howto_operator_emr_eks_create_cluster]
-    create_emr_eks_cluster = EmrEksCreateClusterOperator(
-        task_id="create_emr_eks_cluster",
-        virtual_cluster_name="emr_eks_virtual_cluster",
-        eks_cluster_name="eks_cluster",
-        eks_namespace="eks_namespace",
-    )
-    # [END howto_operator_emr_eks_create_cluster]
-
-    # [START howto_operator_emr_container]
-    job_starter = EmrContainerOperator(
-        task_id="start_job",
-        virtual_cluster_id=str(create_emr_eks_cluster.output),
-        execution_role_arn=JOB_ROLE_ARN,
-        release_label="emr-6.3.0-latest",
-        job_driver=JOB_DRIVER_ARG,
-        configuration_overrides=CONFIGURATION_OVERRIDES_ARG,
-        name="pi.py",
-        wait_for_completion=False,
-    )
-    # [END howto_operator_emr_container]
-
-    # [START howto_sensor_emr_container]
-    job_waiter = EmrContainerSensor(
-        task_id="job_waiter",
-        virtual_cluster_id=str(create_emr_eks_cluster.output),
-        job_id=str(job_starter.output),
-    )
-    # [END howto_sensor_emr_container]
-
-    chain(job_starter, job_waiter)
diff --git a/docs/apache-airflow-providers-amazon/operators/emr_eks.rst b/docs/apache-airflow-providers-amazon/operators/emr_eks.rst
index 1116aa6be6..ee2ee50ec1 100644
--- a/docs/apache-airflow-providers-amazon/operators/emr_eks.rst
+++ b/docs/apache-airflow-providers-amazon/operators/emr_eks.rst
@@ -48,7 +48,7 @@ the eks cluster that you would like to use, and an eks namespace.
 Refer to the `EMR on EKS Development guide <https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/virtual-cluster.html>`__
 for more details.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_eks.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr_eks.py
     :language: python
     :start-after: [START howto_operator_emr_eks_create_cluster]
     :end-before: [END howto_operator_emr_eks_create_cluster]
@@ -81,8 +81,9 @@ and ``monitoringConfiguration`` to send logs to the ``/aws/emr-eks-spark`` log g
 Refer to the `EMR on EKS guide <https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/emr-eks-jobs-CLI.html#emr-eks-jobs-parameters>`__
 for more details on job configuration.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_eks.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr_eks.py
     :language: python
+    :dedent: 4
     :start-after: [START howto_operator_emr_eks_config]
     :end-before: [END howto_operator_emr_eks_config]
 
@@ -91,7 +92,7 @@ can store them in a connection or provide them in the DAG. Your AWS region shoul
 in the ``aws_default`` connection as ``{"region_name": "us-east-1"}`` or a custom connection name
 that gets passed to the operator with the ``aws_conn_id`` parameter. The operator returns the Job ID of the job run.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_eks.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr_eks.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_emr_container]
@@ -109,7 +110,7 @@ Wait on an Amazon EMR virtual cluster job
 To wait on the status of an Amazon EMR virtual cluster job to reach a terminal state, you can use
 :class:`~airflow.providers.amazon.aws.sensors.emr.EmrContainerSensor`
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_eks.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr_eks.py
     :language: python
     :dedent: 4
     :start-after: [START howto_sensor_emr_container]
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 06ec96fd35..8d8ba0149f 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -956,6 +956,7 @@ nosasl
 NotFound
 notificationChannels
 npm
+ns
 ntlm
 ntpd
 Nullable
diff --git a/tests/system/providers/amazon/aws/example_emr_eks.py b/tests/system/providers/amazon/aws/example_emr_eks.py
new file mode 100644
index 0000000000..b10dce7c2c
--- /dev/null
+++ b/tests/system/providers/amazon/aws/example_emr_eks.py
@@ -0,0 +1,282 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import subprocess
+from datetime import datetime
+
+import boto3
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models.baseoperator import chain
+from airflow.providers.amazon.aws.hooks.eks import ClusterStates, NodegroupStates
+from airflow.providers.amazon.aws.operators.eks import EksCreateClusterOperator, EksDeleteClusterOperator
+from airflow.providers.amazon.aws.operators.emr import EmrContainerOperator, EmrEksCreateClusterOperator
+from airflow.providers.amazon.aws.operators.s3 import (
+    S3CreateBucketOperator,
+    S3CreateObjectOperator,
+    S3DeleteBucketOperator,
+)
+from airflow.providers.amazon.aws.sensors.eks import EksClusterStateSensor, EksNodegroupStateSensor
+from airflow.providers.amazon.aws.sensors.emr import EmrContainerSensor
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
+
+DAG_ID = 'example_emr_eks'
+
+# Externally fetched variables
+ROLE_ARN_KEY = 'ROLE_ARN'
+JOB_ROLE_ARN_KEY = 'JOB_ROLE_ARN'
+SUBNETS_KEY = 'SUBNETS'
+
+sys_test_context_task = (
+    SystemTestContextBuilder()
+    .add_variable(ROLE_ARN_KEY)
+    .add_variable(JOB_ROLE_ARN_KEY)
+    .add_variable(SUBNETS_KEY, split_string=True)
+    .build()
+)
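+# At runtime the builder task fetches ENV_ID and the variables declared above
+# and exposes them as a dict, which the DAG below indexes via XCom.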
+
+S3_FILE_NAME = 'pi.py'
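+# The script below estimates pi with the Leibniz series (4/1 - 4/3 + 4/5 - ...),
+# giving the Spark job something cheap but non-trivial to compute.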
+S3_FILE_CONTENT = '''
+k = 1
+s = 0
+
+for i in range(1000000):
+    if i % 2 == 0:
+        s += 4/k
+    else:
+        s -= 4/k
+
+    k += 2
+
+print(s)
+'''
+
+
+@task
+def enable_access_emr_on_eks(cluster, ns):
+    # Install eksctl and enable access for EMR on EKS
+    # See https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/setting-up-cluster-access.html
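+    # The iamidentitymapping step maps the EMR containers service into the
+    # cluster's aws-auth configuration so EMR on EKS can manage the namespace.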
+    file = 'https://github.com/weaveworks/eksctl/releases/latest/download/eksctl_$(uname -s)_amd64.tar.gz'
+    commands = f"""
+        curl --silent --location "{file}" | tar xz -C /tmp &&
+        sudo mv /tmp/eksctl /usr/local/bin &&
+        eksctl create iamidentitymapping --cluster {cluster} --namespace {ns} --service-name "emr-containers"
+    """
+
+    build = subprocess.Popen(
+        commands,
+        shell=True,
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE,
+    )
+    _, err = build.communicate()
+
+    if build.returncode != 0:
+        raise RuntimeError(err)
+
+
+@task
+def get_execution_role_name() -> str:
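+    # The last path segment of the STS caller-identity ARN is used as the role name.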
+    return boto3.client('sts').get_caller_identity()['Arn'].split('/')[-1]
+
+
+@task
+def update_trust_policy_execution_role(cluster_name, cluster_namespace, role_name):
+    # See https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/setting-up-trust-policy.html
+    # The action "update-role-trust-policy" is not available in boto3, so we need to do it using the AWS CLI
+    commands = (
+        f'aws emr-containers update-role-trust-policy --cluster-name {cluster_name} '
+        f'--namespace {cluster_namespace} --role-name {role_name}'
+    )
+
+    build = subprocess.Popen(
+        commands,
+        shell=True,
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE,
+    )
+    _, err = build.communicate()
+
+    if build.returncode != 0:
+        raise RuntimeError(err)
+
+
+@task(trigger_rule=TriggerRule.ALL_DONE)
+def delete_virtual_cluster(virtual_cluster_id):
+    boto3.client('emr-containers').delete_virtual_cluster(
+        id=virtual_cluster_id,
+    )
+
+
+with DAG(
+    dag_id=DAG_ID,
+    schedule='@once',
+    start_date=datetime(2021, 1, 1),
+    tags=['example'],
+    catchup=False,
+) as dag:
+    test_context = sys_test_context_task()
+    env_id = test_context[ENV_ID_KEY]
+    role_arn = test_context[ROLE_ARN_KEY]
+    subnets = test_context[SUBNETS_KEY]
+    job_role_arn = test_context[JOB_ROLE_ARN_KEY]
+
+    s3_bucket_name = f'{env_id}-bucket'
+    eks_cluster_name = f'{env_id}-cluster'
+    virtual_cluster_name = f'{env_id}-virtual-cluster'
+    nodegroup_name = f'{env_id}-nodegroup'
+    eks_namespace = 'default'
+
+    # [START howto_operator_emr_eks_config]
+    job_driver_arg = {
+        'sparkSubmitJobDriver': {
+            'entryPoint': f's3://{s3_bucket_name}/{S3_FILE_NAME}',
+            'sparkSubmitParameters': '--conf spark.executor.instances=2 --conf spark.executor.memory=2G '
+            '--conf spark.executor.cores=2 --conf spark.driver.cores=1',
+        }
+    }
+
+    configuration_overrides_arg = {
+        'monitoringConfiguration': {
+            'cloudWatchMonitoringConfiguration': {
+                'logGroupName': '/emr-eks-jobs',
+                'logStreamNamePrefix': 'airflow',
+            }
+        },
+    }
+    # [END howto_operator_emr_eks_config]
+
+    create_bucket = S3CreateBucketOperator(
+        task_id='create_bucket',
+        bucket_name=s3_bucket_name,
+    )
+
+    upload_s3_file = S3CreateObjectOperator(
+        task_id='upload_s3_file',
+        s3_bucket=s3_bucket_name,
+        s3_key=S3_FILE_NAME,
+        data=S3_FILE_CONTENT,
+    )
+
+    create_cluster_and_nodegroup = EksCreateClusterOperator(
+        task_id='create_cluster_and_nodegroup',
+        cluster_name=eks_cluster_name,
+        nodegroup_name=nodegroup_name,
+        cluster_role_arn=role_arn,
+        # Opting to use the same ARN for the cluster and the nodegroup here,
+        # but a different ARN could be configured and passed if desired.
+        nodegroup_role_arn=role_arn,
+        resources_vpc_config={'subnetIds': subnets},
+    )
+
+    await_create_nodegroup = EksNodegroupStateSensor(
+        task_id='await_create_nodegroup',
+        cluster_name=eks_cluster_name,
+        nodegroup_name=nodegroup_name,
+        target_state=NodegroupStates.ACTIVE,
+    )
+
+    emr_access_on_eks = enable_access_emr_on_eks(eks_cluster_name, eks_namespace)
+
+    # [START howto_operator_emr_eks_create_cluster]
+    create_emr_eks_cluster = EmrEksCreateClusterOperator(
+        task_id='create_emr_eks_cluster',
+        virtual_cluster_name=virtual_cluster_name,
+        eks_cluster_name=eks_cluster_name,
+        eks_namespace=eks_namespace,
+    )
+    # [END howto_operator_emr_eks_create_cluster]
+
+    trust_policy_update = update_trust_policy_execution_role(
+        eks_cluster_name, eks_namespace, get_execution_role_name()
+    )
+
+    # [START howto_operator_emr_container]
+    job_starter = EmrContainerOperator(
+        task_id='start_job',
+        virtual_cluster_id=str(create_emr_eks_cluster.output),
+        execution_role_arn=job_role_arn,
+        release_label='emr-6.3.0-latest',
+        job_driver=job_driver_arg,
+        configuration_overrides=configuration_overrides_arg,
+        name='pi.py',
+    )
+    # [END howto_operator_emr_container]
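+    # Set outside the docs snippet above so the rendered example stays minimal;
+    # the EmrContainerSensor below does the waiting instead.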
+    job_starter.wait_for_completion = False
+
+    # [START howto_sensor_emr_container]
+    job_waiter = EmrContainerSensor(
+        task_id='job_waiter',
+        virtual_cluster_id=str(create_emr_eks_cluster.output),
+        job_id=str(job_starter.output),
+    )
+    # [END howto_sensor_emr_container]
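+    # Each .output used above is the task's XCom; wrapping it in str() yields
+    # a Jinja template that is rendered at runtime.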
+
+    delete_eks_cluster = EksDeleteClusterOperator(
+        task_id='delete_eks_cluster',
+        cluster_name=eks_cluster_name,
+        force_delete_compute=True,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    await_delete_eks_cluster = EksClusterStateSensor(
+        task_id='await_delete_eks_cluster',
+        cluster_name=eks_cluster_name,
+        target_state=ClusterStates.NONEXISTENT,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_bucket = S3DeleteBucketOperator(
+        task_id='delete_bucket',
+        bucket_name=s3_bucket_name,
+        force_delete=True,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    chain(
+        # TEST SETUP
+        test_context,
+        create_bucket,
+        upload_s3_file,
+        create_cluster_and_nodegroup,
+        await_create_nodegroup,
+        emr_access_on_eks,
+        trust_policy_update,
+        # TEST BODY
+        create_emr_eks_cluster,
+        job_starter,
+        job_waiter,
+        # TEST TEARDOWN
+        delete_virtual_cluster(str(create_emr_eks_cluster.output)),
+        delete_eks_cluster,
+        await_delete_eks_cluster,
+        delete_bucket,
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs the watcher in order to properly mark success/failure
+    # when a "tearDown" task with a trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
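
A minimal sketch of running the converted test locally, assuming the
pytest-based runner described in tests/system/README.md, that the variables
declared above are supplied as environment variables, and that SUBNETS is a
comma-separated list (suggested by split_string=True). The variable name
SYSTEM_TESTS_ENV_ID and all example values here are assumptions, not taken
from this commit:

    export SYSTEM_TESTS_ENV_ID=demo
    export ROLE_ARN=arn:aws:iam::123456789012:role/eks-test-role
    export JOB_ROLE_ARN=arn:aws:iam::123456789012:role/emr-eks-job-role
    export SUBNETS=subnet-aaaa1111,subnet-bbbb2222
    pytest tests/system/providers/amazon/aws/example_emr_eks.py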