Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/29 14:46:37 UTC

[GitHub] [airflow] josh-fell commented on a diff in pull request #25324: Add EMR Serverless Operators and Hooks

josh-fell commented on code in PR #25324:
URL: https://github.com/apache/airflow/pull/25324#discussion_r933329710


##########
airflow/providers/amazon/aws/sensors/emr.py:
##########
@@ -111,6 +116,152 @@ def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
         raise NotImplementedError('Please implement failure_message_from_response() in subclass')
 
 
+class EmrServerlessJobSensor(BaseSensorOperator):
+    """
+    Asks for the state of the job run until it reaches a failure state or success state.
+    If the job run fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessJobSensor`
+
+    :param application_id: application_id to check the state of
+    :param job_run_id: job_run_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES

Review Comment:
   ```suggestion
       :param target_states: a set of states to wait for, defaults to 'SUCCESS'
   ```
   It might make sense to put the actual value being checked in the docstring rather than the variable name, so that users reading the Python API docs don't have to look at the code or elsewhere in the docs to figure out what `SUCCESS_STATES` is. WDYT?



##########
airflow/providers/amazon/aws/sensors/emr.py:
##########
@@ -111,6 +116,152 @@ def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
         raise NotImplementedError('Please implement failure_message_from_response() in subclass')
 
 
+class EmrServerlessJobSensor(BaseSensorOperator):
+    """
+    Asks for the state of the job run until it reaches a failure state or success state.
+    If the job run fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessJobSensor`
+
+    :param application_id: application_id to check the state of
+    :param job_run_id: job_run_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES
+    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
+    :param emr_conn_id: emr connection to use, defaults to 'emr_default'
+    """
+
+    INTERMEDIATE_STATES = {'PENDING', 'RUNNING', 'SCHEDULED', 'SUBMITTED'}
+    FAILURE_STATES = {'FAILED', 'CANCELLING', 'CANCELLED'}
+    SUCCESS_STATES = {'SUCCESS'}
+    TERMINAL_STATES = SUCCESS_STATES.union(FAILURE_STATES)
+
+    template_fields: Sequence[str] = (
+        'application_id',
+        'job_run_id',
+    )
+
+    def __init__(
+        self,
+        *,
+        application_id: str,
+        job_run_id: str,
+        target_states: Union[Set, FrozenSet] = frozenset(SUCCESS_STATES),
+        aws_conn_id: str = 'aws_default',
+        emr_conn_id: str = 'emr_default',
+        **kwargs: Any,
+    ) -> None:
+        self.aws_conn_id = aws_conn_id
+        self.emr_conn_id = emr_conn_id
+        self.target_states = target_states
+        self.application_id = application_id
+        self.job_run_id = job_run_id
+        super().__init__(**kwargs)
+
+    def poke(self, context: 'Context') -> bool:
+        state = None
+
+        try:
+            response = self.hook.conn.get_job_run(applicationId=self.application_id, jobRunId=self.job_run_id)
+        except Exception:
+            raise AirflowException(f'Unable to get job state: {response}')
+
+        state = response['jobRun']['state']
+
+        if state in self.FAILURE_STATES:
+            failure_message = f"EMR Serverless job failed: {self.failure_message_from_response(response)}"
+            raise AirflowException(failure_message)
+
+        return state in self.target_states
+
+    @cached_property
+    def hook(self) -> EmrServerlessHook:
+        """Create and return an EmrServerlessHook"""
+        return EmrServerlessHook()
+
+    @staticmethod
+    def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
+        """
+        Get failure message from response dictionary.
+
+        :param response: response from AWS API
+        :return: failure message
+        :rtype: Optional[str]
+        """
+        return response['jobRun']['stateDetails']
+
+
+class EmrServerlessApplicationSensor(BaseSensorOperator):
+    """
+    Asks for the state of the application until it reaches a failure state or success state.
+    If the application fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessApplicationSensor`
+
+    :param application_id: application_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES
+    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
+    :param emr_conn_id: emr connection to use, defaults to 'emr_default'
+    """
+
+    template_fields: Sequence[str] = ('application_id',)
+
+    INTERMEDIATE_STATES = {'CREATING', 'STARTING', 'STOPPING'}
+    # TODO:  Question: Do these states indicate failure?

Review Comment:
   @vincbeck Is this something you can answer?



##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -90,6 +91,78 @@ def create_job_flow(self, job_flow_overrides: Dict[str, Any]) -> Dict[str, Any]:
         return response
 
 
+class EmrServerlessHook(AwsBaseHook):
+    """
+    Interact with EMR Serverless API.
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        kwargs["client_type"] = "emr-serverless"
+        super().__init__(*args, **kwargs)
+
+    @cached_property
+    def conn(self):
+        """Get the underlying boto3 EmrServerlessAPIService client (cached)"""
+        return super().conn
+
+    # This method should be replaced with boto waiters which would implement timeouts and backoff nicely.
+    def waiter(
+        self,
+        get_state_callable: Callable,
+        get_state_args: Dict,
+        parse_response: List,
+        desired_state: Set,
+        failure_states: Set,
+        object_type: str,
+        action: str,
+        countdown: int = 25 * 60,
+        check_interval_seconds: int = 60,
+    ) -> None:
+        """
+        Will run the sensor until it turns True.
+
+        :param get_state_callable: A callable to run until it returns True
+        :param get_state_args: Arguments to pass to get_state_callable
+        :param parse_response: Dictionary keys to extract state from response of get_state_callable
+        :param desired_state: Wait until the getter returns this value
+        :param failure_states: A set of states which indicate failure and should throw an
+        exception if any are reached before the desired_state
+        :param object_type: Used for the reporting string. What are you waiting for? (application, job, etc)
+        :param action: Used for the reporting string. What action are you waiting for? (created, deleted, etc)
+        :param countdown: Total amount of time the waiter should wait for the desired state
+        before timing out (in seconds). Defaults to 25 * 60 seconds.
+        :param check_interval_seconds: Number of seconds waiter should wait before attempting
+        to retry get_state_callable. Defaults to 60 seconds.
+        """
+        response = get_state_callable(**get_state_args)
+        state: str = self.get_state(response, parse_response)
+        while state not in desired_state:
+            if state in failure_states:
+                raise AirflowException(f'{object_type.title()} reached failure state {state}.')
+            if countdown >= check_interval_seconds:
+                countdown -= check_interval_seconds
+                print(f'Waiting for {object_type.lower()} to be {action.lower()}.')

Review Comment:
   It might be better to use `logging` instead of `print` here? Again, that's just the typical practice.
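
To illustrate the point, here is a minimal, standalone sketch of a polling loop that reports progress through a module-level logger rather than `print`. It is not the PR's `waiter` method: `wait_for_state`, its `sleep` parameter, and the use of `RuntimeError` in place of `AirflowException` are all hypothetical choices made to keep the example self-contained. Inside an Airflow hook the logger already provided on the hook (`self.log`) would presumably be used instead of `logging.getLogger`.

```python
import logging
import time
from typing import Callable, Set

log = logging.getLogger(__name__)


def wait_for_state(
    get_state: Callable[[], str],
    desired_states: Set[str],
    failure_states: Set[str],
    object_type: str,
    action: str,
    countdown: int = 25 * 60,
    check_interval_seconds: int = 60,
    sleep: Callable[[float], None] = time.sleep,  # hypothetical hook for testing
) -> None:
    """Poll get_state until a desired state is reached, logging progress."""
    state = get_state()
    while state not in desired_states:
        if state in failure_states:
            # RuntimeError stands in for AirflowException in this sketch.
            raise RuntimeError(f'{object_type.title()} reached failure state {state}.')
        if countdown < check_interval_seconds:
            raise RuntimeError(f'{object_type.title()} was not {action.lower()} within the time limit.')
        countdown -= check_interval_seconds
        # Lazy %-style arguments: the message is only formatted if INFO is enabled.
        log.info('Waiting for %s to be %s.', object_type.lower(), action.lower())
        sleep(check_interval_seconds)
        state = get_state()
```

Routing the message through a logger means it picks up the configured handlers, level, and formatting, and ends up in the task log alongside everything else, which a bare `print` does not guarantee.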



##########
airflow/providers/amazon/aws/sensors/emr.py:
##########
@@ -111,6 +116,152 @@ def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
         raise NotImplementedError('Please implement failure_message_from_response() in subclass')
 
 
+class EmrServerlessJobSensor(BaseSensorOperator):
+    """
+    Asks for the state of the job run until it reaches a failure state or success state.
+    If the job run fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessJobSensor`
+
+    :param application_id: application_id to check the state of
+    :param job_run_id: job_run_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES
+    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
+    :param emr_conn_id: emr connection to use, defaults to 'emr_default'
+    """
+
+    INTERMEDIATE_STATES = {'PENDING', 'RUNNING', 'SCHEDULED', 'SUBMITTED'}
+    FAILURE_STATES = {'FAILED', 'CANCELLING', 'CANCELLED'}
+    SUCCESS_STATES = {'SUCCESS'}
+    TERMINAL_STATES = SUCCESS_STATES.union(FAILURE_STATES)
+
+    template_fields: Sequence[str] = (
+        'application_id',
+        'job_run_id',
+    )
+
+    def __init__(
+        self,
+        *,
+        application_id: str,
+        job_run_id: str,
+        target_states: Union[Set, FrozenSet] = frozenset(SUCCESS_STATES),
+        aws_conn_id: str = 'aws_default',
+        emr_conn_id: str = 'emr_default',
+        **kwargs: Any,
+    ) -> None:
+        self.aws_conn_id = aws_conn_id
+        self.emr_conn_id = emr_conn_id
+        self.target_states = target_states
+        self.application_id = application_id
+        self.job_run_id = job_run_id
+        super().__init__(**kwargs)
+
+    def poke(self, context: 'Context') -> bool:
+        state = None
+
+        try:
+            response = self.hook.conn.get_job_run(applicationId=self.application_id, jobRunId=self.job_run_id)
+        except Exception:
+            raise AirflowException(f'Unable to get job state: {response}')
+
+        state = response['jobRun']['state']
+
+        if state in self.FAILURE_STATES:
+            failure_message = f"EMR Serverless job failed: {self.failure_message_from_response(response)}"
+            raise AirflowException(failure_message)
+
+        return state in self.target_states
+
+    @cached_property
+    def hook(self) -> EmrServerlessHook:
+        """Create and return an EmrServerlessHook"""
+        return EmrServerlessHook()

Review Comment:
   ```suggestion
           return EmrServerlessHook(aws_conn_id=self.aws_conn_id)
   ```
   This lets users control the connection ID used to build the hook object. Or should this use `aws_conn_id` if provided and fall back to `emr_conn_id` otherwise?
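
As a rough illustration of why the argument matters, the sketch below reduces the sensor and hook to their connection wiring. `FakeServerlessHook` and `FakeJobSensor` are hypothetical stand-ins, not the PR's classes; the only assumption carried over from the diff is that the hook accepts an `aws_conn_id` keyword and defaults to `'aws_default'`.

```python
from functools import cached_property


class FakeServerlessHook:
    """Hypothetical stand-in for EmrServerlessHook; it only records the connection ID."""

    def __init__(self, aws_conn_id: str = 'aws_default') -> None:
        self.aws_conn_id = aws_conn_id


class FakeJobSensor:
    """Hypothetical stand-in for the sensor, reduced to the hook wiring."""

    def __init__(self, *, aws_conn_id: str = 'aws_default') -> None:
        self.aws_conn_id = aws_conn_id

    @cached_property
    def hook(self) -> FakeServerlessHook:
        # Forward the sensor's connection ID; without it the hook would
        # silently fall back to its own default connection.
        return FakeServerlessHook(aws_conn_id=self.aws_conn_id)


sensor = FakeJobSensor(aws_conn_id='my_aws')
print(sensor.hook.aws_conn_id)
```

With a bare `FakeServerlessHook()` call in the property, the `aws_conn_id` passed to the sensor would be stored but never reach the hook.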



##########
airflow/providers/amazon/aws/sensors/emr.py:
##########
@@ -15,15 +15,20 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Sequence
+import sys
+from typing import TYPE_CHECKING, Any, Dict, FrozenSet, Iterable, Optional, Sequence, Set, Union
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.emr import EmrContainerHook, EmrHook, EmrServerlessHook
+from airflow.sensors.base import BaseSensorOperator
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
-from airflow.compat.functools import cached_property
-from airflow.exceptions import AirflowException
-from airflow.providers.amazon.aws.hooks.emr import EmrContainerHook, EmrHook
-from airflow.sensors.base import BaseSensorOperator
+if sys.version_info >= (3, 8):
+    from functools import cached_property
+else:
+    from cached_property import cached_property

Review Comment:
   ```suggestion
   from airflow.compat.functools import cached_property
   ```
   Same idea here as above. You can use Airflow's own compat util for `cached_property`.



##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -90,6 +91,78 @@ def create_job_flow(self, job_flow_overrides: Dict[str, Any]) -> Dict[str, Any]:
         return response
 
 
+class EmrServerlessHook(AwsBaseHook):
+    """
+    Interact with EMR Serverless API.
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        kwargs["client_type"] = "emr-serverless"
+        super().__init__(*args, **kwargs)
+
+    @cached_property
+    def conn(self):
+        """Get the underlying boto3 EmrServerlessAPIService client (cached)"""
+        return super().conn
+
+    # This method should be replaced with boto waiters which would implement timeouts and backoff nicely.
+    def waiter(
+        self,
+        get_state_callable: Callable,
+        get_state_args: Dict,
+        parse_response: List,
+        desired_state: Set,
+        failure_states: Set,
+        object_type: str,
+        action: str,
+        countdown: int = 25 * 60,
+        check_interval_seconds: int = 60,
+    ) -> None:
+        """
+        Will run the sensor until it turns True.
+
+        :param get_state_callable: A callable to run until it returns True
+        :param get_state_args: Arguments to pass to get_state_callable
+        :param parse_response: Dictionary keys to extract state from response of get_state_callable
+        :param desired_state: Wait until the getter returns this value
+        :param failure_states: A set of states which indicate failure and should throw an
+        exception if any are reached before the desired_state
+        :param object_type: Used for the reporting string. What are you waiting for? (application, job, etc)
+        :param action: Used for the reporting string. What action are you waiting for? (created, deleted, etc)
+        :param countdown: Total amount of time the waiter should wait for the desired state
+        before timing out (in seconds). Defaults to 25 * 60 seconds.
+        :param check_interval_seconds: Number of seconds waiter should wait before attempting
+        to retry get_state_callable. Defaults to 60 seconds.

Review Comment:
   ```suggestion
           :param failure_states: A set of states which indicate failure and should throw an
               exception if any are reached before the desired_state
           :param object_type: Used for the reporting string. What are you waiting for? (application, job, etc)
           :param action: Used for the reporting string. What action are you waiting for? (created, deleted, etc)
           :param countdown: Total amount of time the waiter should wait for the desired state
               before timing out (in seconds). Defaults to 25 * 60 seconds.
           :param check_interval_seconds: Number of seconds waiter should wait before attempting
               to retry get_state_callable. Defaults to 60 seconds.
   ```
   Would you mind adding some indentation? Docstrings typically use a hanging indent on the continuation lines of parameter descriptions for readability.



##########
airflow/providers/amazon/aws/sensors/emr.py:
##########
@@ -111,6 +116,152 @@ def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
         raise NotImplementedError('Please implement failure_message_from_response() in subclass')
 
 
+class EmrServerlessJobSensor(BaseSensorOperator):
+    """
+    Asks for the state of the job run until it reaches a failure state or success state.
+    If the job run fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessJobSensor`
+
+    :param application_id: application_id to check the state of
+    :param job_run_id: job_run_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES
+    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
+    :param emr_conn_id: emr connection to use, defaults to 'emr_default'
+    """
+
+    INTERMEDIATE_STATES = {'PENDING', 'RUNNING', 'SCHEDULED', 'SUBMITTED'}
+    FAILURE_STATES = {'FAILED', 'CANCELLING', 'CANCELLED'}
+    SUCCESS_STATES = {'SUCCESS'}
+    TERMINAL_STATES = SUCCESS_STATES.union(FAILURE_STATES)
+
+    template_fields: Sequence[str] = (
+        'application_id',
+        'job_run_id',
+    )
+
+    def __init__(
+        self,
+        *,
+        application_id: str,
+        job_run_id: str,
+        target_states: Union[Set, FrozenSet] = frozenset(SUCCESS_STATES),
+        aws_conn_id: str = 'aws_default',
+        emr_conn_id: str = 'emr_default',
+        **kwargs: Any,
+    ) -> None:
+        self.aws_conn_id = aws_conn_id
+        self.emr_conn_id = emr_conn_id
+        self.target_states = target_states
+        self.application_id = application_id
+        self.job_run_id = job_run_id
+        super().__init__(**kwargs)
+
+    def poke(self, context: 'Context') -> bool:
+        state = None
+
+        try:
+            response = self.hook.conn.get_job_run(applicationId=self.application_id, jobRunId=self.job_run_id)
+        except Exception:
+            raise AirflowException(f'Unable to get job state: {response}')
+
+        state = response['jobRun']['state']
+
+        if state in self.FAILURE_STATES:
+            failure_message = f"EMR Serverless job failed: {self.failure_message_from_response(response)}"
+            raise AirflowException(failure_message)
+
+        return state in self.target_states
+
+    @cached_property
+    def hook(self) -> EmrServerlessHook:
+        """Create and return an EmrServerlessHook"""
+        return EmrServerlessHook()
+
+    @staticmethod
+    def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
+        """
+        Get failure message from response dictionary.
+
+        :param response: response from AWS API
+        :return: failure message
+        :rtype: Optional[str]
+        """
+        return response['jobRun']['stateDetails']
+
+
+class EmrServerlessApplicationSensor(BaseSensorOperator):
+    """
+    Asks for the state of the application until it reaches a failure state or success state.
+    If the application fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessApplicationSensor`
+
+    :param application_id: application_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES
+    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
+    :param emr_conn_id: emr connection to use, defaults to 'emr_default'
+    """
+
+    template_fields: Sequence[str] = ('application_id',)
+
+    INTERMEDIATE_STATES = {'CREATING', 'STARTING', 'STOPPING'}
+    # TODO:  Question: Do these states indicate failure?
+    FAILURE_STATES = {'STOPPED', 'TERMINATED'}
+    SUCCESS_STATES = {'CREATED', 'STARTED'}
+
+    def __init__(
+        self,
+        *,
+        application_id: str,
+        target_states: Union[Set, FrozenSet] = frozenset(SUCCESS_STATES),
+        aws_conn_id: str = 'aws_default',
+        emr_conn_id: str = 'emr_default',
+        **kwargs: Any,
+    ) -> None:
+        self.aws_conn_id = aws_conn_id
+        self.emr_conn_id = emr_conn_id
+        self.target_states = target_states
+        self.application_id = application_id
+        super().__init__(**kwargs)
+
+    def poke(self, context: 'Context') -> bool:
+        state = None
+
+        try:
+            response = self.hook.conn.get_application(applicationId=self.application_id)
+        except Exception:
+            raise AirflowException(f'Unable to get application state: {response}')
+
+        state = response['application']['state']
+
+        if state in self.FAILURE_STATES:
+            failure_message = f"EMR Serverless job failed: {self.failure_message_from_response(response)}"
+            raise AirflowException(failure_message)
+
+        return state in self.target_states
+
+    @cached_property
+    def hook(self) -> EmrServerlessHook:
+        """Create and return an EmrServerlessHook"""
+        return EmrServerlessHook()

Review Comment:
   ```suggestion
           return EmrServerlessHook(aws_conn_id=self.aws_conn_id)
   ```



##########
airflow/providers/amazon/aws/sensors/emr.py:
##########
@@ -111,6 +116,152 @@ def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
         raise NotImplementedError('Please implement failure_message_from_response() in subclass')
 
 
+class EmrServerlessJobSensor(BaseSensorOperator):
+    """
+    Asks for the state of the job run until it reaches a failure state or success state.
+    If the job run fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessJobSensor`
+
+    :param application_id: application_id to check the state of
+    :param job_run_id: job_run_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES
+    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
+    :param emr_conn_id: emr connection to use, defaults to 'emr_default'
+    """
+
+    INTERMEDIATE_STATES = {'PENDING', 'RUNNING', 'SCHEDULED', 'SUBMITTED'}
+    FAILURE_STATES = {'FAILED', 'CANCELLING', 'CANCELLED'}
+    SUCCESS_STATES = {'SUCCESS'}
+    TERMINAL_STATES = SUCCESS_STATES.union(FAILURE_STATES)
+
+    template_fields: Sequence[str] = (
+        'application_id',
+        'job_run_id',
+    )
+
+    def __init__(
+        self,
+        *,
+        application_id: str,
+        job_run_id: str,
+        target_states: Union[Set, FrozenSet] = frozenset(SUCCESS_STATES),
+        aws_conn_id: str = 'aws_default',
+        emr_conn_id: str = 'emr_default',
+        **kwargs: Any,
+    ) -> None:
+        self.aws_conn_id = aws_conn_id
+        self.emr_conn_id = emr_conn_id
+        self.target_states = target_states
+        self.application_id = application_id
+        self.job_run_id = job_run_id
+        super().__init__(**kwargs)
+
+    def poke(self, context: 'Context') -> bool:
+        state = None
+
+        try:
+            response = self.hook.conn.get_job_run(applicationId=self.application_id, jobRunId=self.job_run_id)
+        except Exception:
+            raise AirflowException(f'Unable to get job state: {response}')
+
+        state = response['jobRun']['state']
+
+        if state in self.FAILURE_STATES:
+            failure_message = f"EMR Serverless job failed: {self.failure_message_from_response(response)}"
+            raise AirflowException(failure_message)
+
+        return state in self.target_states
+
+    @cached_property
+    def hook(self) -> EmrServerlessHook:
+        """Create and return an EmrServerlessHook"""
+        return EmrServerlessHook()
+
+    @staticmethod
+    def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
+        """
+        Get failure message from response dictionary.
+
+        :param response: response from AWS API
+        :return: failure message
+        :rtype: Optional[str]
+        """
+        return response['jobRun']['stateDetails']
+
+
+class EmrServerlessApplicationSensor(BaseSensorOperator):
+    """
+    Asks for the state of the application until it reaches a failure state or success state.
+    If the application fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessApplicationSensor`
+
+    :param application_id: application_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES

Review Comment:
   ```suggestion
       :param target_states: a set of states to wait for, defaults to {'STARTED', 'CREATED'}
   ```
   Same idea about having the values in the docstring. Up to you.



##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -16,18 +16,24 @@
 # specific language governing permissions and limitations
 # under the License.
 import ast
+import sys
 from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Union
 from uuid import uuid4
 
-from airflow.compat.functools import cached_property
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
-from airflow.providers.amazon.aws.hooks.emr import EmrContainerHook, EmrHook
+from airflow.providers.amazon.aws.hooks.emr import EmrContainerHook, EmrHook, EmrServerlessHook
 from airflow.providers.amazon.aws.links.emr import EmrClusterLink
+from airflow.providers.amazon.aws.sensors.emr import EmrServerlessApplicationSensor, EmrServerlessJobSensor
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
+if sys.version_info >= (3, 8):

Review Comment:
   +1 You can import `cached_property` from [`airflow.compat.functools`](https://github.com/apache/airflow/blob/main/airflow/compat/functools.py).



##########
airflow/providers/amazon/aws/sensors/emr.py:
##########
@@ -111,6 +116,152 @@ def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
         raise NotImplementedError('Please implement failure_message_from_response() in subclass')
 
 
+class EmrServerlessJobSensor(BaseSensorOperator):
+    """
+    Asks for the state of the job run until it reaches a failure state or success state.
+    If the job run fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessJobSensor`
+
+    :param application_id: application_id to check the state of
+    :param job_run_id: job_run_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES
+    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
+    :param emr_conn_id: emr connection to use, defaults to 'emr_default'
+    """
+
+    INTERMEDIATE_STATES = {'PENDING', 'RUNNING', 'SCHEDULED', 'SUBMITTED'}
+    FAILURE_STATES = {'FAILED', 'CANCELLING', 'CANCELLED'}
+    SUCCESS_STATES = {'SUCCESS'}
+    TERMINAL_STATES = SUCCESS_STATES.union(FAILURE_STATES)
+
+    template_fields: Sequence[str] = (
+        'application_id',
+        'job_run_id',
+    )
+
+    def __init__(
+        self,
+        *,
+        application_id: str,
+        job_run_id: str,
+        target_states: Union[Set, FrozenSet] = frozenset(SUCCESS_STATES),
+        aws_conn_id: str = 'aws_default',
+        emr_conn_id: str = 'emr_default',
+        **kwargs: Any,
+    ) -> None:
+        self.aws_conn_id = aws_conn_id
+        self.emr_conn_id = emr_conn_id

Review Comment:
   I could easily be missing it, but where is `emr_conn_id` being used in this sensor and other sensors in this file?


