Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/26 23:14:44 UTC

[GitHub] [airflow] syedahsn opened a new pull request, #25324: Add EMR Serverless Operators and Hooks

syedahsn opened a new pull request, #25324:
URL: https://github.com/apache/airflow/pull/25324

   Define new operators, as well as necessary hooks and sensors, to run EMR Serverless jobs. System test for EMR Serverless will be coming in a separate PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on pull request #25324: Add EMR Serverless Operators and Hooks

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25324:
URL: https://github.com/apache/airflow/pull/25324#issuecomment-1198463359

   @ferruzzi @o-nikolas @vincbeck -> WDYT?




[GitHub] [airflow] syedahsn commented on a diff in pull request #25324: Add EMR Serverless Operators and Hooks

Posted by GitBox <gi...@apache.org>.
syedahsn commented on code in PR #25324:
URL: https://github.com/apache/airflow/pull/25324#discussion_r933478525


##########
airflow/providers/amazon/aws/sensors/emr.py:
##########
@@ -111,6 +116,152 @@ def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
         raise NotImplementedError('Please implement failure_message_from_response() in subclass')
 
 
+class EmrServerlessJobSensor(BaseSensorOperator):
+    """
+    Asks for the state of the job run until it reaches a failure state or success state.
+    If the job run fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessJobSensor`
+
+    :param application_id: application_id to check the state of
+    :param job_run_id: job_run_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES
+    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
+    :param emr_conn_id: emr connection to use, defaults to 'emr_default'
+    """
+
+    INTERMEDIATE_STATES = {'PENDING', 'RUNNING', 'SCHEDULED', 'SUBMITTED'}
+    FAILURE_STATES = {'FAILED', 'CANCELLING', 'CANCELLED'}
+    SUCCESS_STATES = {'SUCCESS'}
+    TERMINAL_STATES = SUCCESS_STATES.union(FAILURE_STATES)
+
+    template_fields: Sequence[str] = (
+        'application_id',
+        'job_run_id',
+    )
+
+    def __init__(
+        self,
+        *,
+        application_id: str,
+        job_run_id: str,
+        target_states: Union[Set, FrozenSet] = frozenset(SUCCESS_STATES),
+        aws_conn_id: str = 'aws_default',
+        emr_conn_id: str = 'emr_default',
+        **kwargs: Any,
+    ) -> None:
+        self.aws_conn_id = aws_conn_id
+        self.emr_conn_id = emr_conn_id
+        self.target_states = target_states
+        self.application_id = application_id
+        self.job_run_id = job_run_id
+        super().__init__(**kwargs)
+
+    def poke(self, context: 'Context') -> bool:
+        state = None
+
+        try:
+            response = self.hook.conn.get_job_run(applicationId=self.application_id, jobRunId=self.job_run_id)
+        except Exception:
+            raise AirflowException(f'Unable to get job state: {response}')
+
+        state = response['jobRun']['state']
+
+        if state in self.FAILURE_STATES:
+            failure_message = f"EMR Serverless job failed: {self.failure_message_from_response(response)}"
+            raise AirflowException(failure_message)
+
+        return state in self.target_states
+
+    @cached_property
+    def hook(self) -> EmrServerlessHook:
+        """Create and return an EmrServerlessHook"""
+        return EmrServerlessHook()
+
+    @staticmethod
+    def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
+        """
+        Get failure message from response dictionary.
+
+        :param response: response from AWS API
+        :return: failure message
+        :rtype: Optional[str]
+        """
+        return response['jobRun']['stateDetails']
+
+
+class EmrServerlessApplicationSensor(BaseSensorOperator):
+    """
+    Asks for the state of the application until it reaches a failure state or success state.
+    If the application fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessApplicationSensor`
+
+    :param application_id: application_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES
+    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
+    :param emr_conn_id: emr connection to use, defaults to 'emr_default'
+    """
+
+    template_fields: Sequence[str] = ('application_id',)
+
+    INTERMEDIATE_STATES = {'CREATING', 'STARTING', 'STOPPING'}
+    # TODO:  Question: Do these states indicate failure?

Review Comment:
   This was a question posed to one of the internal reviewers. I'll remove it from here.





[GitHub] [airflow] josh-fell commented on a diff in pull request #25324: Add EMR Serverless Operators and Hooks

Posted by GitBox <gi...@apache.org>.
josh-fell commented on code in PR #25324:
URL: https://github.com/apache/airflow/pull/25324#discussion_r938148882


##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -90,6 +91,79 @@ def create_job_flow(self, job_flow_overrides: Dict[str, Any]) -> Dict[str, Any]:
         return response
 
 
+class EmrServerlessHook(AwsBaseHook):
+    """
+    Interact with EMR Serverless API.
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        kwargs["client_type"] = "emr-serverless"
+        super().__init__(*args, **kwargs)
+
+    @cached_property
+    def conn(self):
+        """Get the underlying boto3 EmrServerlessAPIService client (cached)"""
+        return super().conn
+
+    # This method should be replaced with boto waiters which would implement timeouts and backoff nicely.
+    def waiter(
+        self,
+        get_state_callable: Callable,
+        get_state_args: Dict,
+        parse_response: List,
+        desired_state: Set,
+        failure_states: Set,
+        object_type: str,
+        action: str,
+        countdown: int = 25 * 60,
+        check_interval_seconds: int = 60,
+    ) -> None:
+        """
+        Will run the sensor until it turns True.
+
+        :param get_state_callable: A callable to run until it returns True
+        :param get_state_args: Arguments to pass to get_state_callable
+        :param parse_response: Dictionary keys to extract state from response of get_state_callable
+        :param desired_state: Wait until the getter returns this value
+        :param failure_states: A set of states which indicate failure and should throw an
+            exception if any are reached before the desired_state
+        :param object_type: Used for the reporting string. What are you waiting for? (application, job, etc)
+        :param action: Used for the reporting string. What action are you waiting for? (created, deleted, etc)
+        :param countdown: Total amount of time the waiter should wait for the desired state
+            before timing out (in seconds). Defaults to 25 * 60 seconds.
+        :param check_interval_seconds: Number of seconds waiter should wait before attempting
+            to retry get_state_callable. Defaults to 60 seconds.
+        """
+        response = get_state_callable(**get_state_args)
+        state: str = self.get_state(response, parse_response)
+        while state not in desired_state:
+            if state in failure_states:
+                raise AirflowException(f'{object_type.title()} reached failure state {state}.')
+            if countdown >= check_interval_seconds:
+                countdown -= check_interval_seconds
+                print(f'Waiting for {object_type.lower()} to be {action.lower()}.')

Review Comment:
   FYI - A couple of `print()` statements here.





[GitHub] [airflow] eladkal commented on a diff in pull request #25324: Add EMR Serverless Operators and Hooks

Posted by GitBox <gi...@apache.org>.
eladkal commented on code in PR #25324:
URL: https://github.com/apache/airflow/pull/25324#discussion_r938169174


##########
airflow/providers/amazon/aws/example_dags/example_emr_serverless.py:
##########
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   New example DAGs should follow AIP-47.
   Example: https://github.com/apache/airflow/pull/25205





[GitHub] [airflow] potiuk commented on a diff in pull request #25324: Add EMR Serverless Operators and Hooks

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25324:
URL: https://github.com/apache/airflow/pull/25324#discussion_r932535488


##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -16,18 +16,24 @@
 # specific language governing permissions and limitations
 # under the License.
 import ast
+import sys
 from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Union
 from uuid import uuid4
 
-from airflow.compat.functools import cached_property
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
-from airflow.providers.amazon.aws.hooks.emr import EmrContainerHook, EmrHook
+from airflow.providers.amazon.aws.hooks.emr import EmrContainerHook, EmrHook, EmrServerlessHook
 from airflow.providers.amazon.aws.links.emr import EmrClusterLink
+from airflow.providers.amazon.aws.sensors.emr import EmrServerlessApplicationSensor, EmrServerlessJobSensor
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
+if sys.version_info >= (3, 8):

Review Comment:
   I think we are already fine with our util class (as in other providers) since we moved to 2.2+.





[GitHub] [airflow] potiuk commented on pull request #25324: Add EMR Serverless Operators and Hooks

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25324:
URL: https://github.com/apache/airflow/pull/25324#issuecomment-1202884029

   Some errors need fixing.




[GitHub] [airflow] josh-fell commented on a diff in pull request #25324: Add EMR Serverless Operators and Hooks

Posted by GitBox <gi...@apache.org>.
josh-fell commented on code in PR #25324:
URL: https://github.com/apache/airflow/pull/25324#discussion_r933329710


##########
airflow/providers/amazon/aws/sensors/emr.py:
##########
@@ -111,6 +116,152 @@ def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
         raise NotImplementedError('Please implement failure_message_from_response() in subclass')
 
 
+class EmrServerlessJobSensor(BaseSensorOperator):
+    """
+    Asks for the state of the job run until it reaches a failure state or success state.
+    If the job run fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessJobSensor`
+
+    :param application_id: application_id to check the state of
+    :param job_run_id: job_run_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES

Review Comment:
   ```suggestion
       :param target_states: a set of states to wait for, defaults to 'SUCCESS'
   ```
   Maybe it makes sense to use the actual value being checked rather than the variable in the docstring. I was thinking users reading the Python API docs won't have to look at code or other places in the docs to figure out what `SUCCESS_STATES` is. WDYT?



##########
airflow/providers/amazon/aws/sensors/emr.py:
##########
@@ -111,6 +116,152 @@ def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
         raise NotImplementedError('Please implement failure_message_from_response() in subclass')
 
 
+class EmrServerlessJobSensor(BaseSensorOperator):
+    """
+    Asks for the state of the job run until it reaches a failure state or success state.
+    If the job run fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessJobSensor`
+
+    :param application_id: application_id to check the state of
+    :param job_run_id: job_run_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES
+    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
+    :param emr_conn_id: emr connection to use, defaults to 'emr_default'
+    """
+
+    INTERMEDIATE_STATES = {'PENDING', 'RUNNING', 'SCHEDULED', 'SUBMITTED'}
+    FAILURE_STATES = {'FAILED', 'CANCELLING', 'CANCELLED'}
+    SUCCESS_STATES = {'SUCCESS'}
+    TERMINAL_STATES = SUCCESS_STATES.union(FAILURE_STATES)
+
+    template_fields: Sequence[str] = (
+        'application_id',
+        'job_run_id',
+    )
+
+    def __init__(
+        self,
+        *,
+        application_id: str,
+        job_run_id: str,
+        target_states: Union[Set, FrozenSet] = frozenset(SUCCESS_STATES),
+        aws_conn_id: str = 'aws_default',
+        emr_conn_id: str = 'emr_default',
+        **kwargs: Any,
+    ) -> None:
+        self.aws_conn_id = aws_conn_id
+        self.emr_conn_id = emr_conn_id
+        self.target_states = target_states
+        self.application_id = application_id
+        self.job_run_id = job_run_id
+        super().__init__(**kwargs)
+
+    def poke(self, context: 'Context') -> bool:
+        state = None
+
+        try:
+            response = self.hook.conn.get_job_run(applicationId=self.application_id, jobRunId=self.job_run_id)
+        except Exception:
+            raise AirflowException(f'Unable to get job state: {response}')
+
+        state = response['jobRun']['state']
+
+        if state in self.FAILURE_STATES:
+            failure_message = f"EMR Serverless job failed: {self.failure_message_from_response(response)}"
+            raise AirflowException(failure_message)
+
+        return state in self.target_states
+
+    @cached_property
+    def hook(self) -> EmrServerlessHook:
+        """Create and return an EmrServerlessHook"""
+        return EmrServerlessHook()
+
+    @staticmethod
+    def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
+        """
+        Get failure message from response dictionary.
+
+        :param response: response from AWS API
+        :return: failure message
+        :rtype: Optional[str]
+        """
+        return response['jobRun']['stateDetails']
+
+
+class EmrServerlessApplicationSensor(BaseSensorOperator):
+    """
+    Asks for the state of the application until it reaches a failure state or success state.
+    If the application fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessApplicationSensor`
+
+    :param application_id: application_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES
+    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
+    :param emr_conn_id: emr connection to use, defaults to 'emr_default'
+    """
+
+    template_fields: Sequence[str] = ('application_id',)
+
+    INTERMEDIATE_STATES = {'CREATING', 'STARTING', 'STOPPING'}
+    # TODO:  Question: Do these states indicate failure?

Review Comment:
   @vincbeck Is this something you can answer?



##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -90,6 +91,78 @@ def create_job_flow(self, job_flow_overrides: Dict[str, Any]) -> Dict[str, Any]:
         return response
 
 
+class EmrServerlessHook(AwsBaseHook):
+    """
+    Interact with EMR Serverless API.
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        kwargs["client_type"] = "emr-serverless"
+        super().__init__(*args, **kwargs)
+
+    @cached_property
+    def conn(self):
+        """Get the underlying boto3 EmrServerlessAPIService client (cached)"""
+        return super().conn
+
+    # This method should be replaced with boto waiters which would implement timeouts and backoff nicely.
+    def waiter(
+        self,
+        get_state_callable: Callable,
+        get_state_args: Dict,
+        parse_response: List,
+        desired_state: Set,
+        failure_states: Set,
+        object_type: str,
+        action: str,
+        countdown: int = 25 * 60,
+        check_interval_seconds: int = 60,
+    ) -> None:
+        """
+        Will run the sensor until it turns True.
+
+        :param get_state_callable: A callable to run until it returns True
+        :param get_state_args: Arguments to pass to get_state_callable
+        :param parse_response: Dictionary keys to extract state from response of get_state_callable
+        :param desired_state: Wait until the getter returns this value
+        :param failure_states: A set of states which indicate failure and should throw an
+        exception if any are reached before the desired_state
+        :param object_type: Used for the reporting string. What are you waiting for? (application, job, etc)
+        :param action: Used for the reporting string. What action are you waiting for? (created, deleted, etc)
+        :param countdown: Total amount of time the waiter should wait for the desired state
+        before timing out (in seconds). Defaults to 25 * 60 seconds.
+        :param check_interval_seconds: Number of seconds waiter should wait before attempting
+        to retry get_state_callable. Defaults to 60 seconds.
+        """
+        response = get_state_callable(**get_state_args)
+        state: str = self.get_state(response, parse_response)
+        while state not in desired_state:
+            if state in failure_states:
+                raise AirflowException(f'{object_type.title()} reached failure state {state}.')
+            if countdown >= check_interval_seconds:
+                countdown -= check_interval_seconds
+                print(f'Waiting for {object_type.lower()} to be {action.lower()}.')

Review Comment:
   It might be better to use `logging` instead of `print` here. Again, just another typical practice.
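
For illustration, here is a minimal, self-contained sketch of the polling loop with `logging` in place of `print`. It is not the PR's code. `wait_for_state`, the module-level `get_state` helper, the injectable `sleep` argument, and the use of `RuntimeError` (standing in for `AirflowException`) are all assumptions made so the example runs without Airflow. The timeout branch is also an assumption, since the quoted hunk ends before that part of the method.

```python
import logging
import time
from typing import Any, Callable, Dict, List, Set

log = logging.getLogger(__name__)


def get_state(response: Dict[str, Any], keys: List[str]) -> str:
    """Walk nested response keys to reach the state value (hypothetical helper)."""
    value: Any = response
    for key in keys:
        value = value[key]
    return value


def wait_for_state(
    get_state_callable: Callable[..., Dict[str, Any]],
    get_state_args: Dict[str, Any],
    parse_response: List[str],
    desired_state: Set[str],
    failure_states: Set[str],
    object_type: str,
    action: str,
    countdown: int = 25 * 60,
    check_interval_seconds: int = 60,
    sleep: Callable[[float], None] = time.sleep,  # injectable so tests need not wait
) -> None:
    """Poll until the state is in desired_state, a failure state is hit, or time runs out."""
    while True:
        state = get_state(get_state_callable(**get_state_args), parse_response)
        if state in desired_state:
            return
        if state in failure_states:
            # RuntimeError stands in for AirflowException in this sketch.
            raise RuntimeError(f"{object_type.title()} reached failure state {state}.")
        if countdown < check_interval_seconds:
            # Assumed timeout behaviour; the quoted diff does not show this branch.
            raise RuntimeError(f"{object_type.title()} was not {action.lower()} in time.")
        countdown -= check_interval_seconds
        # Lazy %-style arguments: the message is only formatted if the record is emitted.
        log.info("Waiting for %s to be %s.", object_type.lower(), action.lower())
        sleep(check_interval_seconds)
```

Inside a hook, the same call would presumably go through the hook's own logger (`self.log.info(...)`) rather than a module-level one, so the messages end up in the task log.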



##########
airflow/providers/amazon/aws/sensors/emr.py:
##########
@@ -111,6 +116,152 @@ def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
         raise NotImplementedError('Please implement failure_message_from_response() in subclass')
 
 
+class EmrServerlessJobSensor(BaseSensorOperator):
+    """
+    Asks for the state of the job run until it reaches a failure state or success state.
+    If the job run fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessJobSensor`
+
+    :param application_id: application_id to check the state of
+    :param job_run_id: job_run_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES
+    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
+    :param emr_conn_id: emr connection to use, defaults to 'emr_default'
+    """
+
+    INTERMEDIATE_STATES = {'PENDING', 'RUNNING', 'SCHEDULED', 'SUBMITTED'}
+    FAILURE_STATES = {'FAILED', 'CANCELLING', 'CANCELLED'}
+    SUCCESS_STATES = {'SUCCESS'}
+    TERMINAL_STATES = SUCCESS_STATES.union(FAILURE_STATES)
+
+    template_fields: Sequence[str] = (
+        'application_id',
+        'job_run_id',
+    )
+
+    def __init__(
+        self,
+        *,
+        application_id: str,
+        job_run_id: str,
+        target_states: Union[Set, FrozenSet] = frozenset(SUCCESS_STATES),
+        aws_conn_id: str = 'aws_default',
+        emr_conn_id: str = 'emr_default',
+        **kwargs: Any,
+    ) -> None:
+        self.aws_conn_id = aws_conn_id
+        self.emr_conn_id = emr_conn_id
+        self.target_states = target_states
+        self.application_id = application_id
+        self.job_run_id = job_run_id
+        super().__init__(**kwargs)
+
+    def poke(self, context: 'Context') -> bool:
+        state = None
+
+        try:
+            response = self.hook.conn.get_job_run(applicationId=self.application_id, jobRunId=self.job_run_id)
+        except Exception:
+            raise AirflowException(f'Unable to get job state: {response}')
+
+        state = response['jobRun']['state']
+
+        if state in self.FAILURE_STATES:
+            failure_message = f"EMR Serverless job failed: {self.failure_message_from_response(response)}"
+            raise AirflowException(failure_message)
+
+        return state in self.target_states
+
+    @cached_property
+    def hook(self) -> EmrServerlessHook:
+        """Create and return an EmrServerlessHook"""
+        return EmrServerlessHook()

Review Comment:
   ```suggestion
           return EmrServerlessHook(aws_conn_id=self.aws_conn_id)
   ```
   So users can control the connection ID used to build the hook object. Unless this should be something like: use `aws_conn_id` if provided, or `emr_conn_id` otherwise?
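
To make the suggestion concrete, here is a small stand-alone sketch of the pattern. `FakeServerlessHook` and `SensorSketch` are hypothetical stand-ins for `EmrServerlessHook` and the sensor, used only so the example runs without Airflow or AWS credentials. It uses `functools.cached_property` directly and therefore assumes Python 3.8+.

```python
from functools import cached_property


class FakeServerlessHook:
    """Stand-in for EmrServerlessHook; it only records the connection ID it was given."""

    def __init__(self, aws_conn_id: str = "aws_default") -> None:
        self.aws_conn_id = aws_conn_id


class SensorSketch:
    """Stand-in for the sensor; it keeps the user-supplied connection ID."""

    def __init__(self, *, aws_conn_id: str = "aws_default") -> None:
        self.aws_conn_id = aws_conn_id

    @cached_property
    def hook(self) -> FakeServerlessHook:
        # Forward the connection ID instead of relying on the hook's default.
        return FakeServerlessHook(aws_conn_id=self.aws_conn_id)


sensor = SensorSketch(aws_conn_id="my_aws_conn")
print(sensor.hook.aws_conn_id)
```

Because `hook` is a cached property, the hook object is built once, on first access, and reused on later accesses.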



##########
airflow/providers/amazon/aws/sensors/emr.py:
##########
@@ -15,15 +15,20 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Sequence
+import sys
+from typing import TYPE_CHECKING, Any, Dict, FrozenSet, Iterable, Optional, Sequence, Set, Union
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.emr import EmrContainerHook, EmrHook, EmrServerlessHook
+from airflow.sensors.base import BaseSensorOperator
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
-from airflow.compat.functools import cached_property
-from airflow.exceptions import AirflowException
-from airflow.providers.amazon.aws.hooks.emr import EmrContainerHook, EmrHook
-from airflow.sensors.base import BaseSensorOperator
+if sys.version_info >= (3, 8):
+    from functools import cached_property
+else:
+    from cached_property import cached_property

Review Comment:
   ```suggestion
   from airflow.compat.functools import cached_property
   ```
   Same idea here as above. You can use the native compat util for `cached_property`



##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -90,6 +91,78 @@ def create_job_flow(self, job_flow_overrides: Dict[str, Any]) -> Dict[str, Any]:
         return response
 
 
+class EmrServerlessHook(AwsBaseHook):
+    """
+    Interact with EMR Serverless API.
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        kwargs["client_type"] = "emr-serverless"
+        super().__init__(*args, **kwargs)
+
+    @cached_property
+    def conn(self):
+        """Get the underlying boto3 EmrServerlessAPIService client (cached)"""
+        return super().conn
+
+    # This method should be replaced with boto waiters which would implement timeouts and backoff nicely.
+    def waiter(
+        self,
+        get_state_callable: Callable,
+        get_state_args: Dict,
+        parse_response: List,
+        desired_state: Set,
+        failure_states: Set,
+        object_type: str,
+        action: str,
+        countdown: int = 25 * 60,
+        check_interval_seconds: int = 60,
+    ) -> None:
+        """
+        Will run the sensor until it turns True.
+
+        :param get_state_callable: A callable to run until it returns True
+        :param get_state_args: Arguments to pass to get_state_callable
+        :param parse_response: Dictionary keys to extract state from response of get_state_callable
+        :param desired_state: Wait until the getter returns this value
+        :param failure_states: A set of states which indicate failure and should throw an
+        exception if any are reached before the desired_state
+        :param object_type: Used for the reporting string. What are you waiting for? (application, job, etc)
+        :param action: Used for the reporting string. What action are you waiting for? (created, deleted, etc)
+        :param countdown: Total amount of time the waiter should wait for the desired state
+        before timing out (in seconds). Defaults to 25 * 60 seconds.
+        :param check_interval_seconds: Number of seconds waiter should wait before attempting
+        to retry get_state_callable. Defaults to 60 seconds.

Review Comment:
   ```suggestion
           :param failure_states: A set of states which indicate failure and should throw an
               exception if any are reached before the desired_state
           :param object_type: Used for the reporting string. What are you waiting for? (application, job, etc)
           :param action: Used for the reporting string. What action are you waiting for? (created, deleted, etc)
           :param countdown: Total amount of time the waiter should wait for the desired state
               before timing out (in seconds). Defaults to 25 * 60 seconds.
           :param check_interval_seconds: Number of seconds waiter should wait before attempting
               to retry get_state_callable. Defaults to 60 seconds.
   ```
   Would you mind adding some indentation? Docstrings typically use a hanging indent on the continuation lines of parameter descriptions, for readability.



##########
airflow/providers/amazon/aws/sensors/emr.py:
##########
@@ -111,6 +116,152 @@ def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
         raise NotImplementedError('Please implement failure_message_from_response() in subclass')
 
 
+class EmrServerlessJobSensor(BaseSensorOperator):
+    """
+    Asks for the state of the job run until it reaches a failure state or success state.
+    If the job run fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessJobSensor`
+
+    :param application_id: application_id to check the state of
+    :param job_run_id: job_run_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES
+    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
+    :param emr_conn_id: emr connection to use, defaults to 'emr_default'
+    """
+
+    INTERMEDIATE_STATES = {'PENDING', 'RUNNING', 'SCHEDULED', 'SUBMITTED'}
+    FAILURE_STATES = {'FAILED', 'CANCELLING', 'CANCELLED'}
+    SUCCESS_STATES = {'SUCCESS'}
+    TERMINAL_STATES = SUCCESS_STATES.union(FAILURE_STATES)
+
+    template_fields: Sequence[str] = (
+        'application_id',
+        'job_run_id',
+    )
+
+    def __init__(
+        self,
+        *,
+        application_id: str,
+        job_run_id: str,
+        target_states: Union[Set, FrozenSet] = frozenset(SUCCESS_STATES),
+        aws_conn_id: str = 'aws_default',
+        emr_conn_id: str = 'emr_default',
+        **kwargs: Any,
+    ) -> None:
+        self.aws_conn_id = aws_conn_id
+        self.emr_conn_id = emr_conn_id
+        self.target_states = target_states
+        self.application_id = application_id
+        self.job_run_id = job_run_id
+        super().__init__(**kwargs)
+
+    def poke(self, context: 'Context') -> bool:
+        state = None
+
+        try:
+            response = self.hook.conn.get_job_run(applicationId=self.application_id, jobRunId=self.job_run_id)
+        except Exception:
+            raise AirflowException(f'Unable to get job state: {response}')
+
+        state = response['jobRun']['state']
+
+        if state in self.FAILURE_STATES:
+            failure_message = f"EMR Serverless job failed: {self.failure_message_from_response(response)}"
+            raise AirflowException(failure_message)
+
+        return state in self.target_states
+
+    @cached_property
+    def hook(self) -> EmrServerlessHook:
+        """Create and return an EmrServerlessHook"""
+        return EmrServerlessHook()
+
+    @staticmethod
+    def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
+        """
+        Get failure message from response dictionary.
+
+        :param response: response from AWS API
+        :return: failure message
+        :rtype: Optional[str]
+        """
+        return response['jobRun']['stateDetails']
+
+
+class EmrServerlessApplicationSensor(BaseSensorOperator):
+    """
+    Asks for the state of the application until it reaches a failure state or success state.
+    If the application fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessApplicationSensor`
+
+    :param application_id: application_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES
+    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
+    :param emr_conn_id: emr connection to use, defaults to 'emr_default'
+    """
+
+    template_fields: Sequence[str] = ('application_id',)
+
+    INTERMEDIATE_STATES = {'CREATING', 'STARTING', 'STOPPING'}
+    # TODO:  Question: Do these states indicate failure?
+    FAILURE_STATES = {'STOPPED', 'TERMINATED'}
+    SUCCESS_STATES = {'CREATED', 'STARTED'}
+
+    def __init__(
+        self,
+        *,
+        application_id: str,
+        target_states: Union[Set, FrozenSet] = frozenset(SUCCESS_STATES),
+        aws_conn_id: str = 'aws_default',
+        emr_conn_id: str = 'emr_default',
+        **kwargs: Any,
+    ) -> None:
+        self.aws_conn_id = aws_conn_id
+        self.emr_conn_id = emr_conn_id
+        self.target_states = target_states
+        self.application_id = application_id
+        super().__init__(**kwargs)
+
+    def poke(self, context: 'Context') -> bool:
+        state = None
+
+        try:
+            response = self.hook.conn.get_application(applicationId=self.application_id)
+        except Exception:
+            raise AirflowException(f'Unable to get application state: {response}')
+
+        state = response['application']['state']
+
+        if state in self.FAILURE_STATES:
+            failure_message = f"EMR Serverless job failed: {self.failure_message_from_response(response)}"
+            raise AirflowException(failure_message)
+
+        return state in self.target_states
+
+    @cached_property
+    def hook(self) -> EmrServerlessHook:
+        """Create and return an EmrServerlessHook"""
+        return EmrServerlessHook()

Review Comment:
   ```suggestion
           return EmrServerlessHook(aws_conn_id=self.aws_conn_id)
   ```
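
To illustrate the suggestion: a hook constructed with no arguments falls back to its default connection, so the `aws_conn_id` passed to the sensor is never used. The sketch below is only an illustration under assumptions; `FakeHook` and `Sensor` are hypothetical stand-ins, not Airflow's `EmrServerlessHook` or `BaseSensorOperator`.

```python
from functools import cached_property


class FakeHook:
    """Hypothetical stand-in for EmrServerlessHook; it only records the connection id."""

    def __init__(self, aws_conn_id: str = 'aws_default') -> None:
        self.aws_conn_id = aws_conn_id


class Sensor:
    """Hypothetical stand-in for the sensor; not a real BaseSensorOperator."""

    def __init__(self, aws_conn_id: str = 'aws_default') -> None:
        self.aws_conn_id = aws_conn_id

    @cached_property
    def hook_without_conn_id(self) -> FakeHook:
        # Pattern from the diff: the caller's connection id is dropped.
        return FakeHook()

    @cached_property
    def hook_with_conn_id(self) -> FakeHook:
        # Pattern from the suggestion: the connection id is forwarded to the hook.
        return FakeHook(aws_conn_id=self.aws_conn_id)


sensor = Sensor(aws_conn_id='my_aws')
```

With the first property the hook reports `aws_default` regardless of what the sensor was given; with the second it reports `my_aws`.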



##########
airflow/providers/amazon/aws/sensors/emr.py:
##########
@@ -111,6 +116,152 @@ def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
         raise NotImplementedError('Please implement failure_message_from_response() in subclass')
 
 
+class EmrServerlessJobSensor(BaseSensorOperator):
+    """
+    Asks for the state of the job run until it reaches a failure state or success state.
+    If the job run fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessJobSensor`
+
+    :param application_id: application_id to check the state of
+    :param job_run_id: job_run_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES
+    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
+    :param emr_conn_id: emr connection to use, defaults to 'emr_default'
+    """
+
+    INTERMEDIATE_STATES = {'PENDING', 'RUNNING', 'SCHEDULED', 'SUBMITTED'}
+    FAILURE_STATES = {'FAILED', 'CANCELLING', 'CANCELLED'}
+    SUCCESS_STATES = {'SUCCESS'}
+    TERMINAL_STATES = SUCCESS_STATES.union(FAILURE_STATES)
+
+    template_fields: Sequence[str] = (
+        'application_id',
+        'job_run_id',
+    )
+
+    def __init__(
+        self,
+        *,
+        application_id: str,
+        job_run_id: str,
+        target_states: Union[Set, FrozenSet] = frozenset(SUCCESS_STATES),
+        aws_conn_id: str = 'aws_default',
+        emr_conn_id: str = 'emr_default',
+        **kwargs: Any,
+    ) -> None:
+        self.aws_conn_id = aws_conn_id
+        self.emr_conn_id = emr_conn_id
+        self.target_states = target_states
+        self.application_id = application_id
+        self.job_run_id = job_run_id
+        super().__init__(**kwargs)
+
+    def poke(self, context: 'Context') -> bool:
+        state = None
+
+        try:
+            response = self.hook.conn.get_job_run(applicationId=self.application_id, jobRunId=self.job_run_id)
+        except Exception:
+            raise AirflowException(f'Unable to get job state: {response}')
+
+        state = response['jobRun']['state']
+
+        if state in self.FAILURE_STATES:
+            failure_message = f"EMR Serverless job failed: {self.failure_message_from_response(response)}"
+            raise AirflowException(failure_message)
+
+        return state in self.target_states
+
+    @cached_property
+    def hook(self) -> EmrServerlessHook:
+        """Create and return an EmrServerlessHook"""
+        return EmrServerlessHook()
+
+    @staticmethod
+    def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
+        """
+        Get failure message from response dictionary.
+
+        :param response: response from AWS API
+        :return: failure message
+        :rtype: Optional[str]
+        """
+        return response['jobRun']['stateDetails']
+
+
+class EmrServerlessApplicationSensor(BaseSensorOperator):
+    """
+    Asks for the state of the application until it reaches a failure state or success state.
+    If the application fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessApplicationSensor`
+
+    :param application_id: application_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES

Review Comment:
   ```suggestion
       :param target_states: a set of states to wait for, defaults to {'STARTED', 'CREATED'}
   ```
   Same idea about having the values in the docstring. Up to you.



##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -16,18 +16,24 @@
 # specific language governing permissions and limitations
 # under the License.
 import ast
+import sys
 from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Union
 from uuid import uuid4
 
-from airflow.compat.functools import cached_property
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
-from airflow.providers.amazon.aws.hooks.emr import EmrContainerHook, EmrHook
+from airflow.providers.amazon.aws.hooks.emr import EmrContainerHook, EmrHook, EmrServerlessHook
 from airflow.providers.amazon.aws.links.emr import EmrClusterLink
+from airflow.providers.amazon.aws.sensors.emr import EmrServerlessApplicationSensor, EmrServerlessJobSensor
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
+if sys.version_info >= (3, 8):

Review Comment:
   +1 You can import `cached_property` from [`airflow.compat.functools`](https://github.com/apache/airflow/blob/main/airflow/compat/functools.py).
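
For context, a compat module of this kind typically centralizes the interpreter version check so that operator modules do not repeat it. The following is a rough sketch of that idea, not a copy of the linked file; the fallback import assumes the third-party `cached-property` backport is installed on older interpreters, and `Example` is a hypothetical class used only to show the caching behaviour.

```python
import sys

if sys.version_info >= (3, 8):
    from functools import cached_property
else:
    # Assumption: the `cached-property` backport package is available on Python < 3.8.
    from cached_property import cached_property


class Example:
    """Hypothetical class showing that the decorated method body runs only once."""

    def __init__(self) -> None:
        self.calls = 0

    @cached_property
    def hook(self) -> object:
        self.calls += 1
        return object()


example = Example()
first = example.hook
second = example.hook  # served from the per-instance cache, no second call
```

Importing the name from a single shared module keeps the `sys.version_info` branch out of every file that needs `cached_property`.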



##########
airflow/providers/amazon/aws/sensors/emr.py:
##########
@@ -111,6 +116,152 @@ def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
         raise NotImplementedError('Please implement failure_message_from_response() in subclass')
 
 
+class EmrServerlessJobSensor(BaseSensorOperator):
+    """
+    Asks for the state of the job run until it reaches a failure state or success state.
+    If the job run fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessJobSensor`
+
+    :param application_id: application_id to check the state of
+    :param job_run_id: job_run_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES
+    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
+    :param emr_conn_id: emr connection to use, defaults to 'emr_default'
+    """
+
+    INTERMEDIATE_STATES = {'PENDING', 'RUNNING', 'SCHEDULED', 'SUBMITTED'}
+    FAILURE_STATES = {'FAILED', 'CANCELLING', 'CANCELLED'}
+    SUCCESS_STATES = {'SUCCESS'}
+    TERMINAL_STATES = SUCCESS_STATES.union(FAILURE_STATES)
+
+    template_fields: Sequence[str] = (
+        'application_id',
+        'job_run_id',
+    )
+
+    def __init__(
+        self,
+        *,
+        application_id: str,
+        job_run_id: str,
+        target_states: Union[Set, FrozenSet] = frozenset(SUCCESS_STATES),
+        aws_conn_id: str = 'aws_default',
+        emr_conn_id: str = 'emr_default',
+        **kwargs: Any,
+    ) -> None:
+        self.aws_conn_id = aws_conn_id
+        self.emr_conn_id = emr_conn_id

Review Comment:
   I could easily be missing it, but where is `emr_conn_id` being used in this sensor and other sensors in this file?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ferruzzi commented on a diff in pull request #25324: Add EMR Serverless Operators and Hooks

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on code in PR #25324:
URL: https://github.com/apache/airflow/pull/25324#discussion_r938069019


##########
airflow/providers/amazon/aws/sensors/emr.py:
##########
@@ -219,10 +216,7 @@ def __init__(
     def poke(self, context: 'Context') -> bool:
         state = None
 
-        try:

Review Comment:
   Certainly makes the code cleaner. :+1:





[GitHub] [airflow] syedahsn commented on a diff in pull request #25324: Add EMR Serverless Operators and Hooks

Posted by GitBox <gi...@apache.org>.
syedahsn commented on code in PR #25324:
URL: https://github.com/apache/airflow/pull/25324#discussion_r933477840


##########
airflow/providers/amazon/aws/sensors/emr.py:
##########
@@ -111,6 +116,152 @@ def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
         raise NotImplementedError('Please implement failure_message_from_response() in subclass')
 
 
+class EmrServerlessJobSensor(BaseSensorOperator):
+    """
+    Asks for the state of the job run until it reaches a failure state or success state.
+    If the job run fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessJobSensor`
+
+    :param application_id: application_id to check the state of
+    :param job_run_id: job_run_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES
+    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
+    :param emr_conn_id: emr connection to use, defaults to 'emr_default'
+    """
+
+    INTERMEDIATE_STATES = {'PENDING', 'RUNNING', 'SCHEDULED', 'SUBMITTED'}
+    FAILURE_STATES = {'FAILED', 'CANCELLING', 'CANCELLED'}
+    SUCCESS_STATES = {'SUCCESS'}
+    TERMINAL_STATES = SUCCESS_STATES.union(FAILURE_STATES)
+
+    template_fields: Sequence[str] = (
+        'application_id',
+        'job_run_id',
+    )
+
+    def __init__(
+        self,
+        *,
+        application_id: str,
+        job_run_id: str,
+        target_states: Union[Set, FrozenSet] = frozenset(SUCCESS_STATES),
+        aws_conn_id: str = 'aws_default',
+        emr_conn_id: str = 'emr_default',
+        **kwargs: Any,
+    ) -> None:
+        self.aws_conn_id = aws_conn_id
+        self.emr_conn_id = emr_conn_id
+        self.target_states = target_states
+        self.application_id = application_id
+        self.job_run_id = job_run_id
+        super().__init__(**kwargs)
+
+    def poke(self, context: 'Context') -> bool:
+        state = None
+
+        try:
+            response = self.hook.conn.get_job_run(applicationId=self.application_id, jobRunId=self.job_run_id)
+        except Exception:
+            raise AirflowException(f'Unable to get job state: {response}')
+
+        state = response['jobRun']['state']
+
+        if state in self.FAILURE_STATES:
+            failure_message = f"EMR Serverless job failed: {self.failure_message_from_response(response)}"
+            raise AirflowException(failure_message)
+
+        return state in self.target_states
+
+    @cached_property
+    def hook(self) -> EmrServerlessHook:
+        """Create and return an EmrServerlessHook"""
+        return EmrServerlessHook()
+
+    @staticmethod
+    def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
+        """
+        Get failure message from response dictionary.
+
+        :param response: response from AWS API
+        :return: failure message
+        :rtype: Optional[str]
+        """
+        return response['jobRun']['stateDetails']
+
+
+class EmrServerlessApplicationSensor(BaseSensorOperator):
+    """
+    Asks for the state of the application until it reaches a failure state or success state.
+    If the application fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessApplicationSensor`
+
+    :param application_id: application_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES
+    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
+    :param emr_conn_id: emr connection to use, defaults to 'emr_default'
+    """
+
+    template_fields: Sequence[str] = ('application_id',)
+
+    INTERMEDIATE_STATES = {'CREATING', 'STARTING', 'STOPPING'}
+    # TODO:  Question: Do these states indicate failure?

Review Comment:
   This was a question posed to one of the internal reviewers. This should be removed.





[GitHub] [airflow] syedahsn commented on a diff in pull request #25324: Add EMR Serverless Operators and Hooks

Posted by GitBox <gi...@apache.org>.
syedahsn commented on code in PR #25324:
URL: https://github.com/apache/airflow/pull/25324#discussion_r933471015


##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -16,18 +16,24 @@
 # specific language governing permissions and limitations
 # under the License.
 import ast
+import sys
 from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Union
 from uuid import uuid4
 
-from airflow.compat.functools import cached_property
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
-from airflow.providers.amazon.aws.hooks.emr import EmrContainerHook, EmrHook
+from airflow.providers.amazon.aws.hooks.emr import EmrContainerHook, EmrHook, EmrServerlessHook
 from airflow.providers.amazon.aws.links.emr import EmrClusterLink
+from airflow.providers.amazon.aws.sensors.emr import EmrServerlessApplicationSensor, EmrServerlessJobSensor
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
+if sys.version_info >= (3, 8):

Review Comment:
   Sounds good, I'll make the change





[GitHub] [airflow] josh-fell commented on a diff in pull request #25324: Add EMR Serverless Operators and Hooks

Posted by GitBox <gi...@apache.org>.
josh-fell commented on code in PR #25324:
URL: https://github.com/apache/airflow/pull/25324#discussion_r936769508


##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -412,3 +414,254 @@ def execute(self, context: 'Context') -> None:
             raise AirflowException(f'JobFlow termination failed: {response}')
         else:
             self.log.info('JobFlow with id %s terminated', self.job_flow_id)
+
+
+class EmrServerlessCreateApplicationOperator(BaseOperator):
+    """
+    Operator to create Serverless EMR Application
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:EmrServerlessCreateApplicationOperator`
+
+    :param release_label: The EMR release version associated with the application.
+    :param job_type: The type of application you want to start, such as Spark or Hive.
+    :param wait_for_completion: If true, wait for the Application to start before returning. Default to True
+    :param client_request_token: The client idempotency token of the application to create.
+      Its value must be unique for each request.
+    :param config: Optional dictionary for arbitrary parameters to the boto API create_application call.
+    :param aws_conn_id: AWS connection to use
+    """
+
+    def __init__(
+        self,
+        release_label: str,
+        job_type: str,
+        client_request_token: str = '',
+        config: Optional[dict] = None,
+        wait_for_completion: bool = True,
+        aws_conn_id: str = 'aws_default',
+        **kwargs,
+    ):
+        self.aws_conn_id = aws_conn_id
+        self.release_label = release_label
+        self.job_type = job_type
+        self.wait_for_completion = wait_for_completion
+        self.kwargs = kwargs
+        self.config = config or {}
+        super().__init__(**kwargs)
+
+        self.client_request_token = client_request_token or str(uuid4())
+
+    @cached_property
+    def hook(self) -> EmrServerlessHook:
+        """Create and return an EmrServerlessHook."""
+        return EmrServerlessHook(aws_conn_id=self.aws_conn_id)
+
+    def execute(self, context: 'Context'):
+        response = self.hook.conn.create_application(
+            clientToken=self.client_request_token,
+            releaseLabel=self.release_label,
+            type=self.job_type,
+            **self.config,
+        )
+        application_id = response['applicationId']
+
+        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
+            raise AirflowException(f'Application Creation failed: {response}')
+
+        self.log.info('EMR serverless application created: %s', application_id)
+
+        # This should be replaced with a boto waiter when available.
+        self.hook.waiter(
+            get_state_callable=self.hook.conn.get_application,
+            get_state_args={'applicationId': application_id},
+            parse_response=['application', 'state'],
+            desired_state={'CREATED'},
+            failure_states=EmrServerlessApplicationSensor.FAILURE_STATES,
+            object_type='application',
+            action='created',
+        )
+
+        self.log.info('Starting application %s', application_id)
+        self.hook.conn.start_application(applicationId=application_id)
+
+        if self.wait_for_completion:
+            # This should be replaced with a boto waiter when available.
+            self.hook.waiter(
+                get_state_callable=self.hook.conn.get_application,
+                get_state_args={'applicationId': application_id},
+                parse_response=['application', 'state'],
+                desired_state={'STARTED'},
+                failure_states=EmrServerlessApplicationSensor.FAILURE_STATES,
+                object_type='application',
+                action='started',
+            )
+
+        return application_id
+
+
+class EmrServerlessStartJobOperator(BaseOperator):
+    """
+    Operator to start EMR Serverless job.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:EmrServerlessStartJobOperator`
+
+    :param application_id: ID of the EMR Serverless application to start.
+    :param execution_role_arn: ARN of role to perform action.
+    :param job_driver: Driver that the job runs on.
+    :param configuration_overrides: Configuration specifications to override existing configurations.
+    :param client_request_token: The client idempotency token of the application to create.
+      Its value must be unique for each request.
+    :param config: Optional dictionary for arbitrary parameters to the boto API start_job_run call.
+    :param wait_for_completion: If true, waits for the job to start before returning. Defaults to True.
+    :param aws_conn_id: AWS connection to use
+    """
+
+    template_fields: Sequence[str] = ('application_id',)
+
+    def __init__(
+        self,
+        application_id: str,
+        execution_role_arn: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict],
+        client_request_token: str = '',
+        config: Optional[dict] = None,
+        wait_for_completion: bool = True,
+        aws_conn_id: str = 'aws_default',
+        **kwargs,
+    ):
+        self.aws_conn_id = aws_conn_id
+        self.application_id = application_id
+        self.execution_role_arn = execution_role_arn
+        self.job_driver = job_driver
+        self.configuration_overrides = configuration_overrides
+        self.wait_for_completion = wait_for_completion
+        self.config = config or {}
+        super().__init__(**kwargs)
+
+        self.client_request_token = client_request_token or str(uuid4())
+
+    @cached_property
+    def hook(self) -> EmrServerlessHook:
+        """Create and return an EmrServerlessHook."""
+        return EmrServerlessHook(aws_conn_id=self.aws_conn_id)
+
+    def execute(self, context: 'Context') -> Dict:
+        self.log.info('Starting job on Application: %s', self.application_id)
+
+        app_state = self.hook.conn.get_application(applicationId=self.application_id)['application']['state']
+        if app_state not in {'CREATED', 'STARTED'}:

Review Comment:
   ```suggestion
           if app_state not in EmrServerlessApplicationSensor.SUCCESS_STATES:
   ```
   Small suggestion to use the attrs where you can like you have in other places. Up to you though (assuming it's functionally correct as well).
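
A small sketch of what reusing the class attributes buys: the set of acceptable states lives in one place, so the membership check cannot drift from the sensor's definition. `ApplicationStates` and `can_start_job` are hypothetical names for illustration; the state values are the ones shown in the diff.

```python
class ApplicationStates:
    """Hypothetical holder mirroring the sensor's state sets from the diff."""

    INTERMEDIATE_STATES = frozenset({'CREATING', 'STARTING', 'STOPPING'})
    FAILURE_STATES = frozenset({'STOPPED', 'TERMINATED'})
    SUCCESS_STATES = frozenset({'CREATED', 'STARTED'})


def can_start_job(app_state: str) -> bool:
    """Return True when the application is in a state treated as ready."""
    # Single source of truth instead of repeating the literal {'CREATED', 'STARTED'}.
    return app_state in ApplicationStates.SUCCESS_STATES
```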



##########
docs/apache-airflow-providers-amazon/operators/emr_serverless.rst:
##########
@@ -0,0 +1,112 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+===============================
+Amazon EMR Serverless Operators
+===============================
+
+`Amazon EMR Serverless <https://aws.amazon.com/emr/serverless/>`__ is a serverless option
+in Amazon EMR that makes it easy for data analysts and engineers to run open-source big
+data analytics frameworks without configuring, managing, and scaling clusters or servers.
+You get all the features and benefits of Amazon EMR without the need for experts to plan
+and manage clusters.
+
+Prerequisite Tasks
+------------------
+
+.. include:: _partials/prerequisite_tasks.rst
+Operators
+---------
+.. _howto/operator:EmrServerlessCreateApplicationOperator:
+
+Create an EMR Serverless Application
+==========================

Review Comment:
   This underline needs to be at least the length of the text it's underlining. I imagine this might be a Build Docs failure for you.
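
As a quick way to check this locally before the docs build runs, a helper along these lines can compare a title with its underline. This is only a sketch: `underline_long_enough` is a hypothetical helper, not part of docutils or Sphinx, and it expects raw `.rst` lines without the leading `+` from the diff.

```python
def underline_long_enough(title: str, underline: str) -> bool:
    """Return True if `underline` is one repeated punctuation character
    that is at least as long as `title`."""
    stripped = underline.rstrip()
    # An underline must be a single non-alphanumeric character repeated.
    if not stripped or len(set(stripped)) != 1 or stripped[0].isalnum():
        return False
    return len(stripped) >= len(title.rstrip())


title = 'Create an EMR Serverless Application'
```

Extending the underline to `'=' * len(title)` or longer satisfies the check.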



##########
airflow/providers/amazon/aws/sensors/emr.py:
##########
@@ -111,6 +116,152 @@ def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
         raise NotImplementedError('Please implement failure_message_from_response() in subclass')
 
 
+class EmrServerlessJobSensor(BaseSensorOperator):
+    """
+    Asks for the state of the job run until it reaches a failure state or success state.
+    If the job run fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessJobSensor`
+
+    :param application_id: application_id to check the state of
+    :param job_run_id: job_run_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES
+    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
+    :param emr_conn_id: emr connection to use, defaults to 'emr_default'
+    """
+
+    INTERMEDIATE_STATES = {'PENDING', 'RUNNING', 'SCHEDULED', 'SUBMITTED'}
+    FAILURE_STATES = {'FAILED', 'CANCELLING', 'CANCELLED'}
+    SUCCESS_STATES = {'SUCCESS'}
+    TERMINAL_STATES = SUCCESS_STATES.union(FAILURE_STATES)
+
+    template_fields: Sequence[str] = (
+        'application_id',
+        'job_run_id',
+    )
+
+    def __init__(
+        self,
+        *,
+        application_id: str,
+        job_run_id: str,
+        target_states: Union[Set, FrozenSet] = frozenset(SUCCESS_STATES),
+        aws_conn_id: str = 'aws_default',
+        emr_conn_id: str = 'emr_default',
+        **kwargs: Any,
+    ) -> None:
+        self.aws_conn_id = aws_conn_id
+        self.emr_conn_id = emr_conn_id
+        self.target_states = target_states
+        self.application_id = application_id
+        self.job_run_id = job_run_id
+        super().__init__(**kwargs)
+
+    def poke(self, context: 'Context') -> bool:
+        state = None
+
+        try:
+            response = self.hook.conn.get_job_run(applicationId=self.application_id, jobRunId=self.job_run_id)
+        except Exception:

Review Comment:
   FWIW I think `AirflowException` is used too broadly in the code base. IMO `AirflowException`, and its subclasses, are really related to Airflow orchestration activity rather than activity done by what Airflow is _orchestrating_. It might make more sense to drop the `try/except` and bubble up whatever API call errors that might be thrown directly. Up to you though.
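
A sketch of the difference, under stated assumptions: `FakeClient` is a hypothetical stand-in for the boto client, and `RuntimeError` stands in for `AirflowException` so the example runs without Airflow installed. It also shows a side effect of the wrapped pattern in the diff: if the API call itself raises, `response` was never bound, so formatting the message fails with `UnboundLocalError` and hides the original error.

```python
class FakeClient:
    """Hypothetical stand-in for the boto3 EMR Serverless client."""

    def __init__(self, state=None, error=None):
        self._state = state
        self._error = error

    def get_job_run(self, applicationId, jobRunId):
        if self._error is not None:
            raise self._error
        return {'jobRun': {'state': self._state, 'stateDetails': ''}}


def poke_wrapped(client) -> bool:
    # Pattern from the diff; RuntimeError stands in for AirflowException.
    try:
        response = client.get_job_run(applicationId='app', jobRunId='run')
    except Exception:
        # `response` is unbound here when get_job_run raised.
        raise RuntimeError(f'Unable to get job state: {response}')
    return response['jobRun']['state'] == 'SUCCESS'


def poke_direct(client) -> bool:
    # Pattern from the review comment: let the API error propagate unchanged.
    response = client.get_job_run(applicationId='app', jobRunId='run')
    return response['jobRun']['state'] == 'SUCCESS'


def error_type(func, client):
    """Return the type of exception `func(client)` raises, or None."""
    try:
        func(client)
    except Exception as exc:
        return type(exc)
    return None


failing = FakeClient(error=ConnectionError('boom'))
```

With the direct version, the task log shows the original API error instead of a secondary failure raised while building the message.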





[GitHub] [airflow] potiuk merged pull request #25324: Add EMR Serverless Operators and Hooks

Posted by GitBox <gi...@apache.org>.
potiuk merged PR #25324:
URL: https://github.com/apache/airflow/pull/25324




[GitHub] [airflow] boring-cyborg[bot] commented on pull request #25324: Add EMR Serverless Operators and Hooks

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on PR #25324:
URL: https://github.com/apache/airflow/pull/25324#issuecomment-1196073437

   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, mypy and type annotations). Our [pre-commits]( https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature add useful documentation (in docstrings or in `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst) Consider adding an example DAG that shows how users should use it.
   - Consider using [Breeze environment](https://github.com/apache/airflow/blob/main/BREEZE.rst) for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style]( https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   




[GitHub] [airflow] vincbeck commented on a diff in pull request #25324: Add EMR Serverless Operators and Hooks

Posted by GitBox <gi...@apache.org>.
vincbeck commented on code in PR #25324:
URL: https://github.com/apache/airflow/pull/25324#discussion_r938218402


##########
airflow/providers/amazon/aws/example_dags/example_emr_serverless.py:
##########
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   That's a good point, but the plan was to do it in two iterations since this PR is already big enough. Once this PR is merged, @syedahsn will work on converting this sample DAG to a system test. If you strongly disagree, I guess the two can be done in this PR, but that will just make it bigger and harder to review.





[GitHub] [airflow] potiuk commented on pull request #25324: Add EMR Serverless Operators and Hooks

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25324:
URL: https://github.com/apache/airflow/pull/25324#issuecomment-1198468619

   > Syed is a new member of our team, we already reviewed his work :) Thanks for reaching out though
   
   aha :) Good to know.




[GitHub] [airflow] syedahsn commented on a diff in pull request #25324: Add EMR Serverless Operators and Hooks

Posted by GitBox <gi...@apache.org>.
syedahsn commented on code in PR #25324:
URL: https://github.com/apache/airflow/pull/25324#discussion_r933693886


##########
airflow/providers/amazon/aws/sensors/emr.py:
##########
@@ -111,6 +116,152 @@ def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
         raise NotImplementedError('Please implement failure_message_from_response() in subclass')
 
 
+class EmrServerlessJobSensor(BaseSensorOperator):
+    """
+    Asks for the state of the job run until it reaches a failure state or success state.
+    If the job run fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessJobSensor`
+
+    :param application_id: application_id to check the state of
+    :param job_run_id: job_run_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES
+    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
+    :param emr_conn_id: emr connection to use, defaults to 'emr_default'
+    """
+
+    INTERMEDIATE_STATES = {'PENDING', 'RUNNING', 'SCHEDULED', 'SUBMITTED'}
+    FAILURE_STATES = {'FAILED', 'CANCELLING', 'CANCELLED'}
+    SUCCESS_STATES = {'SUCCESS'}
+    TERMINAL_STATES = SUCCESS_STATES.union(FAILURE_STATES)
+
+    template_fields: Sequence[str] = (
+        'application_id',
+        'job_run_id',
+    )
+
+    def __init__(
+        self,
+        *,
+        application_id: str,
+        job_run_id: str,
+        target_states: Union[Set, FrozenSet] = frozenset(SUCCESS_STATES),
+        aws_conn_id: str = 'aws_default',
+        emr_conn_id: str = 'emr_default',
+        **kwargs: Any,
+    ) -> None:
+        self.aws_conn_id = aws_conn_id
+        self.emr_conn_id = emr_conn_id
+        self.target_states = target_states
+        self.application_id = application_id
+        self.job_run_id = job_run_id
+        super().__init__(**kwargs)
+
+    def poke(self, context: 'Context') -> bool:
+        state = None
+
+        try:
+            response = self.hook.conn.get_job_run(applicationId=self.application_id, jobRunId=self.job_run_id)
+        except Exception:

Review Comment:
   I went with `Exception` as a catch-all for any exception the boto3 API might throw. I'm not sure there's an advantage to raising an `AirflowException` instead of a generic `Exception`. The only one I can think of is that it makes clear the exception came from Airflow rather than from an underlying API call (i.e. boto3). But that can also be understood from the message associated with the `Exception`.
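
For illustration only, here is a minimal sketch of the conversion being discussed, written so it runs without Airflow or boto3. The `AirflowException` class below is a local stand-in for `airflow.exceptions.AirflowException`, and `get_job_state` is a hypothetical helper, not code from this PR. It interpolates the caught error rather than a response, since no response exists when the call raises, and it uses `raise ... from` so the original error stays attached.

```python
class AirflowException(Exception):
    """Local stand-in for airflow.exceptions.AirflowException (assumption for this sketch)."""


def get_job_state(client, application_id, job_run_id):
    """Return the job run state, wrapping any client error in AirflowException.

    `client` is assumed to expose get_job_run(applicationId=..., jobRunId=...),
    as the call quoted in the diff above does.
    """
    try:
        response = client.get_job_run(applicationId=application_id, jobRunId=job_run_id)
    except Exception as err:
        # Chain the original error so its type and traceback are still visible.
        raise AirflowException(f'Unable to get job state: {err}') from err
    return response['jobRun']['state']
```

With chaining, the wrapped exception signals that the failure surfaced in Airflow code, while `__cause__` still points at whatever the underlying call raised.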





[GitHub] [airflow] potiuk commented on pull request #25324: Add EMR Serverless Operators and Hooks

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25324:
URL: https://github.com/apache/airflow/pull/25324#issuecomment-1199091729

   Some errors to fix :)




[GitHub] [airflow] vincbeck commented on a diff in pull request #25324: Add EMR Serverless Operators and Hooks

Posted by GitBox <gi...@apache.org>.
vincbeck commented on code in PR #25324:
URL: https://github.com/apache/airflow/pull/25324#discussion_r933507751


##########
docs/apache-airflow-providers-amazon/operators/emr_serverless.rst:
##########
@@ -0,0 +1,111 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+===============================
+Amazon EMR Serverless Operators
+===============================
+
+`Amazon EMR Serverless <https://aws.amazon.com/emr/serverless/>`__ is a serverless option
+in Amazon EMR that makes it easy for data analysts and engineers to run open-source big
+data analytics frameworks without configuring, managing, and scaling clusters or servers.
+You get all the features and benefits of Amazon EMR without the need for experts to plan
+and manage clusters.
+
+Prerequisite Tasks
+------------------
+
+.. include:: _partials/prerequisite_tasks.rst
+

Review Comment:
   ```suggestion
   Operators
   ---------
   ```



##########
airflow/providers/amazon/aws/sensors/emr.py:
##########
@@ -111,6 +116,152 @@ def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
         raise NotImplementedError('Please implement failure_message_from_response() in subclass')
 
 
+class EmrServerlessJobSensor(BaseSensorOperator):
+    """
+    Asks for the state of the job run until it reaches a failure state or success state.
+    If the job run fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessJobSensor`
+
+    :param application_id: application_id to check the state of
+    :param job_run_id: job_run_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES
+    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
+    :param emr_conn_id: emr connection to use, defaults to 'emr_default'
+    """
+
+    INTERMEDIATE_STATES = {'PENDING', 'RUNNING', 'SCHEDULED', 'SUBMITTED'}
+    FAILURE_STATES = {'FAILED', 'CANCELLING', 'CANCELLED'}
+    SUCCESS_STATES = {'SUCCESS'}
+    TERMINAL_STATES = SUCCESS_STATES.union(FAILURE_STATES)
+
+    template_fields: Sequence[str] = (
+        'application_id',
+        'job_run_id',
+    )
+
+    def __init__(
+        self,
+        *,
+        application_id: str,
+        job_run_id: str,
+        target_states: Union[Set, FrozenSet] = frozenset(SUCCESS_STATES),
+        aws_conn_id: str = 'aws_default',
+        emr_conn_id: str = 'emr_default',
+        **kwargs: Any,
+    ) -> None:
+        self.aws_conn_id = aws_conn_id
+        self.emr_conn_id = emr_conn_id
+        self.target_states = target_states
+        self.application_id = application_id
+        self.job_run_id = job_run_id
+        super().__init__(**kwargs)
+
+    def poke(self, context: 'Context') -> bool:
+        state = None
+
+        try:
+            response = self.hook.conn.get_job_run(applicationId=self.application_id, jobRunId=self.job_run_id)
+        except Exception:

Review Comment:
   Overall question: is there any value in converting the `Exception` to an `AirflowException`? I can see there are a lot of operators which do not do that. If there is no value, I would rather not do the conversion, just for the sake of having cleaner code.



##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -90,6 +91,78 @@ def create_job_flow(self, job_flow_overrides: Dict[str, Any]) -> Dict[str, Any]:
         return response
 
 
+class EmrServerlessHook(AwsBaseHook):
+    """
+    Interact with EMR Serverless API.
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        kwargs["client_type"] = "emr-serverless"
+        super().__init__(*args, **kwargs)
+
+    @cached_property
+    def conn(self):
+        """Get the underlying boto3 EmrServerlessAPIService client (cached)"""
+        return super().conn
+
+    # This method should be replaced with boto waiters which would implement timeouts and backoff nicely.
+    def waiter(
+        self,
+        get_state_callable: Callable,
+        get_state_args: Dict,
+        parse_response: List,
+        desired_state: Set,
+        failure_states: Set,
+        object_type: str,
+        action: str,
+        countdown: int = 25 * 60,
+        check_interval_seconds: int = 60,
+    ) -> None:
+        """
+        Will run the sensor until it turns True.
+
+        :param get_state_callable: A callable to run until it returns True
+        :param get_state_args: Arguments to pass to get_state_callable
+        :param parse_response: Dictionary keys to extract state from response of get_state_callable
+        :param desired_state: Wait until the getter returns this value
+        :param failure_states: A set of states which indicate failure and should throw an
+        exception if any are reached before the desired_state
+        :param object_type: Used for the reporting string. What are you waiting for? (application, job, etc)
+        :param action: Used for the reporting string. What action are you waiting for? (created, deleted, etc)
+        :param countdown: Total amount of time the waiter should wait for the desired state
+        before timing out (in seconds). Defaults to 25 * 60 seconds.
+        :param check_interval_seconds: Number of seconds waiter should wait before attempting
+        to retry get_state_callable. Defaults to 60 seconds.
+        """
+        response = get_state_callable(**get_state_args)
+        state: str = self.get_state(response, parse_response)
+        while state not in desired_state:
+            if state in failure_states:
+                raise AirflowException(f'{object_type.title()} reached failure state {state}.')
+            if countdown >= check_interval_seconds:
+                countdown -= check_interval_seconds
+                print(f'Waiting for {object_type.lower()} to be {action.lower()}.')
+                sleep(check_interval_seconds)
+                state = self.get_state(get_state_callable(**get_state_args), parse_response)
+            else:
+                message = f'{object_type.title()} still not {action.lower()} after the allocated time limit.'
+                print(message)

Review Comment:
   Same here I guess :)
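
As a rough sketch only, the polling loop in the quoted `waiter` method can be written as a standalone function and exercised without AWS. Everything here is an assumption made for illustration: the name `wait_for_state`, the injectable `sleep_fn` parameter, the use of `RuntimeError`, and the use of the standard `logging` module for progress messages. It is not the PR's implementation.

```python
import logging
from time import sleep

log = logging.getLogger(__name__)


def wait_for_state(get_state, desired_states, failure_states,
                   countdown=25 * 60, check_interval_seconds=60, sleep_fn=sleep):
    """Poll get_state() until it returns a desired state, hits a failure state, or times out."""
    state = get_state()
    while state not in desired_states:
        if state in failure_states:
            raise RuntimeError(f'Reached failure state {state}.')
        if countdown < check_interval_seconds:
            # Not enough time budget left for another check.
            raise RuntimeError('Still not in the desired state after the allocated time limit.')
        countdown -= check_interval_seconds
        log.info('Current state is %s; checking again in %s seconds.', state, check_interval_seconds)
        sleep_fn(check_interval_seconds)
        state = get_state()
    return state
```

Passing a fake `sleep_fn` and a `get_state` that walks through a fixed list of states makes the timeout and failure branches easy to test without waiting.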



##########
airflow/providers/amazon/aws/sensors/emr.py:
##########
@@ -111,6 +116,152 @@ def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
         raise NotImplementedError('Please implement failure_message_from_response() in subclass')
 
 
+class EmrServerlessJobSensor(BaseSensorOperator):
+    """
+    Asks for the state of the job run until it reaches a failure state or success state.
+    If the job run fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessJobSensor`
+
+    :param application_id: application_id to check the state of
+    :param job_run_id: job_run_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES
+    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
+    :param emr_conn_id: emr connection to use, defaults to 'emr_default'
+    """
+
+    INTERMEDIATE_STATES = {'PENDING', 'RUNNING', 'SCHEDULED', 'SUBMITTED'}
+    FAILURE_STATES = {'FAILED', 'CANCELLING', 'CANCELLED'}
+    SUCCESS_STATES = {'SUCCESS'}
+    TERMINAL_STATES = SUCCESS_STATES.union(FAILURE_STATES)
+
+    template_fields: Sequence[str] = (
+        'application_id',
+        'job_run_id',
+    )
+
+    def __init__(
+        self,
+        *,
+        application_id: str,
+        job_run_id: str,
+        target_states: Union[Set, FrozenSet] = frozenset(SUCCESS_STATES),
+        aws_conn_id: str = 'aws_default',
+        emr_conn_id: str = 'emr_default',
+        **kwargs: Any,
+    ) -> None:
+        self.aws_conn_id = aws_conn_id
+        self.emr_conn_id = emr_conn_id
+        self.target_states = target_states
+        self.application_id = application_id
+        self.job_run_id = job_run_id
+        super().__init__(**kwargs)
+
+    def poke(self, context: 'Context') -> bool:
+        state = None

Review Comment:
   I don't think this line is needed.



##########
docs/apache-airflow-providers-amazon/operators/emr_serverless.rst:
##########
@@ -0,0 +1,111 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+===============================
+Amazon EMR Serverless Operators
+===============================
+
+`Amazon EMR Serverless <https://aws.amazon.com/emr/serverless/>`__ is a serverless option
+in Amazon EMR that makes it easy for data analysts and engineers to run open-source big
+data analytics frameworks without configuring, managing, and scaling clusters or servers.
+You get all the features and benefits of Amazon EMR without the need for experts to plan
+and manage clusters.
+
+Prerequisite Tasks
+------------------
+
+.. include:: _partials/prerequisite_tasks.rst
+
+.. _howto/operator:EmrServerlessCreateApplicationOperator:
+
+Create an EMR Serverless Application
+------------------------------------

Review Comment:
   ```suggestion
   ==========================
   ```



##########
airflow/providers/amazon/aws/sensors/emr.py:
##########
@@ -111,6 +116,152 @@ def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
         raise NotImplementedError('Please implement failure_message_from_response() in subclass')
 
 
+class EmrServerlessJobSensor(BaseSensorOperator):
+    """
+    Asks for the state of the job run until it reaches a failure state or success state.
+    If the job run fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessJobSensor`
+
+    :param application_id: application_id to check the state of
+    :param job_run_id: job_run_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES
+    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
+    :param emr_conn_id: emr connection to use, defaults to 'emr_default'
+    """
+
+    INTERMEDIATE_STATES = {'PENDING', 'RUNNING', 'SCHEDULED', 'SUBMITTED'}
+    FAILURE_STATES = {'FAILED', 'CANCELLING', 'CANCELLED'}
+    SUCCESS_STATES = {'SUCCESS'}
+    TERMINAL_STATES = SUCCESS_STATES.union(FAILURE_STATES)
+
+    template_fields: Sequence[str] = (
+        'application_id',
+        'job_run_id',
+    )
+
+    def __init__(
+        self,
+        *,
+        application_id: str,
+        job_run_id: str,
+        target_states: Union[Set, FrozenSet] = frozenset(SUCCESS_STATES),
+        aws_conn_id: str = 'aws_default',
+        emr_conn_id: str = 'emr_default',
+        **kwargs: Any,
+    ) -> None:
+        self.aws_conn_id = aws_conn_id
+        self.emr_conn_id = emr_conn_id
+        self.target_states = target_states
+        self.application_id = application_id
+        self.job_run_id = job_run_id
+        super().__init__(**kwargs)
+
+    def poke(self, context: 'Context') -> bool:
+        state = None
+
+        try:
+            response = self.hook.conn.get_job_run(applicationId=self.application_id, jobRunId=self.job_run_id)
+        except Exception:
+            raise AirflowException(f'Unable to get job state: {response}')
+
+        state = response['jobRun']['state']
+
+        if state in self.FAILURE_STATES:
+            failure_message = f"EMR Serverless job failed: {self.failure_message_from_response(response)}"
+            raise AirflowException(failure_message)
+
+        return state in self.target_states
+
+    @cached_property
+    def hook(self) -> EmrServerlessHook:
+        """Create and return an EmrServerlessHook"""
+        return EmrServerlessHook()

Review Comment:
   Agree, we should pass `aws_conn_id`
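
A minimal sketch of what passing the connection id through could look like, assuming the hook accepts an `aws_conn_id` keyword (the hook docstring quoted earlier says such arguments are passed down to `AwsBaseHook`). `FakeEmrServerlessHook` and `SensorSketch` are hypothetical stand-ins so the snippet runs without Airflow installed.

```python
from functools import cached_property


class FakeEmrServerlessHook:
    """Hypothetical stand-in for EmrServerlessHook; it only records the connection id."""

    def __init__(self, aws_conn_id='aws_default'):
        self.aws_conn_id = aws_conn_id


class SensorSketch:
    """Hypothetical stand-in for the sensor, reduced to the hook wiring."""

    def __init__(self, *, aws_conn_id='aws_default'):
        self.aws_conn_id = aws_conn_id

    @cached_property
    def hook(self):
        # Forward the sensor's connection id instead of relying on the hook's default.
        return FakeEmrServerlessHook(aws_conn_id=self.aws_conn_id)
```

Without the forwarded argument, a sensor configured with a non-default connection would silently build its hook against `aws_default`.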



##########
airflow/providers/amazon/aws/sensors/emr.py:
##########
@@ -111,6 +116,152 @@ def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
         raise NotImplementedError('Please implement failure_message_from_response() in subclass')
 
 
+class EmrServerlessJobSensor(BaseSensorOperator):
+    """
+    Asks for the state of the job run until it reaches a failure state or success state.
+    If the job run fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessJobSensor`
+
+    :param application_id: application_id to check the state of
+    :param job_run_id: job_run_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES
+    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
+    :param emr_conn_id: emr connection to use, defaults to 'emr_default'
+    """
+
+    INTERMEDIATE_STATES = {'PENDING', 'RUNNING', 'SCHEDULED', 'SUBMITTED'}
+    FAILURE_STATES = {'FAILED', 'CANCELLING', 'CANCELLED'}
+    SUCCESS_STATES = {'SUCCESS'}
+    TERMINAL_STATES = SUCCESS_STATES.union(FAILURE_STATES)
+
+    template_fields: Sequence[str] = (
+        'application_id',
+        'job_run_id',
+    )
+
+    def __init__(
+        self,
+        *,
+        application_id: str,
+        job_run_id: str,
+        target_states: Union[Set, FrozenSet] = frozenset(SUCCESS_STATES),
+        aws_conn_id: str = 'aws_default',
+        emr_conn_id: str = 'emr_default',
+        **kwargs: Any,
+    ) -> None:
+        self.aws_conn_id = aws_conn_id
+        self.emr_conn_id = emr_conn_id

Review Comment:
   I agree. It seems `emr_conn_id` is not used. You can just remove it



##########
airflow/providers/amazon/aws/sensors/emr.py:
##########
@@ -111,6 +116,152 @@ def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
         raise NotImplementedError('Please implement failure_message_from_response() in subclass')
 
 
+class EmrServerlessJobSensor(BaseSensorOperator):
+    """
+    Asks for the state of the job run until it reaches a failure state or success state.
+    If the job run fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessJobSensor`
+
+    :param application_id: application_id to check the state of
+    :param job_run_id: job_run_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES

Review Comment:
   Agree





[GitHub] [airflow] vincbeck commented on pull request #25324: Add EMR Serverless Operators and Hooks

Posted by GitBox <gi...@apache.org>.
vincbeck commented on PR #25324:
URL: https://github.com/apache/airflow/pull/25324#issuecomment-1198466526

   Syed is a new member of our team, we already reviewed his work :) Thanks for reaching out though




[GitHub] [airflow] dacort commented on pull request #25324: Add EMR Serverless Operators and Hooks

Posted by GitBox <gi...@apache.org>.
dacort commented on PR #25324:
URL: https://github.com/apache/airflow/pull/25324#issuecomment-1199844329

   Closes https://github.com/apache/airflow/issues/20215




[GitHub] [airflow] vincbeck commented on a diff in pull request #25324: Add EMR Serverless Operators and Hooks

Posted by GitBox <gi...@apache.org>.
vincbeck commented on code in PR #25324:
URL: https://github.com/apache/airflow/pull/25324#discussion_r938163972


##########
docs/apache-airflow-providers-amazon/operators/emr_serverless.rst:
##########
@@ -0,0 +1,112 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+===============================
+Amazon EMR Serverless Operators
+===============================
+
+`Amazon EMR Serverless <https://aws.amazon.com/emr/serverless/>`__ is a serverless option
+in Amazon EMR that makes it easy for data analysts and engineers to run open-source big
+data analytics frameworks without configuring, managing, and scaling clusters or servers.
+You get all the features and benefits of Amazon EMR without the need for experts to plan
+and manage clusters.
+
+Prerequisite Tasks
+------------------
+
+.. include:: _partials/prerequisite_tasks.rst
+Operators
+---------

Review Comment:
   ```suggestion
   
   Operators
   ---------
   
   ```


