You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/01/09 14:51:30 UTC

[GitHub] [airflow] ashb commented on a change in pull request #6090: [AIRFLOW-5470] Add Apache Livy REST operator

ashb commented on a change in pull request #6090: [AIRFLOW-5470] Add Apache Livy REST operator
URL: https://github.com/apache/airflow/pull/6090#discussion_r364755778
 
 

 ##########
 File path: airflow/contrib/hooks/livy_hook.py
 ##########
 @@ -0,0 +1,379 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+This module contains the Apache Livy hook.
+"""
+
+import json
+import re
+from enum import Enum
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.http_hook import HttpHook
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class BatchState(Enum):
+    """
+    Batch session states
+    """
+    NOT_STARTED = 'not_started'
+    STARTING = 'starting'
+    RUNNING = 'running'
+    IDLE = 'idle'
+    BUSY = 'busy'
+    SHUTTING_DOWN = 'shutting_down'
+    ERROR = 'error'
+    DEAD = 'dead'
+    KILLED = 'killed'
+    SUCCESS = 'success'
+
+
+class LivyHook(HttpHook, LoggingMixin):
+    """
+    Hook for Apache Livy through the REST API.
+
+    :param livy_conn_id: reference to a pre-defined Livy Connection.
+    :type livy_conn_id: str
+
+    .. seealso::
+        For more details refer to the Apache Livy API reference:
+        https://livy.apache.org/docs/latest/rest-api.html
+    """
+
+    TERMINAL_STATES = {
+        BatchState.SUCCESS,
+        BatchState.DEAD,
+        BatchState.KILLED,
+        BatchState.ERROR,
+    }
+
+    _def_headers = {
+        'Content-Type': 'application/json',
+        'Accept': 'application/json'
+    }
+
+    def __init__(self, livy_conn_id='livy_default'):
+        super(LivyHook, self).__init__(http_conn_id=livy_conn_id)
+
+    def get_conn(self, headers=None):
+        """
+        Returns http session for use with requests
+
+        :param headers: additional headers to be passed through as a dictionary
+        :type headers: dict
+        :return: requests session
+        :rtype: requests.Session
+        """
+        tmp_headers = self._def_headers.copy()  # setting default headers
+        if headers:
+            tmp_headers.update(headers)
+        return super().get_conn(tmp_headers)
+
+    def run_method(self, method='GET', endpoint=None, data=None, headers=None, extra_options=None):
+        """
+        Wrapper for HttpHook, allows to change method on the same HttpHook
+
+        :param method: http method
+        :type method: str
+        :param endpoint: endpoint
+        :type endpoint: str
+        :param data: request payload
+        :type data: dict
+        :param headers: headers
+        :type headers: dict
+        :param extra_options: extra options
+        :type extra_options: dict
+        :return: http response
+        :rtype: requests.Response
+        """
+        if method not in ('GET', 'POST', 'PUT', 'DELETE', 'HEAD'):
+            raise AirflowException("Invalid http method '{}'".format(method))
+
+        back_method = self.method
+        self.method = method
+        try:
+            result = self.run(endpoint, data, headers, extra_options)
+        finally:
+            self.method = back_method
+        return result
+
+    def post_batch(self, *args, **kwargs):
+        """
+        Perform request to submit batch
+
+        :return: batch session id
+        :rtype: int
+        """
+        batch_submit_body = json.dumps(self.build_post_batch_body(*args, **kwargs))
+
+        if self.base_url is None:
+            # need to init self.base_url
+            self.get_conn()
+        self.log.info("Submitting job {} to {}".format(batch_submit_body, self.base_url))
+
+        response = self.run_method(
+            method='POST',
+            endpoint='/batches',
+            data=batch_submit_body
+        )
+        self.log.debug("Got response: {}".format(response.text))
+
+        if response.status_code != 201:
+            raise AirflowException("Could not submit batch. Status code: {}".format(response.status_code))
 
 Review comment:
   Is it worth including `response.json()` (or something similar) in the error message here? I guess that when it fails to get a batch ID, the response might include an error.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services