You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2024/02/24 11:05:27 UTC
(airflow) branch main updated: Fix `example_emr` system test (#37667)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 42f6373e52 Fix `example_emr` system test (#37667)
42f6373e52 is described below
commit 42f6373e52e2f857e4c4aaa0f6fa096eb791cddc
Author: Vincent <97...@users.noreply.github.com>
AuthorDate: Sat Feb 24 06:05:19 2024 -0500
Fix `example_emr` system test (#37667)
---
airflow/providers/amazon/aws/operators/emr.py | 13 ++++++-------
airflow/providers/amazon/aws/waiters/emr.json | 6 ++++++
tests/system/providers/amazon/aws/example_emr.py | 17 +----------------
3 files changed, 13 insertions(+), 23 deletions(-)
diff --git a/airflow/providers/amazon/aws/operators/emr.py b/airflow/providers/amazon/aws/operators/emr.py
index d6bdb2e318..48eb198296 100644
--- a/airflow/providers/amazon/aws/operators/emr.py
+++ b/airflow/providers/amazon/aws/operators/emr.py
@@ -723,15 +723,14 @@ class EmrCreateJobFlowOperator(BaseOperator):
job_flow_overrides: str | dict[str, Any] | None = None,
region_name: str | None = None,
wait_for_completion: bool = False,
- # TODO: waiter_max_attempts and waiter_delay should default to None when the other two are deprecated.
- waiter_max_attempts: int | None | ArgNotSet = NOTSET,
- waiter_delay: int | None | ArgNotSet = NOTSET,
+ waiter_max_attempts: int | None = None,
+ waiter_delay: int | None = None,
waiter_countdown: int | None = None,
waiter_check_interval_seconds: int = 60,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs: Any,
):
- if waiter_max_attempts is NOTSET:
+ if waiter_countdown:
warnings.warn(
"The parameter waiter_countdown has been deprecated to standardize "
"naming conventions. Please use waiter_max_attempts instead. In the "
@@ -742,7 +741,7 @@ class EmrCreateJobFlowOperator(BaseOperator):
# waiter_countdown defaults to never timing out, which is not supported
# by boto waiters, so we will set it here to "a very long time" for now.
waiter_max_attempts = (waiter_countdown or 999) // waiter_check_interval_seconds
- if waiter_delay is NOTSET:
+ if waiter_check_interval_seconds:
warnings.warn(
"The parameter waiter_check_interval_seconds has been deprecated to "
"standardize naming conventions. Please use waiter_delay instead. In the "
@@ -757,8 +756,8 @@ class EmrCreateJobFlowOperator(BaseOperator):
self.job_flow_overrides = job_flow_overrides or {}
self.region_name = region_name
self.wait_for_completion = wait_for_completion
- self.waiter_max_attempts = int(waiter_max_attempts) # type: ignore[arg-type]
- self.waiter_delay = int(waiter_delay) # type: ignore[arg-type]
+ self.waiter_max_attempts = waiter_max_attempts or 60
+ self.waiter_delay = waiter_delay or 30
self.deferrable = deferrable
@cached_property
diff --git a/airflow/providers/amazon/aws/waiters/emr.json b/airflow/providers/amazon/aws/waiters/emr.json
index 91c902eed6..3aed99c85c 100644
--- a/airflow/providers/amazon/aws/waiters/emr.json
+++ b/airflow/providers/amazon/aws/waiters/emr.json
@@ -62,6 +62,12 @@
"expected": "WAITING",
"state": "success"
},
+ {
+ "matcher": "path",
+ "argument": "Cluster.Status.State",
+ "expected": "RUNNING",
+ "state": "success"
+ },
{
"matcher": "path",
"argument": "Cluster.Status.State",
diff --git a/tests/system/providers/amazon/aws/example_emr.py b/tests/system/providers/amazon/aws/example_emr.py
index e131b18f3c..ffa98a2c8f 100644
--- a/tests/system/providers/amazon/aws/example_emr.py
+++ b/tests/system/providers/amazon/aws/example_emr.py
@@ -27,7 +27,6 @@ import boto3
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
-from airflow.providers.amazon.aws.hooks.ssm import SsmHook
from airflow.providers.amazon.aws.operators.emr import (
EmrAddStepsOperator,
EmrCreateJobFlowOperator,
@@ -72,7 +71,7 @@ SPARK_STEPS = [
JOB_FLOW_OVERRIDES: dict[str, Any] = {
"Name": "PiCalc",
- "ReleaseLabel": "emr-6.7.0",
+ "ReleaseLabel": "emr-7.0.0",
"Applications": [{"Name": "Spark"}],
"Instances": {
"InstanceGroups": [
@@ -94,16 +93,6 @@ JOB_FLOW_OVERRIDES: dict[str, Any] = {
# [END howto_operator_emr_steps_config]
-@task
-def get_ami_id():
- """
- Returns an AL2 AMI compatible with EMR
- """
- return SsmHook(aws_conn_id=None).get_parameter_value(
- "/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-ebs"
- )
-
-
@task
def configure_security_config(config_name: str):
boto3.client("emr").create_security_configuration(
@@ -147,7 +136,6 @@ with DAG(
JOB_FLOW_OVERRIDES["LogUri"] = f"s3://{s3_bucket}/"
JOB_FLOW_OVERRIDES["SecurityConfiguration"] = config_name
- JOB_FLOW_OVERRIDES["Instances"]["InstanceGroups"][0]["CustomAmiId"] = get_ami_id()
create_security_configuration = configure_security_config(config_name)
@@ -173,9 +161,6 @@ with DAG(
)
# [END howto_operator_emr_add_steps]
add_steps.wait_for_completion = True
- # On rare occasion (1 in 50ish?) this system test times out. Extending the
- # max_attempts from the default 60 to attempt to mitigate the flaky test.
- add_steps.waiter_max_attempts = 90
# [START howto_sensor_emr_step]
wait_for_step = EmrStepSensor(