You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/09/18 18:47:21 UTC
[airflow] branch main updated: fix get_connections deprecation warning in webhdfs hook (#18331)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2b62a75 fix get_connections deprecation warning in webhdfs hook (#18331)
2b62a75 is described below
commit 2b62a75a34d44ac7d9ed83c02421ff4867875577
Author: Aakcht <aa...@gmail.com>
AuthorDate: Sat Sep 18 21:47:00 2021 +0300
fix get_connections deprecation warning in webhdfs hook (#18331)
---
airflow/providers/apache/hdfs/hooks/webhdfs.py | 35 ++++++--------
tests/providers/apache/hdfs/hooks/test_webhdfs.py | 59 ++++++++++++-----------
2 files changed, 46 insertions(+), 48 deletions(-)
diff --git a/airflow/providers/apache/hdfs/hooks/webhdfs.py b/airflow/providers/apache/hdfs/hooks/webhdfs.py
index 002e4f0..6659ad6 100644
--- a/airflow/providers/apache/hdfs/hooks/webhdfs.py
+++ b/airflow/providers/apache/hdfs/hooks/webhdfs.py
@@ -70,26 +70,23 @@ class WebHDFSHook(BaseHook):
return connection
def _find_valid_server(self) -> Any:
- connections = self.get_connections(self.webhdfs_conn_id)
- for connection in connections:
- host_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.log.info("Trying to connect to %s:%s", connection.host, connection.port)
- try:
- conn_check = host_socket.connect_ex((connection.host, connection.port))
- if conn_check == 0:
- self.log.info('Trying namenode %s', connection.host)
- client = self._get_client(connection)
- client.status('/')
- self.log.info('Using namenode %s for hook', connection.host)
- host_socket.close()
- return client
- else:
- self.log.error("Could not connect to %s:%s", connection.host, connection.port)
+ connection = self.get_connection(self.webhdfs_conn_id)
+ host_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.log.info("Trying to connect to %s:%s", connection.host, connection.port)
+ try:
+ conn_check = host_socket.connect_ex((connection.host, connection.port))
+ if conn_check == 0:
+ self.log.info('Trying namenode %s', connection.host)
+ client = self._get_client(connection)
+ client.status('/')
+ self.log.info('Using namenode %s for hook', connection.host)
host_socket.close()
- except HdfsError as hdfs_error:
- self.log.error(
- 'Read operation on namenode %s failed with error: %s', connection.host, hdfs_error
- )
+ return client
+ else:
+ self.log.error("Could not connect to %s:%s", connection.host, connection.port)
+ host_socket.close()
+ except HdfsError as hdfs_error:
+ self.log.error('Read operation on namenode %s failed with error: %s', connection.host, hdfs_error)
return None
def _get_client(self, connection: Connection) -> Any:
diff --git a/tests/providers/apache/hdfs/hooks/test_webhdfs.py b/tests/providers/apache/hdfs/hooks/test_webhdfs.py
index 1ded92e..9288f5a 100644
--- a/tests/providers/apache/hdfs/hooks/test_webhdfs.py
+++ b/tests/providers/apache/hdfs/hooks/test_webhdfs.py
@@ -17,7 +17,7 @@
# under the License.
import unittest
-from unittest.mock import call, patch
+from unittest.mock import patch
import pytest
from hdfs import HdfsError
@@ -33,33 +33,34 @@ class TestWebHDFSHook(unittest.TestCase):
@patch('airflow.providers.apache.hdfs.hooks.webhdfs.requests.Session', return_value="session")
@patch('airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient')
@patch(
- 'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connections',
- return_value=[Connection(host='host_1', port=123), Connection(host='host_2', port=321, login='user')],
+ 'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',
+ return_value=Connection(host='host_2', port=321, login='user'),
)
@patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
- def test_get_conn(self, socket_mock, mock_get_connections, mock_insecure_client, mock_session):
- mock_insecure_client.side_effect = [HdfsError('Error'), mock_insecure_client.return_value]
+ def test_get_conn(self, socket_mock, mock_get_connection, mock_insecure_client, mock_session):
socket_mock.socket.return_value.connect_ex.return_value = 0
conn = self.webhdfs_hook.get_conn()
-
- mock_insecure_client.assert_has_calls(
- [
- call(
- f'http://{connection.host}:{connection.port}',
- user=connection.login,
- session=mock_session.return_value,
- )
- for connection in mock_get_connections.return_value
- ]
+ connection = mock_get_connection.return_value
+ mock_insecure_client.assert_called_once_with(
+ f'http://{connection.host}:{connection.port}',
+ user=connection.login,
+ session=mock_session.return_value,
)
mock_insecure_client.return_value.status.assert_called_once_with('/')
assert conn == mock_insecure_client.return_value
+ @patch('airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient', side_effect=HdfsError('Error'))
+ @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
+ def test_get_conn_hdfs_error(self, socket_mock, mock_insecure_client):
+ socket_mock.socket.return_value.connect_ex.return_value = 0
+ with pytest.raises(AirflowWebHDFSHookException):
+ self.webhdfs_hook.get_conn()
+
@patch('airflow.providers.apache.hdfs.hooks.webhdfs.requests.Session', return_value="session")
@patch('airflow.providers.apache.hdfs.hooks.webhdfs.KerberosClient', create=True)
@patch(
- 'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connections',
- return_value=[Connection(host='host_1', port=123)],
+ 'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',
+ return_value=Connection(host='host_1', port=123),
)
@patch('airflow.providers.apache.hdfs.hooks.webhdfs._kerberos_security_mode', return_value=True)
@patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
@@ -67,14 +68,14 @@ class TestWebHDFSHook(unittest.TestCase):
self,
socket_mock,
mock_kerberos_security_mode,
- mock_get_connections,
+ mock_get_connection,
mock_kerberos_client,
mock_session,
):
socket_mock.socket.return_value.connect_ex.return_value = 0
conn = self.webhdfs_hook.get_conn()
- connection = mock_get_connections.return_value[0]
+ connection = mock_get_connection.return_value
mock_kerberos_client.assert_called_once_with(
f'http://{connection.host}:{connection.port}', session=mock_session.return_value
)
@@ -119,33 +120,33 @@ class TestWebHDFSHook(unittest.TestCase):
@patch('airflow.providers.apache.hdfs.hooks.webhdfs.KerberosClient', create=True)
@patch(
- 'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connections',
- return_value=[
- Connection(host='host_1', port=123, extra={"use_ssl": "True", "verify": "/ssl/cert/path"})
- ],
+ 'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',
+ return_value=Connection(
+ host='host_1', port=123, extra={"use_ssl": "True", "verify": "/ssl/cert/path"}
+ ),
)
@patch('airflow.providers.apache.hdfs.hooks.webhdfs._kerberos_security_mode', return_value=True)
@patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
def test_conn_kerberos_ssl(
- self, socket_mock, mock_kerberos_security_mode, mock_get_connections, mock_kerberos_client
+ self, socket_mock, mock_kerberos_security_mode, mock_get_connection, mock_kerberos_client
):
socket_mock.socket.return_value.connect_ex.return_value = 0
self.webhdfs_hook.get_conn()
- connection = mock_get_connections.return_value[0]
+ connection = mock_get_connection.return_value
assert f'https://{connection.host}:{connection.port}' == mock_kerberos_client.call_args[0][0]
assert "/ssl/cert/path" == mock_kerberos_client.call_args[1]['session'].verify
@patch('airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient')
@patch(
- 'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connections',
- return_value=[Connection(host='host_1', port=123, extra={"use_ssl": "True", "verify": False})],
+ 'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',
+ return_value=Connection(host='host_1', port=123, extra={"use_ssl": "True", "verify": False}),
)
@patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
- def test_conn_insecure_ssl(self, socket_mock, mock_get_connections, mock_insecure_client):
+ def test_conn_insecure_ssl(self, socket_mock, mock_get_connection, mock_insecure_client):
socket_mock.socket.return_value.connect_ex.return_value = 0
self.webhdfs_hook.get_conn()
- connection = mock_get_connections.return_value[0]
+ connection = mock_get_connection.return_value
assert f'https://{connection.host}:{connection.port}' == mock_insecure_client.call_args[0][0]
assert not mock_insecure_client.call_args[1]['session'].verify