You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/08/03 15:07:46 UTC

[GitHub] [airflow] josh-fell commented on a diff in pull request #25324: Add EMR Serverless Operators and Hooks

josh-fell commented on code in PR #25324:
URL: https://github.com/apache/airflow/pull/25324#discussion_r936769508


##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -412,3 +414,254 @@ def execute(self, context: 'Context') -> None:
             raise AirflowException(f'JobFlow termination failed: {response}')
         else:
             self.log.info('JobFlow with id %s terminated', self.job_flow_id)
+
+
+class EmrServerlessCreateApplicationOperator(BaseOperator):
+    """
+    Operator to create Serverless EMR Application
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:EmrServerlessCreateApplicationOperator`
+
+    :param release_label: The EMR release version associated with the application.
+    :param job_type: The type of application you want to start, such as Spark or Hive.
+    :param wait_for_completion: If true, wait for the Application to start before returning. Default to True
+    :param client_request_token: The client idempotency token of the application to create.
+      Its value must be unique for each request.
+    :param config: Optional dictionary for arbitrary parameters to the boto API create_application call.
+    :param aws_conn_id: AWS connection to use
+    """
+
+    def __init__(
+        self,
+        release_label: str,
+        job_type: str,
+        client_request_token: str = '',
+        config: Optional[dict] = None,
+        wait_for_completion: bool = True,
+        aws_conn_id: str = 'aws_default',
+        **kwargs,
+    ):
+        self.aws_conn_id = aws_conn_id
+        self.release_label = release_label
+        self.job_type = job_type
+        self.wait_for_completion = wait_for_completion
+        self.kwargs = kwargs
+        self.config = config or {}
+        super().__init__(**kwargs)
+
+        self.client_request_token = client_request_token or str(uuid4())
+
+    @cached_property
+    def hook(self) -> EmrServerlessHook:
+        """Create and return an EmrServerlessHook."""
+        return EmrServerlessHook(aws_conn_id=self.aws_conn_id)
+
+    def execute(self, context: 'Context'):
+        response = self.hook.conn.create_application(
+            clientToken=self.client_request_token,
+            releaseLabel=self.release_label,
+            type=self.job_type,
+            **self.config,
+        )
+        application_id = response['applicationId']
+
+        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
+            raise AirflowException(f'Application Creation failed: {response}')
+
+        self.log.info('EMR serverless application created: %s', application_id)
+
+        # This should be replaced with a boto waiter when available.
+        self.hook.waiter(
+            get_state_callable=self.hook.conn.get_application,
+            get_state_args={'applicationId': application_id},
+            parse_response=['application', 'state'],
+            desired_state={'CREATED'},
+            failure_states=EmrServerlessApplicationSensor.FAILURE_STATES,
+            object_type='application',
+            action='created',
+        )
+
+        self.log.info('Starting application %s', application_id)
+        self.hook.conn.start_application(applicationId=application_id)
+
+        if self.wait_for_completion:
+            # This should be replaced with a boto waiter when available.
+            self.hook.waiter(
+                get_state_callable=self.hook.conn.get_application,
+                get_state_args={'applicationId': application_id},
+                parse_response=['application', 'state'],
+                desired_state={'STARTED'},
+                failure_states=EmrServerlessApplicationSensor.FAILURE_STATES,
+                object_type='application',
+                action='started',
+            )
+
+        return application_id
+
+
+class EmrServerlessStartJobOperator(BaseOperator):
+    """
+    Operator to start EMR Serverless job.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:EmrServerlessStartJobOperator`
+
+    :param application_id: ID of the EMR Serverless application to start.
+    :param execution_role_arn: ARN of role to perform action.
+    :param job_driver: Driver that the job runs on.
+    :param configuration_overrides: Configuration specifications to override existing configurations.
+    :param client_request_token: The client idempotency token of the application to create.
+      Its value must be unique for each request.
+    :param config: Optional dictionary for arbitrary parameters to the boto API start_job_run call.
+    :param wait_for_completion: If true, waits for the job to start before returning. Defaults to True.
+    :param aws_conn_id: AWS connection to use
+    """
+
+    template_fields: Sequence[str] = ('application_id',)
+
+    def __init__(
+        self,
+        application_id: str,
+        execution_role_arn: str,
+        job_driver: dict,
+        configuration_overrides: Optional[dict],
+        client_request_token: str = '',
+        config: Optional[dict] = None,
+        wait_for_completion: bool = True,
+        aws_conn_id: str = 'aws_default',
+        **kwargs,
+    ):
+        self.aws_conn_id = aws_conn_id
+        self.application_id = application_id
+        self.execution_role_arn = execution_role_arn
+        self.job_driver = job_driver
+        self.configuration_overrides = configuration_overrides
+        self.wait_for_completion = wait_for_completion
+        self.config = config or {}
+        super().__init__(**kwargs)
+
+        self.client_request_token = client_request_token or str(uuid4())
+
+    @cached_property
+    def hook(self) -> EmrServerlessHook:
+        """Create and return an EmrServerlessHook."""
+        return EmrServerlessHook(aws_conn_id=self.aws_conn_id)
+
+    def execute(self, context: 'Context') -> Dict:
+        self.log.info('Starting job on Application: %s', self.application_id)
+
+        app_state = self.hook.conn.get_application(applicationId=self.application_id)['application']['state']
+        if app_state not in {'CREATED', 'STARTED'}:

Review Comment:
   ```suggestion
           if app_state not in EmrServerlessApplicationSensor.SUCCESS_STATES:
   ```
   Small suggestion: use the class attributes where you can, as you have in other places. Up to you though (assuming it's functionally correct as well).



##########
docs/apache-airflow-providers-amazon/operators/emr_serverless.rst:
##########
@@ -0,0 +1,112 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+===============================
+Amazon EMR Serverless Operators
+===============================
+
+`Amazon EMR Serverless <https://aws.amazon.com/emr/serverless/>`__ is a serverless option
+in Amazon EMR that makes it easy for data analysts and engineers to run open-source big
+data analytics frameworks without configuring, managing, and scaling clusters or servers.
+You get all the features and benefits of Amazon EMR without the need for experts to plan
+and manage clusters.
+
+Prerequisite Tasks
+------------------
+
+.. include:: _partials/prerequisite_tasks.rst
+Operators
+---------
+.. _howto/operator:EmrServerlessCreateApplicationOperator:
+
+Create an EMR Serverless Application
+==========================

Review Comment:
   This underline needs to be at least the length of the text it's underlining. I imagine this might be a Build Docs failure for you.



##########
airflow/providers/amazon/aws/sensors/emr.py:
##########
@@ -111,6 +116,152 @@ def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
         raise NotImplementedError('Please implement failure_message_from_response() in subclass')
 
 
+class EmrServerlessJobSensor(BaseSensorOperator):
+    """
+    Asks for the state of the job run until it reaches a failure state or success state.
+    If the job run fails, the task will fail.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:EmrServerlessJobSensor`
+
+    :param application_id: application_id to check the state of
+    :param job_run_id: job_run_id to check the state of
+    :param target_states: a set of states to wait for, defaults to SUCCESS_STATES
+    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
+    :param emr_conn_id: emr connection to use, defaults to 'emr_default'
+    """
+
+    INTERMEDIATE_STATES = {'PENDING', 'RUNNING', 'SCHEDULED', 'SUBMITTED'}
+    FAILURE_STATES = {'FAILED', 'CANCELLING', 'CANCELLED'}
+    SUCCESS_STATES = {'SUCCESS'}
+    TERMINAL_STATES = SUCCESS_STATES.union(FAILURE_STATES)
+
+    template_fields: Sequence[str] = (
+        'application_id',
+        'job_run_id',
+    )
+
+    def __init__(
+        self,
+        *,
+        application_id: str,
+        job_run_id: str,
+        target_states: Union[Set, FrozenSet] = frozenset(SUCCESS_STATES),
+        aws_conn_id: str = 'aws_default',
+        emr_conn_id: str = 'emr_default',
+        **kwargs: Any,
+    ) -> None:
+        self.aws_conn_id = aws_conn_id
+        self.emr_conn_id = emr_conn_id
+        self.target_states = target_states
+        self.application_id = application_id
+        self.job_run_id = job_run_id
+        super().__init__(**kwargs)
+
+    def poke(self, context: 'Context') -> bool:
+        state = None
+
+        try:
+            response = self.hook.conn.get_job_run(applicationId=self.application_id, jobRunId=self.job_run_id)
+        except Exception:

Review Comment:
   FWIW I think `AirflowException` is used too broadly in the code base. IMO `AirflowException` and its subclasses are really related to Airflow orchestration activity rather than activity done by what Airflow is _orchestrating_. It might make more sense to drop the `try/except` and let whatever errors the API call throws bubble up directly. Up to you though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org