Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2023/01/10 12:28:39 UTC

[GitHub] [airflow] BasPH opened a new pull request, #28827: Add option to wait for completion on the EmrCreateJobFlowOperator

BasPH opened a new pull request, #28827:
URL: https://github.com/apache/airflow/pull/28827

   This PR adds the option to wait for completion with the EmrCreateJobFlowOperator. It includes:
   
   * The implementation of a `waiter` to wait for WAITING or TERMINATED state.
   * A few consistency fixes here and there. AWS writes "job flow" (lowercase) so I adhered to that.
   * Also implemented an `on_kill` method. Moved EMR hook creation & caching to a `cached_property` because the hook is called from multiple methods.
   * I don't think a default maximum runtime of 25 minutes is desirable; I've had many projects that ran EMR jobs longer than that. I allowed `None` as a type for the waiter's `countdown` argument to represent waiting indefinitely (or until the Airflow task times out based on `execution_timeout`), and defaulted `EmrCreateJobFlowOperator.waiter_countdown` to `None`. I checked all other usages of `waiter_countdown`; they all default to `25 * 60`, so there are no breaking changes.
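
The hook caching and `on_kill` behaviour described in the list above can be illustrated with a minimal sketch. It does not use Airflow or AWS: `FakeEmrHook` and `SketchOperator` are hypothetical stand-ins invented for this example, and only the pattern (a `cached_property` hook shared by `execute` and `on_kill`, plus a stored job flow id) mirrors the PR.

```python
from functools import cached_property
from typing import List, Optional


class FakeEmrHook:
    """Hypothetical stand-in for EmrHook; records calls instead of contacting AWS."""

    instances = 0  # counts how many hooks were constructed

    def __init__(self) -> None:
        FakeEmrHook.instances += 1
        self.terminated: List[str] = []

    def create_job_flow(self, overrides: dict) -> dict:
        # Shaped like the response fields the operator reads; values are made up.
        return {"JobFlowId": "j-EXAMPLE", "ResponseMetadata": {"HTTPStatusCode": 200}}

    def terminate_job_flow(self, job_flow_id: str) -> None:
        self.terminated.append(job_flow_id)


class SketchOperator:
    """Hypothetical operator showing only the caching and on_kill pattern."""

    def __init__(self) -> None:
        self._job_flow_id: Optional[str] = None

    @cached_property
    def _emr_hook(self) -> FakeEmrHook:
        # Built on first access, then reused by execute() and on_kill().
        return FakeEmrHook()

    def execute(self) -> str:
        response = self._emr_hook.create_job_flow({})
        self._job_flow_id = response["JobFlowId"]
        return self._job_flow_id

    def on_kill(self) -> None:
        # Nothing to clean up if the job flow was never created.
        if self._job_flow_id:
            self._emr_hook.terminate_job_flow(self._job_flow_id)
```

Storing the id on the instance is what lets `on_kill` know which job flow to terminate, and the cached property means both methods share a single hook object.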
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] o-nikolas commented on a diff in pull request #28827: Add option to wait for completion on the EmrCreateJobFlowOperator

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on code in PR #28827:
URL: https://github.com/apache/airflow/pull/28827#discussion_r1066269743


##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -538,42 +544,76 @@ def __init__(
         emr_conn_id: str | None = "emr_default",
         job_flow_overrides: str | dict[str, Any] | None = None,
         region_name: str | None = None,
+        wait_for_completion: bool = False,
+        waiter_countdown: int | None = None,
+        waiter_check_interval_seconds: int = 60,
         **kwargs,
     ):
         super().__init__(**kwargs)
         self.aws_conn_id = aws_conn_id
         self.emr_conn_id = emr_conn_id
         self.job_flow_overrides = job_flow_overrides or {}
         self.region_name = region_name
+        self.wait_for_completion = wait_for_completion
+        self.waiter_countdown = waiter_countdown
+        self.waiter_check_interval_seconds = waiter_check_interval_seconds
+
+        self._job_flow_id: str | None = None
 
-    def execute(self, context: Context) -> str:
-        emr = EmrHook(
+    @cached_property
+    def _emr_hook(self) -> EmrHook:
+        """Create and return an EmrHook."""
+        return EmrHook(
             aws_conn_id=self.aws_conn_id, emr_conn_id=self.emr_conn_id, region_name=self.region_name
         )
 
+    def execute(self, context: Context) -> str | None:
         self.log.info(
-            "Creating JobFlow using aws-conn-id: %s, emr-conn-id: %s", self.aws_conn_id, self.emr_conn_id
+            "Creating job flow using aws_conn_id: %s, emr_conn_id: %s", self.aws_conn_id, self.emr_conn_id
         )
         if isinstance(self.job_flow_overrides, str):
             job_flow_overrides: dict[str, Any] = ast.literal_eval(self.job_flow_overrides)
             self.job_flow_overrides = job_flow_overrides
         else:
             job_flow_overrides = self.job_flow_overrides
-        response = emr.create_job_flow(job_flow_overrides)
+        response = self._emr_hook.create_job_flow(job_flow_overrides)
 
         if not response["ResponseMetadata"]["HTTPStatusCode"] == 200:
-            raise AirflowException(f"JobFlow creation failed: {response}")
+            raise AirflowException(f"Job flow creation failed: {response}")
         else:
-            job_flow_id = response["JobFlowId"]
-            self.log.info("JobFlow with id %s created", job_flow_id)
+            self._job_flow_id = response["JobFlowId"]
+            self.log.info("Job flow with id %s created", self._job_flow_id)
             EmrClusterLink.persist(
                 context=context,
                 operator=self,
-                region_name=emr.conn_region_name,
-                aws_partition=emr.conn_partition,
-                job_flow_id=job_flow_id,
+                region_name=self._emr_hook.conn_region_name,
+                aws_partition=self._emr_hook.conn_partition,
+                job_flow_id=self._job_flow_id,
             )
-            return job_flow_id
+
+            if self.wait_for_completion:
+                # Didn't use a boto-supplied waiter because those don't support waiting for WAITING state.
+                # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#waiters
+                waiter(
+                    get_state_callable=self._emr_hook.get_conn().describe_cluster,

Review Comment:
   Glad to see someone using this already to create new custom waiters! :star_struck: 



##########
airflow/providers/amazon/aws/utils/waiter.py:
##########
@@ -60,14 +60,20 @@ def waiter(
             break
         if state in failure_states:
             raise AirflowException(f"{object_type.title()} reached failure state {state}.")
-        if countdown > check_interval_seconds:
-            countdown -= check_interval_seconds
+
+        if countdown is None:

Review Comment:
   I like the new feature! Thanks for adding it :smile: 
   
   If `countdown` is `None` you could set it to `float('inf')` before the while loop and then the rest of the code would function as it was before and you would not need the extra branching statements or the duplicated logs/sleeps (you'd need to add `float` as a possible type for the param as well).
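
A minimal sketch of the suggestion above, assuming a simplified polling loop rather than the actual `waiter` in `utils/waiter.py`. The function name `wait_for_state`, its parameters, and the injectable `sleep` argument are hypothetical and exist only to keep the example self-contained. The point is that converting `None` to infinity once, before the loop, leaves a single code path for both the limited and the unlimited case.

```python
import math
import time
from typing import Callable, Optional, Set, Union


def wait_for_state(
    get_state: Callable[[], str],
    desired_states: Set[str],
    failure_states: Set[str],
    countdown: Optional[Union[int, float]] = 25 * 60,
    check_interval_seconds: int = 60,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_state() until a desired state is reached (hypothetical helper)."""
    # None means "no limit". Infinity minus a finite interval is still
    # infinity, so the budget check below never fails in that case.
    remaining = math.inf if countdown is None else countdown

    while True:
        state = get_state()
        if state in desired_states:
            return state
        if state in failure_states:
            raise RuntimeError(f"Reached failure state {state}.")
        if remaining < check_interval_seconds:
            raise RuntimeError("Still not finished after the allocated time limit.")
        remaining -= check_interval_seconds
        sleep(check_interval_seconds)
```

With this shape there is no `if countdown is None` branch inside the loop, and the log and sleep statements are not duplicated.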





[GitHub] [airflow] o-nikolas commented on a diff in pull request #28827: Add option to wait for completion on the EmrCreateJobFlowOperator

Posted by "o-nikolas (via GitHub)" <gi...@apache.org>.
o-nikolas commented on code in PR #28827:
URL: https://github.com/apache/airflow/pull/28827#discussion_r1086224824


##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -538,42 +544,76 @@ def __init__(
         emr_conn_id: str | None = "emr_default",
         job_flow_overrides: str | dict[str, Any] | None = None,
         region_name: str | None = None,
+        wait_for_completion: bool = False,
+        waiter_countdown: int | None = None,
+        waiter_check_interval_seconds: int = 60,
         **kwargs,
     ):
         super().__init__(**kwargs)
         self.aws_conn_id = aws_conn_id
         self.emr_conn_id = emr_conn_id
         self.job_flow_overrides = job_flow_overrides or {}
         self.region_name = region_name
+        self.wait_for_completion = wait_for_completion
+        self.waiter_countdown = waiter_countdown
+        self.waiter_check_interval_seconds = waiter_check_interval_seconds
+
+        self._job_flow_id: str | None = None
 
-    def execute(self, context: Context) -> str:
-        emr = EmrHook(
+    @cached_property
+    def _emr_hook(self) -> EmrHook:
+        """Create and return an EmrHook."""
+        return EmrHook(
             aws_conn_id=self.aws_conn_id, emr_conn_id=self.emr_conn_id, region_name=self.region_name
         )
 
+    def execute(self, context: Context) -> str | None:
         self.log.info(
-            "Creating JobFlow using aws-conn-id: %s, emr-conn-id: %s", self.aws_conn_id, self.emr_conn_id
+            "Creating job flow using aws_conn_id: %s, emr_conn_id: %s", self.aws_conn_id, self.emr_conn_id
         )
         if isinstance(self.job_flow_overrides, str):
             job_flow_overrides: dict[str, Any] = ast.literal_eval(self.job_flow_overrides)
             self.job_flow_overrides = job_flow_overrides
         else:
             job_flow_overrides = self.job_flow_overrides
-        response = emr.create_job_flow(job_flow_overrides)
+        response = self._emr_hook.create_job_flow(job_flow_overrides)
 
         if not response["ResponseMetadata"]["HTTPStatusCode"] == 200:
-            raise AirflowException(f"JobFlow creation failed: {response}")
+            raise AirflowException(f"Job flow creation failed: {response}")
         else:
-            job_flow_id = response["JobFlowId"]
-            self.log.info("JobFlow with id %s created", job_flow_id)
+            self._job_flow_id = response["JobFlowId"]
+            self.log.info("Job flow with id %s created", self._job_flow_id)
             EmrClusterLink.persist(
                 context=context,
                 operator=self,
-                region_name=emr.conn_region_name,
-                aws_partition=emr.conn_partition,
-                job_flow_id=job_flow_id,
+                region_name=self._emr_hook.conn_region_name,
+                aws_partition=self._emr_hook.conn_partition,
+                job_flow_id=self._job_flow_id,
             )
-            return job_flow_id
+
+            if self.wait_for_completion:
+                # Didn't use a boto-supplied waiter because those don't support waiting for WAITING state.
+                # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#waiters
+                waiter(
+                    get_state_callable=self._emr_hook.get_conn().describe_cluster,
+                    get_state_args={"ClusterId": self._job_flow_id},
+                    parse_response=["Cluster", "Status", "State"],
+                    # Cluster will be in WAITING after finishing if KeepJobFlowAliveWhenNoSteps is True
+                    desired_state={"WAITING", "TERMINATED"},
+                    failure_states={"TERMINATED_WITH_ERRORS"},
+                    object_type="job flow",
+                    action="finished",
+                    countdown=self.waiter_countdown,
+                    check_interval_seconds=self.waiter_check_interval_seconds,
+                )
+
+            return self._job_flow_id
+
+    def on_kill(self) -> None:
+        """Terminate job flow."""
+        if self._job_flow_id:
+            self.log.info("Terminating job flow %s", self._job_flow_id)
+            self._emr_hook.terminate_job_flow(self._job_flow_id)

Review Comment:
   Hey @BasPH, thoughts on this suggested change? Otherwise the PR looks good





[GitHub] [airflow] o-nikolas commented on a diff in pull request #28827: Add option to wait for completion on the EmrCreateJobFlowOperator

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on code in PR #28827:
URL: https://github.com/apache/airflow/pull/28827#discussion_r1066407359


##########
airflow/providers/amazon/aws/utils/waiter.py:
##########
@@ -60,14 +60,20 @@ def waiter(
             break
         if state in failure_states:
             raise AirflowException(f"{object_type.title()} reached failure state {state}.")
-        if countdown > check_interval_seconds:
-            countdown -= check_interval_seconds
+
+        if countdown is None:

Review Comment:
   > Small suggestion though, I think defaulting the waiter countdown to infinity would be a better default. 25 minutes is such an arbitrary number, plus there's already task.execution_timeout and I don't see the need for yet another limit.
   
   Yeah, fundamentally I agree with you! But also it's a tiny bit of backcompat at this point. There could be folks who are depending on this timeout, and their workflows may start to fail (or start to unexpectedly pass haha) if that default switches to infinite. Also all boto waiters get the default timeouts from the service teams, and I don't usually see them as being infinite (though there are lots I haven't seen of course!), so it'd also be a little bit of an unusual behaviour for a waiter.
   But these are small concerns, if you feel strongly about the new default, I'm happy to commit to it :+1: 
   
   > So instead of requiring a user to configure countdown=float("inf")
   
   In either approach I think we should allow None and then cast that to inf inside the waiter for convenience.





[GitHub] [airflow] vincbeck commented on a diff in pull request #28827: Add option to wait for completion on the EmrCreateJobFlowOperator

Posted by GitBox <gi...@apache.org>.
vincbeck commented on code in PR #28827:
URL: https://github.com/apache/airflow/pull/28827#discussion_r1068352775


##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -169,6 +169,15 @@ def add_job_flow_steps(
                 )
         return response["StepIds"]
 
+    def terminate_job_flow(self, job_flow_id: str) -> None:

Review Comment:
   We generally try to avoid functions in hooks which just wrap boto3 api. You can call the boto3 api directly from the operator
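
The difference can be sketched without AWS access. `StubEmrClient` and `StubHook` below are hypothetical stand-ins for the boto3 EMR client and the hook; the only real API name assumed is the client's `terminate_job_flows(JobFlowIds=[...])` call, reached through the hook's `get_conn()`.

```python
from typing import List, Optional


class StubEmrClient:
    """Hypothetical stand-in for the boto3 EMR client; records calls only."""

    def __init__(self) -> None:
        self.calls: List[List[str]] = []

    def terminate_job_flows(self, JobFlowIds: List[str]) -> None:
        self.calls.append(list(JobFlowIds))


class StubHook:
    """Hypothetical stand-in for a hook that exposes the client via get_conn()."""

    def __init__(self) -> None:
        self._client = StubEmrClient()

    def get_conn(self) -> StubEmrClient:
        return self._client


def kill_job_flow(hook: StubHook, job_flow_id: Optional[str]) -> bool:
    """Terminate the job flow through the client directly, with no hook wrapper."""
    if not job_flow_id:
        return False
    hook.get_conn().terminate_job_flows(JobFlowIds=[job_flow_id])
    return True
```

The operator then depends only on the client method that already exists, so the hook does not need an extra one-line wrapper.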



##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -538,42 +544,76 @@ def __init__(
         emr_conn_id: str | None = "emr_default",
         job_flow_overrides: str | dict[str, Any] | None = None,
         region_name: str | None = None,
+        wait_for_completion: bool = False,
+        waiter_countdown: int | None = None,
+        waiter_check_interval_seconds: int = 60,
         **kwargs,
     ):
         super().__init__(**kwargs)
         self.aws_conn_id = aws_conn_id
         self.emr_conn_id = emr_conn_id
         self.job_flow_overrides = job_flow_overrides or {}
         self.region_name = region_name
+        self.wait_for_completion = wait_for_completion
+        self.waiter_countdown = waiter_countdown
+        self.waiter_check_interval_seconds = waiter_check_interval_seconds
+
+        self._job_flow_id: str | None = None
 
-    def execute(self, context: Context) -> str:
-        emr = EmrHook(
+    @cached_property
+    def _emr_hook(self) -> EmrHook:
+        """Create and return an EmrHook."""
+        return EmrHook(
             aws_conn_id=self.aws_conn_id, emr_conn_id=self.emr_conn_id, region_name=self.region_name
         )
 
+    def execute(self, context: Context) -> str | None:
         self.log.info(
-            "Creating JobFlow using aws-conn-id: %s, emr-conn-id: %s", self.aws_conn_id, self.emr_conn_id
+            "Creating job flow using aws_conn_id: %s, emr_conn_id: %s", self.aws_conn_id, self.emr_conn_id
         )
         if isinstance(self.job_flow_overrides, str):
             job_flow_overrides: dict[str, Any] = ast.literal_eval(self.job_flow_overrides)
             self.job_flow_overrides = job_flow_overrides
         else:
             job_flow_overrides = self.job_flow_overrides
-        response = emr.create_job_flow(job_flow_overrides)
+        response = self._emr_hook.create_job_flow(job_flow_overrides)
 
         if not response["ResponseMetadata"]["HTTPStatusCode"] == 200:
-            raise AirflowException(f"JobFlow creation failed: {response}")
+            raise AirflowException(f"Job flow creation failed: {response}")
         else:
-            job_flow_id = response["JobFlowId"]
-            self.log.info("JobFlow with id %s created", job_flow_id)
+            self._job_flow_id = response["JobFlowId"]
+            self.log.info("Job flow with id %s created", self._job_flow_id)
             EmrClusterLink.persist(
                 context=context,
                 operator=self,
-                region_name=emr.conn_region_name,
-                aws_partition=emr.conn_partition,
-                job_flow_id=job_flow_id,
+                region_name=self._emr_hook.conn_region_name,
+                aws_partition=self._emr_hook.conn_partition,
+                job_flow_id=self._job_flow_id,
             )
-            return job_flow_id
+
+            if self.wait_for_completion:
+                # Didn't use a boto-supplied waiter because those don't support waiting for WAITING state.
+                # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#waiters
+                waiter(
+                    get_state_callable=self._emr_hook.get_conn().describe_cluster,
+                    get_state_args={"ClusterId": self._job_flow_id},
+                    parse_response=["Cluster", "Status", "State"],
+                    # Cluster will be in WAITING after finishing if KeepJobFlowAliveWhenNoSteps is True
+                    desired_state={"WAITING", "TERMINATED"},
+                    failure_states={"TERMINATED_WITH_ERRORS"},
+                    object_type="job flow",
+                    action="finished",
+                    countdown=self.waiter_countdown,
+                    check_interval_seconds=self.waiter_check_interval_seconds,
+                )
+
+            return self._job_flow_id
+
+    def on_kill(self) -> None:
+        """Terminate job flow."""
+        if self._job_flow_id:
+            self.log.info("Terminating job flow %s", self._job_flow_id)
+            self._emr_hook.terminate_job_flow(self._job_flow_id)

Review Comment:
   ```suggestion
               self._emr_hook.get_conn().terminate_job_flows(JobFlowIds=[self._job_flow_id])
   ```





[GitHub] [airflow] BasPH commented on a diff in pull request #28827: Add option to wait for completion on the EmrCreateJobFlowOperator

Posted by GitBox <gi...@apache.org>.
BasPH commented on code in PR #28827:
URL: https://github.com/apache/airflow/pull/28827#discussion_r1066367905


##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -538,42 +544,76 @@ def __init__(
         emr_conn_id: str | None = "emr_default",
         job_flow_overrides: str | dict[str, Any] | None = None,
         region_name: str | None = None,
+        wait_for_completion: bool = False,
+        waiter_countdown: int | None = None,
+        waiter_check_interval_seconds: int = 60,
         **kwargs,
     ):
         super().__init__(**kwargs)
         self.aws_conn_id = aws_conn_id
         self.emr_conn_id = emr_conn_id
         self.job_flow_overrides = job_flow_overrides or {}
         self.region_name = region_name
+        self.wait_for_completion = wait_for_completion
+        self.waiter_countdown = waiter_countdown
+        self.waiter_check_interval_seconds = waiter_check_interval_seconds
+
+        self._job_flow_id: str | None = None
 
-    def execute(self, context: Context) -> str:
-        emr = EmrHook(
+    @cached_property
+    def _emr_hook(self) -> EmrHook:
+        """Create and return an EmrHook."""
+        return EmrHook(
             aws_conn_id=self.aws_conn_id, emr_conn_id=self.emr_conn_id, region_name=self.region_name
         )
 
+    def execute(self, context: Context) -> str | None:
         self.log.info(
-            "Creating JobFlow using aws-conn-id: %s, emr-conn-id: %s", self.aws_conn_id, self.emr_conn_id
+            "Creating job flow using aws_conn_id: %s, emr_conn_id: %s", self.aws_conn_id, self.emr_conn_id
         )
         if isinstance(self.job_flow_overrides, str):
             job_flow_overrides: dict[str, Any] = ast.literal_eval(self.job_flow_overrides)
             self.job_flow_overrides = job_flow_overrides
         else:
             job_flow_overrides = self.job_flow_overrides
-        response = emr.create_job_flow(job_flow_overrides)
+        response = self._emr_hook.create_job_flow(job_flow_overrides)
 
         if not response["ResponseMetadata"]["HTTPStatusCode"] == 200:
-            raise AirflowException(f"JobFlow creation failed: {response}")
+            raise AirflowException(f"Job flow creation failed: {response}")
         else:
-            job_flow_id = response["JobFlowId"]
-            self.log.info("JobFlow with id %s created", job_flow_id)
+            self._job_flow_id = response["JobFlowId"]
+            self.log.info("Job flow with id %s created", self._job_flow_id)
             EmrClusterLink.persist(
                 context=context,
                 operator=self,
-                region_name=emr.conn_region_name,
-                aws_partition=emr.conn_partition,
-                job_flow_id=job_flow_id,
+                region_name=self._emr_hook.conn_region_name,
+                aws_partition=self._emr_hook.conn_partition,
+                job_flow_id=self._job_flow_id,
             )
-            return job_flow_id
+
+            if self.wait_for_completion:
+                # Didn't use a boto-supplied waiter because those don't support waiting for WAITING state.
+                # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#waiters
+                waiter(
+                    get_state_callable=self._emr_hook.get_conn().describe_cluster,

Review Comment:
   Oh I didn't realize you could actually implement custom waiters that way, hence my comment above: https://github.com/BasPH/airflow/blob/add-emrcreatejobflow-waitforcompletion/airflow/providers/amazon/aws/operators/emr.py#L595-L596.
   
   I'll take a look.





[GitHub] [airflow] o-nikolas commented on pull request #28827: Add option to wait for completion on the EmrCreateJobFlowOperator

Posted by "o-nikolas (via GitHub)" <gi...@apache.org>.
o-nikolas commented on PR #28827:
URL: https://github.com/apache/airflow/pull/28827#issuecomment-1409396411

   There hasn't been much movement on this one. The remaining comments are minor and we can always circle back on them. I'm going to merge this as is. 




[GitHub] [airflow] o-nikolas merged pull request #28827: Add option to wait for completion on the EmrCreateJobFlowOperator

Posted by "o-nikolas (via GitHub)" <gi...@apache.org>.
o-nikolas merged PR #28827:
URL: https://github.com/apache/airflow/pull/28827




[GitHub] [airflow] o-nikolas commented on a diff in pull request #28827: Add option to wait for completion on the EmrCreateJobFlowOperator

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on code in PR #28827:
URL: https://github.com/apache/airflow/pull/28827#discussion_r1066357512


##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -538,42 +544,76 @@ def __init__(
         emr_conn_id: str | None = "emr_default",
         job_flow_overrides: str | dict[str, Any] | None = None,
         region_name: str | None = None,
+        wait_for_completion: bool = False,
+        waiter_countdown: int | None = None,
+        waiter_check_interval_seconds: int = 60,
         **kwargs,
     ):
         super().__init__(**kwargs)
         self.aws_conn_id = aws_conn_id
         self.emr_conn_id = emr_conn_id
         self.job_flow_overrides = job_flow_overrides or {}
         self.region_name = region_name
+        self.wait_for_completion = wait_for_completion
+        self.waiter_countdown = waiter_countdown
+        self.waiter_check_interval_seconds = waiter_check_interval_seconds
+
+        self._job_flow_id: str | None = None
 
-    def execute(self, context: Context) -> str:
-        emr = EmrHook(
+    @cached_property
+    def _emr_hook(self) -> EmrHook:
+        """Create and return an EmrHook."""
+        return EmrHook(
             aws_conn_id=self.aws_conn_id, emr_conn_id=self.emr_conn_id, region_name=self.region_name
         )
 
+    def execute(self, context: Context) -> str | None:
         self.log.info(
-            "Creating JobFlow using aws-conn-id: %s, emr-conn-id: %s", self.aws_conn_id, self.emr_conn_id
+            "Creating job flow using aws_conn_id: %s, emr_conn_id: %s", self.aws_conn_id, self.emr_conn_id
         )
         if isinstance(self.job_flow_overrides, str):
             job_flow_overrides: dict[str, Any] = ast.literal_eval(self.job_flow_overrides)
             self.job_flow_overrides = job_flow_overrides
         else:
             job_flow_overrides = self.job_flow_overrides
-        response = emr.create_job_flow(job_flow_overrides)
+        response = self._emr_hook.create_job_flow(job_flow_overrides)
 
         if not response["ResponseMetadata"]["HTTPStatusCode"] == 200:
-            raise AirflowException(f"JobFlow creation failed: {response}")
+            raise AirflowException(f"Job flow creation failed: {response}")
         else:
-            job_flow_id = response["JobFlowId"]
-            self.log.info("JobFlow with id %s created", job_flow_id)
+            self._job_flow_id = response["JobFlowId"]
+            self.log.info("Job flow with id %s created", self._job_flow_id)
             EmrClusterLink.persist(
                 context=context,
                 operator=self,
-                region_name=emr.conn_region_name,
-                aws_partition=emr.conn_partition,
-                job_flow_id=job_flow_id,
+                region_name=self._emr_hook.conn_region_name,
+                aws_partition=self._emr_hook.conn_partition,
+                job_flow_id=self._job_flow_id,
             )
-            return job_flow_id
+
+            if self.wait_for_completion:
+                # Didn't use a boto-supplied waiter because those don't support waiting for WAITING state.
+                # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#waiters
+                waiter(
+                    get_state_callable=self._emr_hook.get_conn().describe_cluster,

Review Comment:
   Actually, this isn't the waiter setup I thought it was originally (thanks @ferruzzi for pointing that out!). You can find details on the new custom waiters [here](https://github.com/aws-mwaa/upstream-to-airflow/blob/284cd529898fbadd14308004a0b0cb6f389b4318/airflow/providers/amazon/aws/waiters/README.md#L1). Though I'm actually happy to merge this PR with the waiter you used, and then move all of the EMR waiters to the new waiter system in another PR rather than scope creeping this one.





[GitHub] [airflow] BasPH commented on a diff in pull request #28827: Add option to wait for completion on the EmrCreateJobFlowOperator

Posted by GitBox <gi...@apache.org>.
BasPH commented on code in PR #28827:
URL: https://github.com/apache/airflow/pull/28827#discussion_r1066381947


##########
airflow/providers/amazon/aws/utils/waiter.py:
##########
@@ -60,14 +60,20 @@ def waiter(
             break
         if state in failure_states:
             raise AirflowException(f"{object_type.title()} reached failure state {state}.")
-        if countdown > check_interval_seconds:
-            countdown -= check_interval_seconds
+
+        if countdown is None:

Review Comment:
   Of course! Good one🙂
   
   Small suggestion though, I think defaulting the waiter `countdown` to infinity would be a better default. 25 minutes is such an arbitrary number, plus there's already `task.execution_timeout` and I don't see the need for yet another limit.
   
   So instead of requiring a user to configure `countdown=float("inf")`, I suggest leaving the default `None` and converting `None` to `float("inf")` just before the while loop for convenience. Alternatively, we could allow types `int | float` for `countdown` and default the value of `countdown` to `float("inf")`?
   
   WDYT?


