You are viewing a plain text version of this content. The canonical link for it is available in the original (HTML) archive page.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/19 07:04:38 UTC
[airflow] branch main updated: Adding Authentication to webhdfs sensor (#25110)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 52cf579352 Adding Authentication to webhdfs sensor (#25110)
52cf579352 is described below
commit 52cf5793523d3acab480b1933fb7cb33db1c15c7
Author: Ankur Bajaj <an...@gmail.com>
AuthorDate: Tue Jul 19 09:04:31 2022 +0200
Adding Authentication to webhdfs sensor (#25110)
---
airflow/providers/apache/hdfs/hooks/webhdfs.py | 7 +++++-
tests/providers/apache/hdfs/hooks/test_webhdfs.py | 28 +++++++++++++++++++++++
2 files changed, 34 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/apache/hdfs/hooks/webhdfs.py b/airflow/providers/apache/hdfs/hooks/webhdfs.py
index 9fcfa99367..dc27a80237 100644
--- a/airflow/providers/apache/hdfs/hooks/webhdfs.py
+++ b/airflow/providers/apache/hdfs/hooks/webhdfs.py
@@ -80,6 +80,7 @@ class WebHDFSHook(BaseHook):
namenode,
connection.port,
connection.login,
+ connection.get_password(),
connection.schema,
connection.extra_dejson,
)
@@ -93,9 +94,13 @@ class WebHDFSHook(BaseHook):
self.log.info('Read operation on namenode %s failed with error: %s', namenode, hdfs_error)
return None
- def _get_client(self, namenode: str, port: int, login: str, schema: str, extra_dejson: dict) -> Any:
+ def _get_client(
+ self, namenode: str, port: int, login: str, password: Optional[str], schema: str, extra_dejson: dict
+ ) -> Any:
connection_str = f'http://{namenode}'
session = requests.Session()
+ if password is not None:
+ session.auth = (login, password)
if extra_dejson.get('use_ssl', 'False') == 'True' or extra_dejson.get('use_ssl', False):
connection_str = f'https://{namenode}'
diff --git a/tests/providers/apache/hdfs/hooks/test_webhdfs.py b/tests/providers/apache/hdfs/hooks/test_webhdfs.py
index 0dfede42c5..473ef02595 100644
--- a/tests/providers/apache/hdfs/hooks/test_webhdfs.py
+++ b/tests/providers/apache/hdfs/hooks/test_webhdfs.py
@@ -112,6 +112,34 @@ class TestWebHDFSHook(unittest.TestCase):
mock_insecure_client.return_value.status.assert_called_once_with('/')
assert conn == mock_insecure_client.return_value
+ @patch('airflow.providers.apache.hdfs.hooks.webhdfs.requests.Session', create=True)
+ @patch('airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient')
+ @patch(
+ 'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',
+ return_value=Connection(host='host_1.com,host_2.com', login='user', password='password'),
+ )
+ @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
+ def test_get_conn_with_password_without_port_schema(
+ self, socket_mock, mock_get_connection, mock_insecure_client, mock_session
+ ):
+ mock_insecure_client.side_effect = [HdfsError('Error'), mock_insecure_client.return_value]
+ socket_mock.socket.return_value.connect_ex.return_value = 0
+ conn = self.webhdfs_hook.get_conn()
+ connection = mock_get_connection.return_value
+ hosts = connection.host.split(',')
+ mock_insecure_client.assert_has_calls(
+ [
+ call(
+ f'http://{host}',
+ user=connection.login,
+ session=mock_session.return_value,
+ )
+ for host in hosts
+ ]
+ )
+ mock_insecure_client.return_value.status.assert_called_once_with('/')
+ assert conn == mock_insecure_client.return_value
+
@patch('airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient', side_effect=HdfsError('Error'))
@patch(
'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',