Posted to commits@airflow.apache.org by po...@apache.org on 2022/08/06 11:28:58 UTC
[airflow] branch main updated: System test for EMR Serverless (#25559)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 33fbe75dd5 System test for EMR Serverless (#25559)
33fbe75dd5 is described below
commit 33fbe75dd5100539c697d705552b088e568d52e4
Author: syedahsn <10...@users.noreply.github.com>
AuthorDate: Sat Aug 6 05:28:49 2022 -0600
System test for EMR Serverless (#25559)
* System test for EMR Serverless following the template in #24643 (AIP-47)
* Remove example_emr_serverless.py from example_dags
---
.../operators/emr_serverless.rst | 10 +--
.../amazon/aws}/example_emr_serverless.py | 80 ++++++++++++++++------
2 files changed, 65 insertions(+), 25 deletions(-)
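For orientation before the diff: the relocated DAG follows the AIP-47 system-test template referenced in the commit message, i.e. an externally supplied test context, explicit setup/body/teardown ordering via chain(), a watcher task so teardown failures still fail the test, and a get_test_run hook for pytest. A condensed sketch of that pattern, reconstructed from the additions in the diff below (not the complete file):

# Condensed AIP-47 context pattern as used by the relocated system test.
# ENV_ID_KEY and SystemTestContextBuilder come from the Amazon system-test
# utilities; "ROLE_ARN" is the externally supplied variable the test expects.
from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder

sys_test_context_task = SystemTestContextBuilder().add_variable("ROLE_ARN").build()

# Inside the DAG body, the context task's output is indexed like a dict:
#     test_context = sys_test_context_task()
#     env_id = test_context[ENV_ID_KEY]
#     role_arn = test_context["ROLE_ARN"]
# and tasks are ordered with chain(test_context, <setup>, <body...>, <teardown>),
# followed by `list(dag.tasks) >> watcher()` and `test_run = get_test_run(dag)`.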
diff --git a/docs/apache-airflow-providers-amazon/operators/emr_serverless.rst b/docs/apache-airflow-providers-amazon/operators/emr_serverless.rst
index 2496af2c40..15b0c6de0f 100644
--- a/docs/apache-airflow-providers-amazon/operators/emr_serverless.rst
+++ b/docs/apache-airflow-providers-amazon/operators/emr_serverless.rst
@@ -41,7 +41,7 @@ Create an EMR Serverless Application
You can use :class:`~airflow.providers.amazon.aws.operators.emr.EmrServerlessCreateApplicationOperator` to
create a new EMR Serverless Application.
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_serverless.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr_serverless.py
:language: python
:dedent: 4
:start-after: [START howto_operator_emr_serverless_create_application]
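For context while reading this doc change: the create-application task that the include points at appears only by its first line in the code portion of the diff further down, so here is a hedged sketch of such a task (release label, job type, and application name are illustrative placeholders, not taken from this commit):

from airflow.providers.amazon.aws.operators.emr import EmrServerlessCreateApplicationOperator

# Sketch only (placed inside the DAG definition): creates a Spark
# EMR Serverless application; the values below are placeholders.
emr_serverless_app = EmrServerlessCreateApplicationOperator(
    task_id="create_emr_serverless_task",
    release_label="emr-6.6.0",
    job_type="SPARK",
    config={"name": "new_application"},
)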
@@ -55,7 +55,7 @@ Start an EMR Serverless Job
You can use :class:`~airflow.providers.amazon.aws.operators.emr.EmrServerlessStartJobOperator` to
start an EMR Serverless Job.
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_serverless.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr_serverless.py
:language: python
:dedent: 4
:start-after: [START howto_operator_emr_serverless_start_job]
@@ -69,7 +69,7 @@ Delete an EMR Serverless Application
You can use :class:`~airflow.providers.amazon.aws.operators.emr.EmrServerlessDeleteApplicationOperator` to
delete an EMR Serverless Application.
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_serverless.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr_serverless.py
:language: python
:dedent: 4
:start-after: [START howto_operator_emr_serverless_delete_application]
@@ -86,7 +86,7 @@ Wait on an EMR Serverless Job state
To monitor the state of an EMR Serverless Job you can use
:class:`~airflow.providers.amazon.aws.sensors.emr.EmrServerlessJobSensor`.
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_serverless.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr_serverless.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_emr_serverless_job]
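The job sensor referenced by this include sits in an unchanged part of the file and therefore does not show up in the code portion of the diff; a hedged sketch of its typical wiring (task ids are illustrative, and `emr_serverless_app` / `start_job` stand for the create and start tasks that do appear in the diff):

from airflow.providers.amazon.aws.sensors.emr import EmrServerlessJobSensor

# Sketch only: polls the job run started upstream until it reaches a terminal
# state; both ids are pulled from upstream task XComs inside the same DAG.
wait_for_job = EmrServerlessJobSensor(
    task_id="wait_for_job",
    application_id=emr_serverless_app.output,
    job_run_id=start_job.output,
)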
@@ -100,7 +100,7 @@ Wait on an EMR Serverless Application state
To monitor the state of an EMR Serverless Application you can use
:class:`~airflow.providers.amazon.aws.sensors.emr.EmrServerlessApplicationSensor`.
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_serverless.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr_serverless.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_emr_serverless_application]
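Likewise, the application sensor is unchanged and absent from the code diff; a minimal sketch, assuming it only needs the application id emitted by the create task shown above:

from airflow.providers.amazon.aws.sensors.emr import EmrServerlessApplicationSensor

# Sketch only: waits for the application created upstream to reach a ready state.
wait_for_app_creation = EmrServerlessApplicationSensor(
    task_id="wait_for_app_creation",
    application_id=emr_serverless_app.output,
)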
diff --git a/airflow/providers/amazon/aws/example_dags/example_emr_serverless.py b/tests/system/providers/amazon/aws/example_emr_serverless.py
similarity index 54%
rename from airflow/providers/amazon/aws/example_dags/example_emr_serverless.py
rename to tests/system/providers/amazon/aws/example_emr_serverless.py
index b8c0618014..0d931a752c 100644
--- a/airflow/providers/amazon/aws/example_dags/example_emr_serverless.py
+++ b/tests/system/providers/amazon/aws/example_emr_serverless.py
@@ -15,40 +15,55 @@
# specific language governing permissions and limitations
# under the License.
+
from datetime import datetime
-from os import getenv
-from airflow import DAG
from airflow.models.baseoperator import chain
+from airflow.models.dag import DAG
from airflow.providers.amazon.aws.operators.emr import (
EmrServerlessCreateApplicationOperator,
EmrServerlessDeleteApplicationOperator,
EmrServerlessStartJobOperator,
)
+from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
from airflow.providers.amazon.aws.sensors.emr import EmrServerlessApplicationSensor, EmrServerlessJobSensor
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
+
+DAG_ID = 'example_emr_serverless'
+
+# Externally fetched variables:
+ROLE_ARN_KEY = 'ROLE_ARN'
-EXECUTION_ROLE_ARN = getenv('EXECUTION_ROLE_ARN', 'execution_role_arn')
-EMR_EXAMPLE_BUCKET = getenv('EMR_EXAMPLE_BUCKET', 'emr_example_bucket')
-SPARK_JOB_DRIVER = {
- "sparkSubmit": {
- "entryPoint": "s3://us-east-1.elasticmapreduce/emr-containers/samples/wordcount/scripts/wordcount.py",
- "entryPointArguments": [f"s3://{EMR_EXAMPLE_BUCKET}/output"],
- "sparkSubmitParameters": "--conf spark.executor.cores=1 --conf spark.executor.memory=4g\
- --conf spark.driver.cores=1 --conf spark.driver.memory=4g --conf spark.executor.instances=1",
- }
-}
-SPARK_CONFIGURATION_OVERRIDES = {
- "monitoringConfiguration": {"s3MonitoringConfiguration": {"logUri": f"s3://{EMR_EXAMPLE_BUCKET}/logs"}}
-}
+sys_test_context_task = SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).build()
with DAG(
- dag_id='example_emr_serverless',
- schedule_interval=None,
+ dag_id=DAG_ID,
+ schedule_interval='@once',
start_date=datetime(2021, 1, 1),
tags=['example'],
catchup=False,
-) as emr_serverless_dag:
+) as dag:
+ test_context = sys_test_context_task()
+ env_id = test_context[ENV_ID_KEY]
+ role_arn = test_context[ROLE_ARN_KEY]
+ bucket_name = f'{env_id}-emr-serverless-bucket'
+ entryPoint = "s3://us-east-1.elasticmapreduce/emr-containers/samples/wordcount/scripts/wordcount.py"
+ create_s3_bucket = S3CreateBucketOperator(task_id='create_s3_bucket', bucket_name=bucket_name)
+
+ SPARK_JOB_DRIVER = {
+ "sparkSubmit": {
+ "entryPoint": entryPoint,
+ "entryPointArguments": [f"s3://{bucket_name}/output"],
+ "sparkSubmitParameters": "--conf spark.executor.cores=1 --conf spark.executor.memory=4g\
+ --conf spark.driver.cores=1 --conf spark.driver.memory=4g --conf spark.executor.instances=1",
+ }
+ }
+
+ SPARK_CONFIGURATION_OVERRIDES = {
+ "monitoringConfiguration": {"s3MonitoringConfiguration": {"logUri": f"s3://{bucket_name}/logs"}}
+ }
# [START howto_operator_emr_serverless_create_application]
emr_serverless_app = EmrServerlessCreateApplicationOperator(
@@ -70,7 +85,7 @@ with DAG(
start_job = EmrServerlessStartJobOperator(
task_id='start_emr_serverless_job',
application_id=emr_serverless_app.output,
- execution_role_arn=EXECUTION_ROLE_ARN,
+ execution_role_arn=role_arn,
job_driver=SPARK_JOB_DRIVER,
configuration_overrides=SPARK_CONFIGURATION_OVERRIDES,
)
@@ -84,14 +99,39 @@ with DAG(
# [START howto_operator_emr_serverless_delete_application]
delete_app = EmrServerlessDeleteApplicationOperator(
- task_id='delete_application', application_id=emr_serverless_app.output, trigger_rule="all_done"
+ task_id='delete_application',
+ application_id=emr_serverless_app.output,
+ trigger_rule=TriggerRule.ALL_DONE,
)
# [END howto_operator_emr_serverless_delete_application]
+ delete_s3_bucket = S3DeleteBucketOperator(
+ task_id='delete_s3_bucket',
+ bucket_name=bucket_name,
+ force_delete=True,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
chain(
+ # TEST SETUP
+ test_context,
+ create_s3_bucket,
+ # TEST BODY
emr_serverless_app,
wait_for_app_creation,
start_job,
wait_for_job,
+ # TEST TEARDOWN
delete_app,
+ delete_s3_bucket,
)
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
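Because the file now ends with the get_test_run hook, the whole DAG can be exercised as an ordinary pytest test, as the comment above notes. A minimal invocation sketch (assumes a working Airflow development environment, AWS credentials, and the ROLE_ARN system-test variable; the path is the relocated file from this commit):

# Hypothetical local runner: pytest collects `test_run` from the module.
import sys

import pytest

if __name__ == "__main__":
    sys.exit(pytest.main(["tests/system/providers/amazon/aws/example_emr_serverless.py"]))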