You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/12/23 18:25:34 UTC

[GitHub] [airflow] kaxil commented on a change in pull request #6090: [AIRFLOW-5470] Add Apache Livy REST operator

kaxil commented on a change in pull request #6090: [AIRFLOW-5470] Add Apache Livy REST operator
URL: https://github.com/apache/airflow/pull/6090#discussion_r360967493
 
 

 ##########
 File path: tests/contrib/hooks/test_livy_hook.py
 ##########
 @@ -0,0 +1,414 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+import unittest
+from unittest.mock import MagicMock, patch
+
+from requests.exceptions import RequestException
+
+from airflow import AirflowException
+from airflow.contrib.hooks.livy_hook import BatchState, LivyHook
+from airflow.models import Connection
+from airflow.utils import db
+
+# Batch id used by the canned Livy responses in this module.
+TEST_ID = 100
+# Sample response body reporting batch TEST_ID in the SUCCESS state.
+SAMPLE_GET_RESPONSE = {'id': TEST_ID, 'state': BatchState.SUCCESS.value}
+
+
+class TestLivyHook(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        db.merge_conn(Connection(conn_id='livy_default', host='host', schema='http', port='8998'))
+        db.merge_conn(Connection(conn_id='default_port', host='http://host'))
+        db.merge_conn(Connection(conn_id='default_protocol', host='host'))
+        db.merge_conn(Connection(conn_id='port_set', host='host', port=1234))
+        db.merge_conn(Connection(conn_id='schema_set', host='host', schema='zzz'))
+        db.merge_conn(Connection(conn_id='dont_override_schema', host='http://host', schema='zzz'))
+        db.merge_conn(Connection(conn_id='missing_host', port=1234))
+        db.merge_conn(Connection(conn_id='invalid_uri', uri='http://invalid_uri:4321'))
+
+    def test_build_get_hook(self):
+
+        connection_url_mapping = {
+            # id, expected
+            'default_port': 'http://host',
+            'default_protocol': 'http://host',
+            'port_set': 'http://host:1234',
+            'schema_set': 'zzz://host',
+            'dont_override_schema': 'http://host',
+        }
+
+        for conn_id, expected in connection_url_mapping.items():
+            with self.subTest(conn_id):
+                hook = LivyHook(livy_conn_id=conn_id)
+
+                hook.get_conn()
+                self.assertEqual(hook.base_url, expected)
+
+    @unittest.skip("inherited HttpHook does not handle missing hostname")
+    def test_missing_host(self):
+        with self.assertRaises(AirflowException):
+            LivyHook(livy_conn_id='missing_host').get_conn()
+
+    def test_build_body(self):
+        with self.subTest('minimal request'):
+            body = LivyHook.build_post_batch_body(file='appname')
+
+            self.assertEqual(body, {'file': 'appname'})
+
+        with self.subTest('complex request'):
+            body = LivyHook.build_post_batch_body(
+                file='appname',
+                class_name='org.example.livy',
+                proxy_user='proxyUser',
+                args=['a', '1'],
+                jars=['jar1', 'jar2'],
+                files=['file1', 'file2'],
+                py_files=['py1', 'py2'],
+                queue='queue',
+                name='name',
+                conf={'a': 'b'},
+                driver_memory='1M',
+                executor_memory='1m',
+                executor_cores='1',
+                num_executors='10',
+            )
+
+            self.assertEqual(body, {
+                'file': 'appname',
+                'className': 'org.example.livy',
+                'proxyUser': 'proxyUser',
+                'args': ['a', '1'],
+                'jars': ['jar1', 'jar2'],
+                'files': ['file1', 'file2'],
+                'pyFiles': ['py1', 'py2'],
+                'queue': 'queue',
+                'name': 'name',
+                'conf': {'a': 'b'},
+                'driverMemory': '1M',
+                'executorMemory': '1m',
+                'executorCores': '1',
+                'numExecutors': '10'
+            })
+
+    def test_parameters_validation(self):
+        with self.subTest('not a size'):
+            with self.assertRaises(AirflowException):
+                LivyHook.build_post_batch_body(file='appname', executor_memory='xxx')
+
+        with self.subTest('list of stringables'):
+            self.assertEqual(
+                LivyHook.build_post_batch_body(file='appname', args=['a', 1, 0.1])['args'],
+                ['a', '1', '0.1']
+            )
+
+    def test_validate_size_format(self):
+        with self.subTest('lower 1'):
+            self.assertTrue(LivyHook._validate_size_format('1m'))
+
+        with self.subTest('lower 2'):
+            self.assertTrue(LivyHook._validate_size_format('1mb'))
+
+        with self.subTest('upper 1'):
+            self.assertTrue(LivyHook._validate_size_format('1G'))
+
+        with self.subTest('upper 2'):
+            self.assertTrue(LivyHook._validate_size_format('1GB'))
+
+        with self.subTest('numeric'):
+            with self.assertRaises(AirflowException):
+                LivyHook._validate_size_format(1)
+
+        with self.subTest('None'):
+            self.assertTrue(LivyHook._validate_size_format(None))
+
+    def test_validate_extra_conf(self):
+        with self.subTest('valid'):
+            try:
+                LivyHook._validate_extra_conf({'k1': 'v1', 'k2': 0})
+            except AirflowException:
+                self.fail("Exception raised")
+
+        with self.subTest('empty dict'):
+            try:
+                LivyHook._validate_extra_conf({})
+            except AirflowException:
+                self.fail("Exception raised")
+
+        with self.subTest('none'):
+            try:
+                LivyHook._validate_extra_conf(None)
+            except AirflowException:
+                self.fail("Exception raised")
+
+        with self.subTest('not a dict 1'):
+            with self.assertRaises(AirflowException):
+                LivyHook._validate_extra_conf('k1=v1')
+
+        with self.subTest('not a dict 2'):
+            with self.assertRaises(AirflowException):
+                LivyHook._validate_extra_conf([('k1', 'v1'), ('k2', 0)])
+
+        with self.subTest('nested dict'):
+            with self.assertRaises(AirflowException):
+                LivyHook._validate_extra_conf({'outer': {'inner': 'val'}})
+
+        with self.subTest('empty items'):
+            with self.assertRaises(AirflowException):
+                LivyHook._validate_extra_conf({'has_val': 'val', 'no_val': None})
+
+        with self.subTest('empty string'):
+            with self.assertRaises(AirflowException):
+                LivyHook._validate_extra_conf({'has_val': 'val', 'no_val': ''})
+
+    @staticmethod
+    def build_mock_response(mock_request, status_code, body):
+        """helper method"""
+        if not isinstance(mock_request, MagicMock):
+            raise ValueError("Mock expected")
+        mock_request.return_value.status_code = status_code
+        mock_request.return_value.json.return_value = body
+
+    @patch('airflow.contrib.hooks.livy_hook.LivyHook.run_method')
+    def test_post_batch(self, mock_request):
+
+        batch_id = 100
+
+        hook = LivyHook()
+
+        with self.subTest('batch submit success'):
+            TestLivyHook.build_mock_response(
 
 Review comment:
   @lucacavazzana Can you use requests_mock, as Ash suggested?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services