You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/08/06 11:28:58 UTC

[airflow] branch main updated: System test for EMR Serverless (#25559)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 33fbe75dd5 System test for EMR Serverless  (#25559)
33fbe75dd5 is described below

commit 33fbe75dd5100539c697d705552b088e568d52e4
Author: syedahsn <10...@users.noreply.github.com>
AuthorDate: Sat Aug 6 05:28:49 2022 -0600

    System test for EMR Serverless  (#25559)
    
    * System test for EMR Serverless following the template in #24643 (AIP-47)
    
    * Remove example_emr_serverless.py from example_dags
---
 .../operators/emr_serverless.rst                   | 10 +--
 .../amazon/aws}/example_emr_serverless.py          | 80 ++++++++++++++++------
 2 files changed, 65 insertions(+), 25 deletions(-)

diff --git a/docs/apache-airflow-providers-amazon/operators/emr_serverless.rst b/docs/apache-airflow-providers-amazon/operators/emr_serverless.rst
index 2496af2c40..15b0c6de0f 100644
--- a/docs/apache-airflow-providers-amazon/operators/emr_serverless.rst
+++ b/docs/apache-airflow-providers-amazon/operators/emr_serverless.rst
@@ -41,7 +41,7 @@ Create an EMR Serverless Application
 You can use :class:`~airflow.providers.amazon.aws.operators.emr.EmrServerlessCreateApplicationOperator` to
 create a new EMR Serverless Application.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_serverless.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr_serverless.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_emr_serverless_create_application]
@@ -55,7 +55,7 @@ Start an EMR Serverless Job
 You can use :class:`~airflow.providers.amazon.aws.operators.emr.EmrServerlessStartJobOperator` to
 start an EMR Serverless Job.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_serverless.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr_serverless.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_emr_serverless_start_job]
@@ -69,7 +69,7 @@ Delete an EMR Serverless Application
 You can use :class:`~airflow.providers.amazon.aws.operators.emr.EmrServerlessDeleteApplicationOperator` to
 delete an EMR Serverless Application.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_serverless.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr_serverless.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_emr_serverless_delete_application]
@@ -86,7 +86,7 @@ Wait on an EMR Serverless Job state
 To monitor the state of an EMR Serverless Job you can use
 :class:`~airflow.providers.amazon.aws.sensors.emr.EmrServerlessJobSensor`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_serverless.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr_serverless.py
    :language: python
    :dedent: 4
    :start-after: [START howto_sensor_emr_serverless_job]
@@ -100,7 +100,7 @@ Wait on an EMR Serverless Application state
 To monitor the state of an EMR Serverless Application you can use
 :class:`~airflow.providers.amazon.aws.sensors.emr.EmrServerlessApplicationSensor`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_serverless.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr_serverless.py
    :language: python
    :dedent: 4
    :start-after: [START howto_sensor_emr_serverless_application]
diff --git a/airflow/providers/amazon/aws/example_dags/example_emr_serverless.py b/tests/system/providers/amazon/aws/example_emr_serverless.py
similarity index 54%
rename from airflow/providers/amazon/aws/example_dags/example_emr_serverless.py
rename to tests/system/providers/amazon/aws/example_emr_serverless.py
index b8c0618014..0d931a752c 100644
--- a/airflow/providers/amazon/aws/example_dags/example_emr_serverless.py
+++ b/tests/system/providers/amazon/aws/example_emr_serverless.py
@@ -15,40 +15,55 @@
 # specific language governing permissions and limitations
 # under the License.
 
+
 from datetime import datetime
-from os import getenv
 
-from airflow import DAG
 from airflow.models.baseoperator import chain
+from airflow.models.dag import DAG
 from airflow.providers.amazon.aws.operators.emr import (
     EmrServerlessCreateApplicationOperator,
     EmrServerlessDeleteApplicationOperator,
     EmrServerlessStartJobOperator,
 )
+from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
 from airflow.providers.amazon.aws.sensors.emr import EmrServerlessApplicationSensor, EmrServerlessJobSensor
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
+
+DAG_ID = 'example_emr_serverless'
+
+# Externally fetched variables:
+ROLE_ARN_KEY = 'ROLE_ARN'
 
-EXECUTION_ROLE_ARN = getenv('EXECUTION_ROLE_ARN', 'execution_role_arn')
-EMR_EXAMPLE_BUCKET = getenv('EMR_EXAMPLE_BUCKET', 'emr_example_bucket')
-SPARK_JOB_DRIVER = {
-    "sparkSubmit": {
-        "entryPoint": "s3://us-east-1.elasticmapreduce/emr-containers/samples/wordcount/scripts/wordcount.py",
-        "entryPointArguments": [f"s3://{EMR_EXAMPLE_BUCKET}/output"],
-        "sparkSubmitParameters": "--conf spark.executor.cores=1 --conf spark.executor.memory=4g\
-            --conf spark.driver.cores=1 --conf spark.driver.memory=4g --conf spark.executor.instances=1",
-    }
-}
 
-SPARK_CONFIGURATION_OVERRIDES = {
-    "monitoringConfiguration": {"s3MonitoringConfiguration": {"logUri": f"s3://{EMR_EXAMPLE_BUCKET}/logs"}}
-}
+sys_test_context_task = SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).build()
 
 with DAG(
-    dag_id='example_emr_serverless',
-    schedule_interval=None,
+    dag_id=DAG_ID,
+    schedule_interval='@once',
     start_date=datetime(2021, 1, 1),
     tags=['example'],
     catchup=False,
-) as emr_serverless_dag:
+) as dag:
+    test_context = sys_test_context_task()
+    env_id = test_context[ENV_ID_KEY]
+    role_arn = test_context[ROLE_ARN_KEY]
+    bucket_name = f'{env_id}-emr-serverless-bucket'
+    entryPoint = "s3://us-east-1.elasticmapreduce/emr-containers/samples/wordcount/scripts/wordcount.py"
+    create_s3_bucket = S3CreateBucketOperator(task_id='create_s3_bucket', bucket_name=bucket_name)
+
+    SPARK_JOB_DRIVER = {
+        "sparkSubmit": {
+            "entryPoint": entryPoint,
+            "entryPointArguments": [f"s3://{bucket_name}/output"],
+            "sparkSubmitParameters": "--conf spark.executor.cores=1 --conf spark.executor.memory=4g\
+                --conf spark.driver.cores=1 --conf spark.driver.memory=4g --conf spark.executor.instances=1",
+        }
+    }
+
+    SPARK_CONFIGURATION_OVERRIDES = {
+        "monitoringConfiguration": {"s3MonitoringConfiguration": {"logUri": f"s3://{bucket_name}/logs"}}
+    }
 
     # [START howto_operator_emr_serverless_create_application]
     emr_serverless_app = EmrServerlessCreateApplicationOperator(
@@ -70,7 +85,7 @@ with DAG(
     start_job = EmrServerlessStartJobOperator(
         task_id='start_emr_serverless_job',
         application_id=emr_serverless_app.output,
-        execution_role_arn=EXECUTION_ROLE_ARN,
+        execution_role_arn=role_arn,
         job_driver=SPARK_JOB_DRIVER,
         configuration_overrides=SPARK_CONFIGURATION_OVERRIDES,
     )
@@ -84,14 +99,39 @@ with DAG(
 
     # [START howto_operator_emr_serverless_delete_application]
     delete_app = EmrServerlessDeleteApplicationOperator(
-        task_id='delete_application', application_id=emr_serverless_app.output, trigger_rule="all_done"
+        task_id='delete_application',
+        application_id=emr_serverless_app.output,
+        trigger_rule=TriggerRule.ALL_DONE,
     )
     # [END howto_operator_emr_serverless_delete_application]
 
+    delete_s3_bucket = S3DeleteBucketOperator(
+        task_id='delete_s3_bucket',
+        bucket_name=bucket_name,
+        force_delete=True,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
     chain(
+        # TEST SETUP
+        test_context,
+        create_s3_bucket,
+        # TEST BODY
         emr_serverless_app,
         wait_for_app_creation,
         start_job,
         wait_for_job,
+        # TEST TEARDOWN
         delete_app,
+        delete_s3_bucket,
     )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)