You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/09/18 18:47:21 UTC

[airflow] branch main updated: fix get_connections deprecation warning in webhdfs hook (#18331)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b62a75  fix get_connections deprecation warning in webhdfs hook (#18331)
2b62a75 is described below

commit 2b62a75a34d44ac7d9ed83c02421ff4867875577
Author: Aakcht <aa...@gmail.com>
AuthorDate: Sat Sep 18 21:47:00 2021 +0300

    fix get_connections deprecation warning in webhdfs hook (#18331)
---
 airflow/providers/apache/hdfs/hooks/webhdfs.py    | 35 ++++++--------
 tests/providers/apache/hdfs/hooks/test_webhdfs.py | 59 ++++++++++++-----------
 2 files changed, 46 insertions(+), 48 deletions(-)

diff --git a/airflow/providers/apache/hdfs/hooks/webhdfs.py b/airflow/providers/apache/hdfs/hooks/webhdfs.py
index 002e4f0..6659ad6 100644
--- a/airflow/providers/apache/hdfs/hooks/webhdfs.py
+++ b/airflow/providers/apache/hdfs/hooks/webhdfs.py
@@ -70,26 +70,23 @@ class WebHDFSHook(BaseHook):
         return connection
 
     def _find_valid_server(self) -> Any:
-        connections = self.get_connections(self.webhdfs_conn_id)
-        for connection in connections:
-            host_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-            self.log.info("Trying to connect to %s:%s", connection.host, connection.port)
-            try:
-                conn_check = host_socket.connect_ex((connection.host, connection.port))
-                if conn_check == 0:
-                    self.log.info('Trying namenode %s', connection.host)
-                    client = self._get_client(connection)
-                    client.status('/')
-                    self.log.info('Using namenode %s for hook', connection.host)
-                    host_socket.close()
-                    return client
-                else:
-                    self.log.error("Could not connect to %s:%s", connection.host, connection.port)
+        connection = self.get_connection(self.webhdfs_conn_id)
+        host_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.log.info("Trying to connect to %s:%s", connection.host, connection.port)
+        try:
+            conn_check = host_socket.connect_ex((connection.host, connection.port))
+            if conn_check == 0:
+                self.log.info('Trying namenode %s', connection.host)
+                client = self._get_client(connection)
+                client.status('/')
+                self.log.info('Using namenode %s for hook', connection.host)
                 host_socket.close()
-            except HdfsError as hdfs_error:
-                self.log.error(
-                    'Read operation on namenode %s failed with error: %s', connection.host, hdfs_error
-                )
+                return client
+            else:
+                self.log.error("Could not connect to %s:%s", connection.host, connection.port)
+            host_socket.close()
+        except HdfsError as hdfs_error:
+            self.log.error('Read operation on namenode %s failed with error: %s', connection.host, hdfs_error)
         return None
 
     def _get_client(self, connection: Connection) -> Any:
diff --git a/tests/providers/apache/hdfs/hooks/test_webhdfs.py b/tests/providers/apache/hdfs/hooks/test_webhdfs.py
index 1ded92e..9288f5a 100644
--- a/tests/providers/apache/hdfs/hooks/test_webhdfs.py
+++ b/tests/providers/apache/hdfs/hooks/test_webhdfs.py
@@ -17,7 +17,7 @@
 # under the License.
 
 import unittest
-from unittest.mock import call, patch
+from unittest.mock import patch
 
 import pytest
 from hdfs import HdfsError
@@ -33,33 +33,34 @@ class TestWebHDFSHook(unittest.TestCase):
     @patch('airflow.providers.apache.hdfs.hooks.webhdfs.requests.Session', return_value="session")
     @patch('airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient')
     @patch(
-        'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connections',
-        return_value=[Connection(host='host_1', port=123), Connection(host='host_2', port=321, login='user')],
+        'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',
+        return_value=Connection(host='host_2', port=321, login='user'),
     )
     @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
-    def test_get_conn(self, socket_mock, mock_get_connections, mock_insecure_client, mock_session):
-        mock_insecure_client.side_effect = [HdfsError('Error'), mock_insecure_client.return_value]
+    def test_get_conn(self, socket_mock, mock_get_connection, mock_insecure_client, mock_session):
         socket_mock.socket.return_value.connect_ex.return_value = 0
         conn = self.webhdfs_hook.get_conn()
-
-        mock_insecure_client.assert_has_calls(
-            [
-                call(
-                    f'http://{connection.host}:{connection.port}',
-                    user=connection.login,
-                    session=mock_session.return_value,
-                )
-                for connection in mock_get_connections.return_value
-            ]
+        connection = mock_get_connection.return_value
+        mock_insecure_client.assert_called_once_with(
+            f'http://{connection.host}:{connection.port}',
+            user=connection.login,
+            session=mock_session.return_value,
         )
         mock_insecure_client.return_value.status.assert_called_once_with('/')
         assert conn == mock_insecure_client.return_value
 
+    @patch('airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient', side_effect=HdfsError('Error'))
+    @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
+    def test_get_conn_hdfs_error(self, socket_mock, mock_insecure_client):
+        socket_mock.socket.return_value.connect_ex.return_value = 0
+        with pytest.raises(AirflowWebHDFSHookException):
+            self.webhdfs_hook.get_conn()
+
     @patch('airflow.providers.apache.hdfs.hooks.webhdfs.requests.Session', return_value="session")
     @patch('airflow.providers.apache.hdfs.hooks.webhdfs.KerberosClient', create=True)
     @patch(
-        'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connections',
-        return_value=[Connection(host='host_1', port=123)],
+        'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',
+        return_value=Connection(host='host_1', port=123),
     )
     @patch('airflow.providers.apache.hdfs.hooks.webhdfs._kerberos_security_mode', return_value=True)
     @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
@@ -67,14 +68,14 @@ class TestWebHDFSHook(unittest.TestCase):
         self,
         socket_mock,
         mock_kerberos_security_mode,
-        mock_get_connections,
+        mock_get_connection,
         mock_kerberos_client,
         mock_session,
     ):
         socket_mock.socket.return_value.connect_ex.return_value = 0
         conn = self.webhdfs_hook.get_conn()
 
-        connection = mock_get_connections.return_value[0]
+        connection = mock_get_connection.return_value
         mock_kerberos_client.assert_called_once_with(
             f'http://{connection.host}:{connection.port}', session=mock_session.return_value
         )
@@ -119,33 +120,33 @@ class TestWebHDFSHook(unittest.TestCase):
 
     @patch('airflow.providers.apache.hdfs.hooks.webhdfs.KerberosClient', create=True)
     @patch(
-        'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connections',
-        return_value=[
-            Connection(host='host_1', port=123, extra={"use_ssl": "True", "verify": "/ssl/cert/path"})
-        ],
+        'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',
+        return_value=Connection(
+            host='host_1', port=123, extra={"use_ssl": "True", "verify": "/ssl/cert/path"}
+        ),
     )
     @patch('airflow.providers.apache.hdfs.hooks.webhdfs._kerberos_security_mode', return_value=True)
     @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
     def test_conn_kerberos_ssl(
-        self, socket_mock, mock_kerberos_security_mode, mock_get_connections, mock_kerberos_client
+        self, socket_mock, mock_kerberos_security_mode, mock_get_connection, mock_kerberos_client
     ):
         socket_mock.socket.return_value.connect_ex.return_value = 0
         self.webhdfs_hook.get_conn()
-        connection = mock_get_connections.return_value[0]
+        connection = mock_get_connection.return_value
 
         assert f'https://{connection.host}:{connection.port}' == mock_kerberos_client.call_args[0][0]
         assert "/ssl/cert/path" == mock_kerberos_client.call_args[1]['session'].verify
 
     @patch('airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient')
     @patch(
-        'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connections',
-        return_value=[Connection(host='host_1', port=123, extra={"use_ssl": "True", "verify": False})],
+        'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',
+        return_value=Connection(host='host_1', port=123, extra={"use_ssl": "True", "verify": False}),
     )
     @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
-    def test_conn_insecure_ssl(self, socket_mock, mock_get_connections, mock_insecure_client):
+    def test_conn_insecure_ssl(self, socket_mock, mock_get_connection, mock_insecure_client):
         socket_mock.socket.return_value.connect_ex.return_value = 0
         self.webhdfs_hook.get_conn()
-        connection = mock_get_connections.return_value[0]
+        connection = mock_get_connection.return_value
 
         assert f'https://{connection.host}:{connection.port}' == mock_insecure_client.call_args[0][0]
         assert not mock_insecure_client.call_args[1]['session'].verify