Posted to commits@airflow.apache.org by "syedahsn (via GitHub)" <gi...@apache.org> on 2023/07/11 08:44:08 UTC

[GitHub] [airflow] syedahsn opened a new pull request, #32513: EMR serverless Create/Start/Stop/Delete Application deferrable mode

syedahsn opened a new pull request, #32513:
URL: https://github.com/apache/airflow/pull/32513

   This PR adds deferrable mode to `EmrServerlessCreateApplicationOperator`, `EmrServerlessStopApplicationOperator`, and `EmrServerlessDeleteApplicationOperator`. All the associated tests are included.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] syedahsn commented on a diff in pull request #32513: EMR serverless Create/Start/Stop/Delete Application deferrable mode

Posted by "syedahsn (via GitHub)" <gi...@apache.org>.
syedahsn commented on code in PR #32513:
URL: https://github.com/apache/airflow/pull/32513#discussion_r1261802531


##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -1025,11 +1034,12 @@ def __init__(
         self.aws_conn_id = aws_conn_id
         self.release_label = release_label
         self.job_type = job_type
-        self.wait_for_completion = wait_for_completion
+        self.wait_for_completion = False if deferrable else wait_for_completion

Review Comment:
   > I was actually wondering with the new default if it'd make sense to defer only if deferrable && wait_for_completion ?
   
   Why would that be the case? Here `deferrable` and `wait_for_completion` don't really interact (i.e. if we defer, we never check `wait_for_completion`). But I think `deferrable=True` implies `wait_for_completion=True`.
   
   `wait_for_completion` is often `False` by default, so it would be confusing for users if they pass `deferrable=True` but we don't defer because `wait_for_completion` is `False`.
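
The two readings discussed in this thread can be compared in a small sketch. This is not Airflow code: `resolve_mode` and its `defer_requires_wait` flag are hypothetical names, introduced only to contrast "defer only if `deferrable` and `wait_for_completion`" with "`deferrable=True` implies waiting".

```python
from __future__ import annotations


def resolve_mode(deferrable: bool, wait_for_completion: bool, *, defer_requires_wait: bool) -> str:
    """Return which path a hypothetical operator would take.

    defer_requires_wait=True  -> defer only if deferrable AND wait_for_completion
    defer_requires_wait=False -> deferrable=True implies waiting
    """
    if deferrable and (wait_for_completion or not defer_requires_wait):
        return "defer"
    if wait_for_completion:
        return "wait"
    return "return_immediately"


# With wait_for_completion left at False, the two policies disagree:
print(resolve_mode(True, False, defer_requires_wait=True))   # return_immediately
print(resolve_mode(True, False, defer_requires_wait=False))  # defer
```

Under the first policy a user who only sets `deferrable=True` sees no change in behaviour, which is the confusion described above.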





[GitHub] [airflow] vandonr-amz commented on pull request #32513: EMR serverless Create/Start/Stop/Delete Application deferrable mode

Posted by "vandonr-amz (via GitHub)" <gi...@apache.org>.
vandonr-amz commented on PR #32513:
URL: https://github.com/apache/airflow/pull/32513#issuecomment-1631630969

   I left some comments on the previous PR that I don't think you addressed. Could you reply where I left them?




[GitHub] [airflow] syedahsn commented on a diff in pull request #32513: EMR serverless Create/Start/Stop/Delete Application deferrable mode

Posted by "syedahsn (via GitHub)" <gi...@apache.org>.
syedahsn commented on code in PR #32513:
URL: https://github.com/apache/airflow/pull/32513#discussion_r1270291295


##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -1079,6 +1100,35 @@ def execute(self, context: Context) -> str | None:
             )
         return application_id
 
+    def start_application_deferred(self, context: Context, event: dict[str, Any] | None = None) -> None:
+        if event is None:
+            self.log.error("Trigger error: event is None")
+            raise AirflowException("Trigger error: event is None")
+        elif event["status"] != "success":
+            raise AirflowException(f"Application {event['application_id']} failed to create")
+        self.log.info("Starting application %s", event["application_id"])
+        self.hook.conn.start_application(applicationId=event["application_id"])
+        self.defer(
+            trigger=EmrServerlessStartApplicationTrigger(
+                application_id=event["application_id"],
+                aws_conn_id=self.aws_conn_id,
+                waiter_delay=self.waiter_delay,
+                waiter_max_attempts=self.waiter_max_attempts,
+            ),
+            timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay),
+            method_name="execute_complete",
+        )
+
+    def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
+        if event is None:
+            self.log.error("Trigger error: event is None")
+            raise AirflowException("Trigger error: event is None")
+        elif event["status"] != "success":
+            raise AirflowException(f"Application {event['application_id']} failed to start")
+        else:
+            self.log.info("Application %s started", event["application_id"])
+            return event["application_id"]

Review Comment:
   Thanks, I slightly changed the `if` statements to be a bit more concise.





[GitHub] [airflow] syedahsn commented on a diff in pull request #32513: EMR serverless Create/Start/Stop/Delete Application deferrable mode

Posted by "syedahsn (via GitHub)" <gi...@apache.org>.
syedahsn commented on code in PR #32513:
URL: https://github.com/apache/airflow/pull/32513#discussion_r1261910220


##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -1025,11 +1034,12 @@ def __init__(
         self.aws_conn_id = aws_conn_id
         self.release_label = release_label
         self.job_type = job_type
-        self.wait_for_completion = wait_for_completion
+        self.wait_for_completion = False if deferrable else wait_for_completion

Review Comment:
   You have a good point, but a counterpoint is that it would be confusing for users to set `deferrable=True` on an operator and not see any difference in behaviour.
   
   > then suddenly your operators start waiting when they weren't before
   
   I think the solution to this is to be very clear that `deferrable=True` implies `wait_for_completion=True`.
   Either way, I think this is a decision that we need to settle and then implement everywhere. A quick search showed that we define the interaction between `deferrable` and `wait_for_completion` inconsistently.





[GitHub] [airflow] Lee-W commented on a diff in pull request #32513: EMR serverless Create/Start/Stop/Delete Application deferrable mode

Posted by "Lee-W (via GitHub)" <gi...@apache.org>.
Lee-W commented on code in PR #32513:
URL: https://github.com/apache/airflow/pull/32513#discussion_r1265491042


##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -256,9 +256,14 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
         kwargs["client_type"] = "emr-serverless"
         super().__init__(*args, **kwargs)
 
-    def cancel_running_jobs(self, application_id: str, waiter_config: dict = {}):
+    def cancel_running_jobs(
+        self, application_id: str, waiter_config: dict = {}, wait_for_completion: bool = True

Review Comment:
   It's recommended not to use a mutable object such as `{}` as a default value.
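
A minimal sketch of why a mutable default is risky, and the usual `None` sentinel alternative. `cancel_shared` and `cancel_fresh` are hypothetical stand-ins, not the hook's real method; only the `waiter_config` parameter name is taken from the diff.

```python
from __future__ import annotations


def cancel_shared(job_id: str, waiter_config: dict = {}) -> dict:
    # The same dict object is reused on every call that omits waiter_config.
    waiter_config.setdefault("seen", []).append(job_id)
    return waiter_config


def cancel_fresh(job_id: str, waiter_config: dict | None = None) -> dict:
    # A new dict is created per call, so no state leaks between calls.
    waiter_config = {} if waiter_config is None else waiter_config
    waiter_config.setdefault("seen", []).append(job_id)
    return waiter_config


cancel_shared("a")
shared = cancel_shared("b")
cancel_fresh("a")
fresh = cancel_fresh("b")
print(shared["seen"])  # ['a', 'b']
print(fresh["seen"])   # ['b']
```

The default `{}` is evaluated once, when the function is defined, so any mutation persists across calls.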



##########
airflow/providers/amazon/aws/triggers/emr.py:
##########
@@ -283,3 +283,180 @@ def __init__(
 
     def hook(self) -> AwsGenericHook:
         return EmrHook(self.aws_conn_id)
+
+
+class EmrServerlessCreateApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be created.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):
+        super().__init__(
+            serialized_fields={"application_id": application_id},
+            waiter_name="serverless_app_created",
+            waiter_args={"applicationId": application_id},
+            failure_message="Application creation failed",
+            status_message="Application status is",
+            status_queries=["application.state", "application.stateDetails"],
+            return_key="application_id",
+            return_value=application_id,
+            waiter_delay=waiter_delay,
+            waiter_max_attempts=waiter_max_attempts,
+            aws_conn_id=aws_conn_id,
+        )
+
+    def hook(self) -> AwsGenericHook:
+        return EmrServerlessHook(self.aws_conn_id)
+
+
+class EmrServerlessStartApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be started.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):

Review Comment:
   ```suggestion
       ) -> None:
   ```



##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -1355,6 +1438,24 @@ def execute(self, context: Context) -> None:
             )
             self.log.info("EMR serverless application %s stopped successfully", self.application_id)
 
+    def stop_application(self, context, event=None) -> None:

Review Comment:
   nitpick: missing type annotation



##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -1355,6 +1438,24 @@ def execute(self, context: Context) -> None:
             )
             self.log.info("EMR serverless application %s stopped successfully", self.application_id)
 
+    def stop_application(self, context, event=None) -> None:
+        if event["status"] == "success":
+            self.hook.conn.stop_application(applicationId=self.application_id)
+            self.defer(
+                trigger=EmrServerlessStopApplicationTrigger(
+                    application_id=self.application_id,
+                    aws_conn_id=self.aws_conn_id,
+                    waiter_delay=self.waiter_delay,
+                    waiter_max_attempts=self.waiter_max_attempts,
+                ),
+                timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay),
+                method_name="execute_complete",
+            )
+
+    def execute_complete(self, context, event=None) -> None:
+        if event["status"] == "success":
+            self.log.info("EMR serverless application %s stopped successfully", self.application_id)

Review Comment:
   Do we need to handle the non-success cases here, as the other operators do?
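
One way to handle the non-success cases uniformly is a small shared check, sketched here under assumptions: `TriggerError` stands in for `AirflowException` so the snippet runs without Airflow, and `validate_event` is a hypothetical helper, not part of the PR. The event keys (`status`, `application_id`) follow the diff in this thread.

```python
from __future__ import annotations

from typing import Any


class TriggerError(Exception):
    """Stand-in for AirflowException so the sketch runs without Airflow."""


def validate_event(event: dict[str, Any] | None, action: str) -> str:
    """Return the application id, or raise if the trigger did not report success."""
    if event is None:
        raise TriggerError("Trigger error: event is None")
    if event.get("status") != "success":
        raise TriggerError(f"Application {event.get('application_id')} failed to {action}")
    return event["application_id"]


print(validate_event({"status": "success", "application_id": "app-1"}, "stop"))  # app-1
```

Both `stop_application` and `execute_complete` could call such a helper first, so that a `None` event or a failed status raises instead of being silently ignored.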



##########
airflow/providers/amazon/aws/triggers/emr.py:
##########
@@ -283,3 +283,180 @@ def __init__(
 
     def hook(self) -> AwsGenericHook:
         return EmrHook(self.aws_conn_id)
+
+
+class EmrServerlessCreateApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be created.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):
+        super().__init__(
+            serialized_fields={"application_id": application_id},
+            waiter_name="serverless_app_created",
+            waiter_args={"applicationId": application_id},
+            failure_message="Application creation failed",
+            status_message="Application status is",
+            status_queries=["application.state", "application.stateDetails"],
+            return_key="application_id",
+            return_value=application_id,
+            waiter_delay=waiter_delay,
+            waiter_max_attempts=waiter_max_attempts,
+            aws_conn_id=aws_conn_id,
+        )
+
+    def hook(self) -> AwsGenericHook:
+        return EmrServerlessHook(self.aws_conn_id)
+
+
+class EmrServerlessStartApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be started.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):
+        super().__init__(
+            serialized_fields={"application_id": application_id},
+            waiter_name="serverless_app_started",
+            waiter_args={"applicationId": application_id},
+            failure_message="Application failed to start",
+            status_message="Application status is",
+            status_queries=["application.state", "application.stateDetails"],
+            return_key="application_id",
+            return_value=application_id,
+            waiter_delay=waiter_delay,
+            waiter_max_attempts=waiter_max_attempts,
+            aws_conn_id=aws_conn_id,
+        )
+
+    def hook(self) -> AwsGenericHook:
+        return EmrServerlessHook(self.aws_conn_id)
+
+
+class EmrServerlessStopApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be stopped.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):

Review Comment:
   ```suggestion
       ) -> None:
   ```



##########
airflow/providers/amazon/aws/triggers/emr.py:
##########
@@ -283,3 +283,180 @@ def __init__(
 
     def hook(self) -> AwsGenericHook:
         return EmrHook(self.aws_conn_id)
+
+
+class EmrServerlessCreateApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be created.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):

Review Comment:
   ```suggestion
       ) -> None:
   ```



##########
airflow/providers/amazon/aws/triggers/emr.py:
##########
@@ -283,3 +283,180 @@ def __init__(
 
     def hook(self) -> AwsGenericHook:
         return EmrHook(self.aws_conn_id)
+
+
+class EmrServerlessCreateApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be created.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):
+        super().__init__(
+            serialized_fields={"application_id": application_id},
+            waiter_name="serverless_app_created",
+            waiter_args={"applicationId": application_id},
+            failure_message="Application creation failed",
+            status_message="Application status is",
+            status_queries=["application.state", "application.stateDetails"],
+            return_key="application_id",
+            return_value=application_id,
+            waiter_delay=waiter_delay,
+            waiter_max_attempts=waiter_max_attempts,
+            aws_conn_id=aws_conn_id,
+        )
+
+    def hook(self) -> AwsGenericHook:
+        return EmrServerlessHook(self.aws_conn_id)
+
+
+class EmrServerlessStartApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be started.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):
+        super().__init__(
+            serialized_fields={"application_id": application_id},
+            waiter_name="serverless_app_started",
+            waiter_args={"applicationId": application_id},
+            failure_message="Application failed to start",
+            status_message="Application status is",
+            status_queries=["application.state", "application.stateDetails"],
+            return_key="application_id",
+            return_value=application_id,
+            waiter_delay=waiter_delay,
+            waiter_max_attempts=waiter_max_attempts,
+            aws_conn_id=aws_conn_id,
+        )
+
+    def hook(self) -> AwsGenericHook:
+        return EmrServerlessHook(self.aws_conn_id)
+
+
+class EmrServerlessStopApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be stopped.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):
+        super().__init__(
+            serialized_fields={"application_id": application_id},
+            waiter_name="serverless_app_stopped",
+            waiter_args={"applicationId": application_id},
+            failure_message="Application failed to start",
+            status_message="Application status is",
+            status_queries=["application.state", "application.stateDetails"],
+            return_key="application_id",
+            return_value=application_id,
+            waiter_delay=waiter_delay,
+            waiter_max_attempts=waiter_max_attempts,
+            aws_conn_id=aws_conn_id,
+        )
+
+    def hook(self) -> AwsGenericHook:
+        return EmrServerlessHook(self.aws_conn_id)
+
+
+class EmrServerlessDeleteApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be deleted.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):

Review Comment:
   ```suggestion
       ) -> None:
   ```



##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -1355,6 +1438,24 @@ def execute(self, context: Context) -> None:
             )
             self.log.info("EMR serverless application %s stopped successfully", self.application_id)
 
+    def stop_application(self, context, event=None) -> None:
+        if event["status"] == "success":
+            self.hook.conn.stop_application(applicationId=self.application_id)
+            self.defer(
+                trigger=EmrServerlessStopApplicationTrigger(
+                    application_id=self.application_id,
+                    aws_conn_id=self.aws_conn_id,
+                    waiter_delay=self.waiter_delay,
+                    waiter_max_attempts=self.waiter_max_attempts,
+                ),
+                timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay),
+                method_name="execute_complete",
+            )
+
+    def execute_complete(self, context, event=None) -> None:

Review Comment:
   nitpick: missing type annotation



##########
airflow/providers/amazon/aws/triggers/emr.py:
##########
@@ -283,3 +283,180 @@ def __init__(
 
     def hook(self) -> AwsGenericHook:
         return EmrHook(self.aws_conn_id)
+
+
+class EmrServerlessCreateApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be created.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):
+        super().__init__(
+            serialized_fields={"application_id": application_id},
+            waiter_name="serverless_app_created",
+            waiter_args={"applicationId": application_id},
+            failure_message="Application creation failed",
+            status_message="Application status is",
+            status_queries=["application.state", "application.stateDetails"],
+            return_key="application_id",
+            return_value=application_id,
+            waiter_delay=waiter_delay,
+            waiter_max_attempts=waiter_max_attempts,
+            aws_conn_id=aws_conn_id,
+        )
+
+    def hook(self) -> AwsGenericHook:
+        return EmrServerlessHook(self.aws_conn_id)
+
+
+class EmrServerlessStartApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be started.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):
+        super().__init__(
+            serialized_fields={"application_id": application_id},
+            waiter_name="serverless_app_started",
+            waiter_args={"applicationId": application_id},
+            failure_message="Application failed to start",
+            status_message="Application status is",
+            status_queries=["application.state", "application.stateDetails"],
+            return_key="application_id",
+            return_value=application_id,
+            waiter_delay=waiter_delay,
+            waiter_max_attempts=waiter_max_attempts,
+            aws_conn_id=aws_conn_id,
+        )
+
+    def hook(self) -> AwsGenericHook:
+        return EmrServerlessHook(self.aws_conn_id)
+
+
+class EmrServerlessStopApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be stopped.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):
+        super().__init__(
+            serialized_fields={"application_id": application_id},
+            waiter_name="serverless_app_stopped",
+            waiter_args={"applicationId": application_id},
+            failure_message="Application failed to start",
+            status_message="Application status is",
+            status_queries=["application.state", "application.stateDetails"],
+            return_key="application_id",
+            return_value=application_id,
+            waiter_delay=waiter_delay,
+            waiter_max_attempts=waiter_max_attempts,
+            aws_conn_id=aws_conn_id,
+        )
+
+    def hook(self) -> AwsGenericHook:
+        return EmrServerlessHook(self.aws_conn_id)
+
+
+class EmrServerlessDeleteApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be deleted.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):
+        super().__init__(
+            serialized_fields={"application_id": application_id},
+            waiter_name="serverless_app_terminated",
+            waiter_args={"applicationId": application_id},
+            failure_message="Application failed to start",
+            status_message="Application status is",
+            status_queries=["application.state", "application.stateDetails"],
+            return_key="application_id",
+            return_value=application_id,
+            waiter_delay=waiter_delay,
+            waiter_max_attempts=waiter_max_attempts,
+            aws_conn_id=aws_conn_id,
+        )
+
+    def hook(self) -> AwsGenericHook:
+        return EmrServerlessHook(self.aws_conn_id)
+
+
+class EmrServerlessCancelJobsTrigger(AwsBaseWaiterTrigger):
+    """
+    Trigger for canceling a list of jobs in an EMR Serverless application.
+
+    :param application_id: EMR Serverless application ID
+    :param aws_conn_id: Reference to AWS connection id
+    :param waiter_delay: Delay in seconds between each attempt to check the status
+    :param waiter_max_attempts: Maximum number of attempts to check the status
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        aws_conn_id: str,
+        waiter_delay: int,
+        waiter_max_attempts: int,
+    ):

Review Comment:
   ```suggestion
       ) -> None:
   ```



##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -256,9 +256,14 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
         kwargs["client_type"] = "emr-serverless"
         super().__init__(*args, **kwargs)
 
-    def cancel_running_jobs(self, application_id: str, waiter_config: dict = {}):
+    def cancel_running_jobs(
+        self, application_id: str, waiter_config: dict = {}, wait_for_completion: bool = True
+    ):

Review Comment:
   nitpick: missing return annotation





[GitHub] [airflow] syedahsn commented on a diff in pull request #32513: EMR serverless Create/Start/Stop/Delete Application deferrable mode

Posted by "syedahsn (via GitHub)" <gi...@apache.org>.
syedahsn commented on code in PR #32513:
URL: https://github.com/apache/airflow/pull/32513#discussion_r1270293579


##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -1332,16 +1391,46 @@ def execute(self, context: Context) -> None:
         self.log.info("Stopping application: %s", self.application_id)
 
         if self.force_stop:
-            self.hook.cancel_running_jobs(
-                self.application_id,
-                waiter_config={
-                    "Delay": self.waiter_delay,
-                    "MaxAttempts": self.waiter_max_attempts,
-                },
+            count = self.hook.cancel_running_jobs(
+                application_id=self.application_id,
+                wait_for_completion=False,
             )
+            if count > 0:
+                self.log.info("now waiting for the %s cancelled job(s) to terminate", count)
+                if self.deferrable:
+                    self.defer(
+                        trigger=EmrServerlessCancelJobsTrigger(
+                            application_id=self.application_id,
+                            aws_conn_id=self.aws_conn_id,
+                            waiter_delay=self.waiter_delay,
+                            waiter_max_attempts=self.waiter_max_attempts,
+                        ),
+                        timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay),
+                        method_name="stop_application",
+                    )
+                self.hook.get_waiter("no_job_running").wait(

Review Comment:
   I'm not sure what you mean. If `self.deferrable` is `True`, `self.defer()` hands control to the Trigger and execution resumes in `stop_application` once the Trigger is done, so it never reaches `self.hook.get_waiter("no_job_running").wait()`.
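
A minimal sketch of why no `else` branch is needed, assuming (as in Airflow's `BaseOperator`) that `defer()` raises an exception rather than returning. The `TaskDeferred` and `SketchStopOperator` classes below are hypothetical stand-ins written for illustration, not the real Airflow classes:

```python
class TaskDeferred(Exception):
    """Stand-in for the exception that defer() is assumed to raise."""

    def __init__(self, method_name: str) -> None:
        super().__init__(method_name)
        self.method_name = method_name


class SketchStopOperator:
    """Hypothetical operator that mirrors only the control flow under discussion."""

    def __init__(self, deferrable: bool) -> None:
        self.deferrable = deferrable
        self.blocking_waits = 0

    def defer(self, *, method_name: str) -> None:
        # Raising here is what makes the code after defer() unreachable.
        raise TaskDeferred(method_name)

    def execute(self) -> str:
        if self.deferrable:
            self.defer(method_name="stop_application")
        # Only reached when deferrable is False.
        self.blocking_waits += 1
        return "waited synchronously"


def run(op: SketchStopOperator) -> str:
    """Play the role of the task runner, which catches the deferral."""
    try:
        return op.execute()
    except TaskDeferred as deferred:
        return f"deferred, resumes in {deferred.method_name}"
```

With `deferrable=True` the blocking wait counter stays at zero, because `execute` exits through the exception before reaching it.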





[GitHub] [airflow] syedahsn commented on pull request #32513: EMR serverless Create/Start/Stop/Delete Application deferrable mode

Posted by "syedahsn (via GitHub)" <gi...@apache.org>.
syedahsn commented on PR #32513:
URL: https://github.com/apache/airflow/pull/32513#issuecomment-1631337656

   @uranusjr @pragnareddye @vandonr-amz Can you please have another look at this PR? It is the same PR as #32414, which I ended up closing because a problem with rebasing messed up the commits :sweat_smile: I've addressed all comments. Please let me know if I've missed anything.




[GitHub] [airflow] o-nikolas merged pull request #32513: EMR serverless Create/Start/Stop/Delete Application deferrable mode

Posted by "o-nikolas (via GitHub)" <gi...@apache.org>.
o-nikolas merged PR #32513:
URL: https://github.com/apache/airflow/pull/32513




[GitHub] [airflow] syedahsn commented on a diff in pull request #32513: EMR serverless Create/Start/Stop/Delete Application deferrable mode

Posted by "syedahsn (via GitHub)" <gi...@apache.org>.
syedahsn commented on code in PR #32513:
URL: https://github.com/apache/airflow/pull/32513#discussion_r1268271272


##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -1025,11 +1034,12 @@ def __init__(
         self.aws_conn_id = aws_conn_id
         self.release_label = release_label
         self.job_type = job_type
-        self.wait_for_completion = wait_for_completion
+        self.wait_for_completion = False if deferrable else wait_for_completion

Review Comment:
   This question has been posted on the dev list, and we can continue the discussion there. For now, I will resolve this conversation so that this PR does not get held up. Whatever decision we make, we will have to apply it to the other operators anyway.





[GitHub] [airflow] Lee-W commented on a diff in pull request #32513: EMR serverless Create/Start/Stop/Delete Application deferrable mode

Posted by "Lee-W (via GitHub)" <gi...@apache.org>.
Lee-W commented on code in PR #32513:
URL: https://github.com/apache/airflow/pull/32513#discussion_r1269054684


##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -1079,6 +1100,35 @@ def execute(self, context: Context) -> str | None:
             )
         return application_id
 
+    def start_application_deferred(self, context: Context, event: dict[str, Any] | None = None) -> None:
+        if event is None:
+            self.log.error("Trigger error: event is None")
+            raise AirflowException("Trigger error: event is None")
+        elif event["status"] != "success":
+            raise AirflowException(f"Application {event['application_id']} failed to create")
+        self.log.info("Starting application %s", event["application_id"])
+        self.hook.conn.start_application(applicationId=event["application_id"])
+        self.defer(
+            trigger=EmrServerlessStartApplicationTrigger(
+                application_id=event["application_id"],
+                aws_conn_id=self.aws_conn_id,
+                waiter_delay=self.waiter_delay,
+                waiter_max_attempts=self.waiter_max_attempts,
+            ),
+            timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay),
+            method_name="execute_complete",
+        )
+
+    def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
+        if event is None:
+            self.log.error("Trigger error: event is None")
+            raise AirflowException("Trigger error: event is None")
+        elif event["status"] != "success":
+            raise AirflowException(f"Application {event['application_id']} failed to start")
+        else:
+            self.log.info("Application %s started", event["application_id"])
+            return event["application_id"]

Review Comment:
   ```suggestion
   
           self.log.info("Application %s started", event["application_id"])
           return event["application_id"]
   ```



##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -1332,16 +1391,46 @@ def execute(self, context: Context) -> None:
         self.log.info("Stopping application: %s", self.application_id)
 
         if self.force_stop:
-            self.hook.cancel_running_jobs(
-                self.application_id,
-                waiter_config={
-                    "Delay": self.waiter_delay,
-                    "MaxAttempts": self.waiter_max_attempts,
-                },
+            count = self.hook.cancel_running_jobs(
+                application_id=self.application_id,
+                wait_for_completion=False,
             )
+            if count > 0:
+                self.log.info("now waiting for the %s cancelled job(s) to terminate", count)
+                if self.deferrable:
+                    self.defer(
+                        trigger=EmrServerlessCancelJobsTrigger(
+                            application_id=self.application_id,
+                            aws_conn_id=self.aws_conn_id,
+                            waiter_delay=self.waiter_delay,
+                            waiter_max_attempts=self.waiter_max_attempts,
+                        ),
+                        timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay),
+                        method_name="stop_application",
+                    )
+                self.hook.get_waiter("no_job_running").wait(

Review Comment:
   Do we need to add an else clause here? Looks like `EmrServerlessCancelJobsTrigger` is waiting for it as well.





[GitHub] [airflow] vandonr-amz commented on a diff in pull request #32513: EMR serverless Create/Start/Stop/Delete Application deferrable mode

Posted by "vandonr-amz (via GitHub)" <gi...@apache.org>.
vandonr-amz commented on code in PR #32513:
URL: https://github.com/apache/airflow/pull/32513#discussion_r1261805082


##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -1025,11 +1034,12 @@ def __init__(
         self.aws_conn_id = aws_conn_id
         self.release_label = release_label
         self.job_type = job_type
-        self.wait_for_completion = wait_for_completion
+        self.wait_for_completion = False if deferrable else wait_for_completion

Review Comment:
   What I'm saying is that if you have an operator with `wait_for_completion = False` and you set `operators.default_deferrable = True` in your config because you want to free up workers, then your operators suddenly start waiting when they weren't before, and that can be a bit confusing.





[GitHub] [airflow] syedahsn commented on a diff in pull request #32513: EMR serverless Create/Start/Stop/Delete Application deferrable mode

Posted by "syedahsn (via GitHub)" <gi...@apache.org>.
syedahsn commented on code in PR #32513:
URL: https://github.com/apache/airflow/pull/32513#discussion_r1261792096


##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -284,13 +289,16 @@ def cancel_running_jobs(self, application_id: str, waiter_config: dict = {}):
                 )
                 for job_id in job_ids:
                     self.conn.cancel_job_run(applicationId=application_id, jobRunId=job_id)
-        if count > 0:
-            self.log.info("now waiting for the %s cancelled job(s) to terminate", count)
-            self.get_waiter("no_job_running").wait(
-                applicationId=application_id,
-                states=list(self.JOB_INTERMEDIATE_STATES.union({"CANCELLING"})),
-                WaiterConfig=waiter_config,
-            )
+        if wait_for_completion:
+            if count > 0:
+                self.log.info("now waiting for the %s cancelled job(s) to terminate", count)
+                self.get_waiter("no_job_running").wait(
+                    applicationId=application_id,
+                    states=list(self.JOB_INTERMEDIATE_STATES.union({"CANCELLING"})),
+                    WaiterConfig=waiter_config,
+                )
+
+        return count
 

Review Comment:
   @vandonr-amz You were right about this being a breaking change. I solved the issue by adding a `wait_for_completion` parameter. I think it's a cleaner solution anyway.
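
A rough sketch of how a `wait_for_completion` parameter defaulting to `True` keeps existing callers working while letting new callers opt out of the blocking wait. `SketchServerlessHook` is a hypothetical in-memory stand-in, not the real hook: it makes no AWS calls, and it uses `None` instead of the `{}` default shown in the diff purely to avoid a mutable default argument in the example. The `-> int` annotation reflects the `return count` added in the diff:

```python
from __future__ import annotations


class SketchServerlessHook:
    """Hypothetical stand-in that records what a real hook would do."""

    def __init__(self, running_job_ids: list[str]) -> None:
        self.running_job_ids = list(running_job_ids)
        self.cancelled: list[str] = []
        self.wait_calls = 0

    def cancel_running_jobs(
        self,
        application_id: str,
        waiter_config: dict | None = None,
        wait_for_completion: bool = True,
    ) -> int:
        # Cancel every job that is currently running.
        count = len(self.running_job_ids)
        self.cancelled.extend(self.running_job_ids)
        self.running_job_ids.clear()
        # Existing callers never pass the flag, so they still wait here.
        if wait_for_completion and count > 0:
            self.wait_calls += 1  # stands in for the blocking waiter call
        return count
```

A caller that passes only `application_id` and `waiter_config` behaves as before, while the operator can pass `wait_for_completion=False` and use the returned count to decide whether to defer.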





[GitHub] [airflow] vandonr-amz commented on a diff in pull request #32513: EMR serverless Create/Start/Stop/Delete Application deferrable mode

Posted by "vandonr-amz (via GitHub)" <gi...@apache.org>.
vandonr-amz commented on code in PR #32513:
URL: https://github.com/apache/airflow/pull/32513#discussion_r1261805654


##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -1025,11 +1034,12 @@ def __init__(
         self.aws_conn_id = aws_conn_id
         self.release_label = release_label
         self.job_type = job_type
-        self.wait_for_completion = wait_for_completion
+        self.wait_for_completion = False if deferrable else wait_for_completion

Review Comment:
   The "good" situation I'd expect as a user after setting `operators.default_deferrable = True` is that my operators that wait for completion would start deferring, and those that don't wait would continue not to wait.





[GitHub] [airflow] syedahsn commented on a diff in pull request #32513: EMR serverless Create/Start/Stop/Delete Application deferrable mode

Posted by "syedahsn (via GitHub)" <gi...@apache.org>.
syedahsn commented on code in PR #32513:
URL: https://github.com/apache/airflow/pull/32513#discussion_r1261910220


##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -1025,11 +1034,12 @@ def __init__(
         self.aws_conn_id = aws_conn_id
         self.release_label = release_label
         self.job_type = job_type
-        self.wait_for_completion = wait_for_completion
+        self.wait_for_completion = False if deferrable else wait_for_completion

Review Comment:
   You have a good point, but a counterpoint is that it would be confusing for users to set `deferrable=True` on an operator and not see any difference in behaviour.
   
   > then suddenly your operators start waiting when they weren't before
   
   I think the solution to this is to be very clear that `deferrable=True` implies `wait_for_completion=True`.
   I think this is a decision that we need to settle and then implement everywhere. A quick search showed that we have an inconsistent definition of how `deferrable` and `wait_for_completion` work with each other.
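
To make the two positions in this thread concrete, here is a small sketch that encodes each one as a function from the two flags to the resulting behaviour. The enum and both function names are hypothetical and exist only for this comparison; neither is Airflow code:

```python
from enum import Enum


class Behaviour(Enum):
    NO_WAIT = "return immediately"
    BLOCK = "wait on a worker"
    DEFER = "wait in the triggerer"


def deferrable_implies_wait(wait_for_completion: bool, deferrable: bool) -> Behaviour:
    """The reading argued for above: deferrable=True always waits, by deferring."""
    if deferrable:
        return Behaviour.DEFER
    return Behaviour.BLOCK if wait_for_completion else Behaviour.NO_WAIT


def deferrable_only_changes_how(wait_for_completion: bool, deferrable: bool) -> Behaviour:
    """The alternative: deferrable only changes how an operator that already waits does so."""
    if not wait_for_completion:
        return Behaviour.NO_WAIT
    return Behaviour.DEFER if deferrable else Behaviour.BLOCK


def disagreements() -> list[tuple[bool, bool]]:
    """List the (wait_for_completion, deferrable) pairs on which the two readings differ."""
    flags = (False, True)
    return [
        (wait, defer)
        for wait in flags
        for defer in flags
        if deferrable_implies_wait(wait, defer) != deferrable_only_changes_how(wait, defer)
    ]
```

The two readings differ only for `wait_for_completion=False` combined with `deferrable=True`, which is the case a global `operators.default_deferrable = True` would expose.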





[GitHub] [airflow] syedahsn commented on a diff in pull request #32513: EMR serverless Create/Start/Stop/Delete Application deferrable mode

Posted by "syedahsn (via GitHub)" <gi...@apache.org>.
syedahsn commented on code in PR #32513:
URL: https://github.com/apache/airflow/pull/32513#discussion_r1265910807


##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -1355,6 +1438,24 @@ def execute(self, context: Context) -> None:
             )
             self.log.info("EMR serverless application %s stopped successfully", self.application_id)
 
+    def stop_application(self, context, event=None) -> None:
+        if event["status"] == "success":
+            self.hook.conn.stop_application(applicationId=self.application_id)
+            self.defer(
+                trigger=EmrServerlessStopApplicationTrigger(
+                    application_id=self.application_id,
+                    aws_conn_id=self.aws_conn_id,
+                    waiter_delay=self.waiter_delay,
+                    waiter_max_attempts=self.waiter_max_attempts,
+                ),
+                timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay),
+                method_name="execute_complete",
+            )
+
+    def execute_complete(self, context, event=None) -> None:
+        if event["status"] == "success":
+            self.log.info("EMR serverless application %s stopped successfully", self.application_id)

Review Comment:
   No, we don't need to, because the Trigger is the only place that generates the `event` object, and in this case it only ever sets the status to success: if there had been a failure, it would have raised an exception before creating the event, and this method would never have been executed. Technically, we don't even need to check the status at all, because if execution gets this far we know the operation succeeded, but I think it's good practice to check anyway.
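
For illustration only, a defensive check of this kind could be factored into a small helper. `validate_event` and `TriggerEventError` are hypothetical names, and `RuntimeError` stands in for whatever exception type the operator would actually raise:

```python
from __future__ import annotations

from typing import Any


class TriggerEventError(RuntimeError):
    """Hypothetical error type used only in this sketch."""


def validate_event(event: dict[str, Any] | None) -> dict[str, Any]:
    """Return the event unchanged if it reports success, otherwise raise."""
    if event is None:
        raise TriggerEventError("Trigger error: event is None")
    status = event.get("status")
    if status != "success":
        raise TriggerEventError(f"Trigger reported status {status!r}")
    return event
```

A callback such as `execute_complete` could then start with `event = validate_event(event)` and proceed knowing the status is success, even though, as noted above, the Trigger is not expected to emit anything else.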


