You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/10/25 16:49:56 UTC

[GitHub] [airflow] feluelle commented on a change in pull request #6281: Run batches of (self-terminating) EMR JobFlows [AIRFLOW-XXX]

feluelle commented on a change in pull request #6281: Run batches of (self-terminating) EMR JobFlows [AIRFLOW-XXX]
URL: https://github.com/apache/airflow/pull/6281#discussion_r339136077
 
 

 ##########
 File path: airflow/contrib/sensors/emr_run_job_flows.py
 ##########
 @@ -0,0 +1,185 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""EmrRunJobFlows manages cluster queue by implementing an EMR sensor."""
+
+from airflow import AirflowException
+from airflow.contrib.hooks.emr_hook import EmrHook
+from airflow.contrib.sensors.emr_base_sensor import EmrBaseSensor
+from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor
+from airflow.utils.decorators import apply_defaults
+
+
+class EmrRunJobFlows(EmrBaseSensor):
+    """
+    Submits batches of self-terminating EMR Job Flows and waits for their steps
+    to complete. This operator submits a list of EMR clusters in batches, where
+    each Job Flow is expected to be self-terminating and list all the EMR steps
+    it is expected to execute. Only basic retry logic is provided.
+
+    Implementation Note: For each cluster, we submit all the steps at cluster
+    creation time. This partially frees the cluster from the vagaries of the
+    Airflow scheduler. Since we rely on EMR to terminate its clusters, any
+    failed step will need to terminate the cluster and the cluster itself should
+    auto-terminate as per [1]. In other words, each JobFlow must auto-terminate
+    (likely via the `job_flows` parameter) by setting its Instances'
+    `"KeepJobFlowAliveWhenNoSteps": False`. Additionally, consider setting each
+    Step's `"ActionOnFailure": "TERMINATE_CLUSTER"` to allow failing-fast if
+    your workflow allows for it.
+
+    [1]: https://docs.aws.amazon.com/emr/latest/ManagementGuide/\
+    UsingEMR_TerminationProtection.html#emr-termination-protection-steps
+
+    TODO: The utility of the EmrBaseSensor that we extend is somewhat limited.
+    Currently, it asks for the state of its JobFlow until that JobFlow reaches a
+    terminal state. If the EMR JobFlow fails, the sensor will mark the task as
+    failed. If custom EMR sensor logic is pursued, we could set up step-wise
+    monitoring and timeouts, which would allow for context-specific retries
+    using XComs, and we may be able to extend the implementation to allow for
+    cross-cluster logic, such as waiting for all clusters in a batch to finish
+    even when some fail.
+
+    :param job_flows: a queue of EMR JobFlows. It's a list of dicts, each one
+        mapping job_flow names to their configurations:
+        [{job_flow_name: job_flow_overrides}]. Each dict in the list represents
+        the job flows which should run in parallel, and every cluster in the
+        preceding dict is expected to have come to a successful terminal state,
+        prior to submitting the next dict. See boto3's job_flow_overrides EMR
+        details in
+        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/\
+        services/emr.html#EMR.Client.run_job_flow (templated)
+    :type job_flows: list
+    """
+
+    template_fields = ['job_flows']
+    template_ext = ()
+    # EMR logo... ~RGB(237,165,83)
+    ui_color = "#eda553"
+
+    # Overrides for EmrBaseSensor
+    NON_TERMINAL_STATES = EmrJobFlowSensor.NON_TERMINAL_STATES
+    FAILED_STATE = EmrJobFlowSensor.FAILED_STATE
+
+    @apply_defaults
+    def __init__(
+            self,
+            job_flows,
+            emr_conn_id='emr_default',
+            # require_auto_termination = False,
+            *args, **kwargs):
+        """
+        C0111
+        """
+        super().__init__(*args, **kwargs)
+        self.job_flows = job_flows
+        self.emr_conn_id = emr_conn_id
+        # These two fields will be filled in as clusters are requested and poked
+        self.current_batch = {}
+        self.statuses = []
+
+    def execute(self, context):
+        """
+        C0111
+        """
+        self.log.info(
+            "The clusters will be submitted across the following batches: %s",
+            [set(batch.keys()) for batch in self.job_flows])
+        # TODO: Verify all clusters set `"KeepJobFlowAliveWhenNoSteps": False`
+        # if self.require_auto_termination
 
 Review comment:
   Please remove comments like that where you just disable code.
   
   Comments should only be used to explain something that you are not able to explain by code alone.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services