You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@airflow.apache.org by po...@apache.org on 2024/02/24 11:05:27 UTC

(airflow) branch main updated: Fix `example_emr` system test (#37667)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 42f6373e52 Fix `example_emr` system test (#37667)
42f6373e52 is described below

commit 42f6373e52e2f857e4c4aaa0f6fa096eb791cddc
Author: Vincent <97...@users.noreply.github.com>
AuthorDate: Sat Feb 24 06:05:19 2024 -0500

    Fix `example_emr` system test (#37667)
---
 airflow/providers/amazon/aws/operators/emr.py    | 13 ++++++-------
 airflow/providers/amazon/aws/waiters/emr.json    |  6 ++++++
 tests/system/providers/amazon/aws/example_emr.py | 17 +----------------
 3 files changed, 13 insertions(+), 23 deletions(-)

diff --git a/airflow/providers/amazon/aws/operators/emr.py b/airflow/providers/amazon/aws/operators/emr.py
index d6bdb2e318..48eb198296 100644
--- a/airflow/providers/amazon/aws/operators/emr.py
+++ b/airflow/providers/amazon/aws/operators/emr.py
@@ -723,15 +723,14 @@ class EmrCreateJobFlowOperator(BaseOperator):
         job_flow_overrides: str | dict[str, Any] | None = None,
         region_name: str | None = None,
         wait_for_completion: bool = False,
-        # TODO: waiter_max_attempts and waiter_delay should default to None when the other two are deprecated.
-        waiter_max_attempts: int | None | ArgNotSet = NOTSET,
-        waiter_delay: int | None | ArgNotSet = NOTSET,
+        waiter_max_attempts: int | None = None,
+        waiter_delay: int | None = None,
         waiter_countdown: int | None = None,
         waiter_check_interval_seconds: int = 60,
         deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs: Any,
     ):
-        if waiter_max_attempts is NOTSET:
+        if waiter_countdown:
             warnings.warn(
                 "The parameter waiter_countdown has been deprecated to standardize "
                 "naming conventions.  Please use waiter_max_attempts instead.  In the "
@@ -742,7 +741,7 @@ class EmrCreateJobFlowOperator(BaseOperator):
             # waiter_countdown defaults to never timing out, which is not supported
             # by boto waiters, so we will set it here to "a very long time" for now.
             waiter_max_attempts = (waiter_countdown or 999) // waiter_check_interval_seconds
-        if waiter_delay is NOTSET:
+        if waiter_check_interval_seconds:
             warnings.warn(
                 "The parameter waiter_check_interval_seconds has been deprecated to "
                 "standardize naming conventions.  Please use waiter_delay instead.  In the "
@@ -757,8 +756,8 @@ class EmrCreateJobFlowOperator(BaseOperator):
         self.job_flow_overrides = job_flow_overrides or {}
         self.region_name = region_name
         self.wait_for_completion = wait_for_completion
-        self.waiter_max_attempts = int(waiter_max_attempts)  # type: ignore[arg-type]
-        self.waiter_delay = int(waiter_delay)  # type: ignore[arg-type]
+        self.waiter_max_attempts = waiter_max_attempts or 60
+        self.waiter_delay = waiter_delay or 30
         self.deferrable = deferrable
 
     @cached_property
diff --git a/airflow/providers/amazon/aws/waiters/emr.json b/airflow/providers/amazon/aws/waiters/emr.json
index 91c902eed6..3aed99c85c 100644
--- a/airflow/providers/amazon/aws/waiters/emr.json
+++ b/airflow/providers/amazon/aws/waiters/emr.json
@@ -62,6 +62,12 @@
                     "expected": "WAITING",
                     "state": "success"
                 },
+                {
+                    "matcher": "path",
+                    "argument": "Cluster.Status.State",
+                    "expected": "RUNNING",
+                    "state": "success"
+                },
                 {
                     "matcher": "path",
                     "argument": "Cluster.Status.State",
diff --git a/tests/system/providers/amazon/aws/example_emr.py b/tests/system/providers/amazon/aws/example_emr.py
index e131b18f3c..ffa98a2c8f 100644
--- a/tests/system/providers/amazon/aws/example_emr.py
+++ b/tests/system/providers/amazon/aws/example_emr.py
@@ -27,7 +27,6 @@ import boto3
 from airflow.decorators import task
 from airflow.models.baseoperator import chain
 from airflow.models.dag import DAG
-from airflow.providers.amazon.aws.hooks.ssm import SsmHook
 from airflow.providers.amazon.aws.operators.emr import (
     EmrAddStepsOperator,
     EmrCreateJobFlowOperator,
@@ -72,7 +71,7 @@ SPARK_STEPS = [
 
 JOB_FLOW_OVERRIDES: dict[str, Any] = {
     "Name": "PiCalc",
-    "ReleaseLabel": "emr-6.7.0",
+    "ReleaseLabel": "emr-7.0.0",
     "Applications": [{"Name": "Spark"}],
     "Instances": {
         "InstanceGroups": [
@@ -94,16 +93,6 @@ JOB_FLOW_OVERRIDES: dict[str, Any] = {
 # [END howto_operator_emr_steps_config]
 
 
-@task
-def get_ami_id():
-    """
-    Returns an AL2 AMI compatible with EMR
-    """
-    return SsmHook(aws_conn_id=None).get_parameter_value(
-        "/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-ebs"
-    )
-
-
 @task
 def configure_security_config(config_name: str):
     boto3.client("emr").create_security_configuration(
@@ -147,7 +136,6 @@ with DAG(
 
     JOB_FLOW_OVERRIDES["LogUri"] = f"s3://{s3_bucket}/"
     JOB_FLOW_OVERRIDES["SecurityConfiguration"] = config_name
-    JOB_FLOW_OVERRIDES["Instances"]["InstanceGroups"][0]["CustomAmiId"] = get_ami_id()
 
     create_security_configuration = configure_security_config(config_name)
 
@@ -173,9 +161,6 @@ with DAG(
     )
     # [END howto_operator_emr_add_steps]
     add_steps.wait_for_completion = True
-    # On rare occasion (1 in 50ish?) this system test times out.  Extending the
-    # max_attempts from the default 60 to attempt to mitigate the flaky test.
-    add_steps.waiter_max_attempts = 90
 
     # [START howto_sensor_emr_step]
     wait_for_step = EmrStepSensor(