You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this plain-text copy.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/19 07:04:38 UTC

[airflow] branch main updated: Adding Authentication to webhdfs sensor (#25110)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 52cf579352 Adding Authentication to webhdfs sensor  (#25110)
52cf579352 is described below

commit 52cf5793523d3acab480b1933fb7cb33db1c15c7
Author: Ankur Bajaj <an...@gmail.com>
AuthorDate: Tue Jul 19 09:04:31 2022 +0200

    Adding Authentication to webhdfs sensor  (#25110)
---
 airflow/providers/apache/hdfs/hooks/webhdfs.py    |  7 +++++-
 tests/providers/apache/hdfs/hooks/test_webhdfs.py | 28 +++++++++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/apache/hdfs/hooks/webhdfs.py b/airflow/providers/apache/hdfs/hooks/webhdfs.py
index 9fcfa99367..dc27a80237 100644
--- a/airflow/providers/apache/hdfs/hooks/webhdfs.py
+++ b/airflow/providers/apache/hdfs/hooks/webhdfs.py
@@ -80,6 +80,7 @@ class WebHDFSHook(BaseHook):
                         namenode,
                         connection.port,
                         connection.login,
+                        connection.get_password(),
                         connection.schema,
                         connection.extra_dejson,
                     )
@@ -93,9 +94,13 @@ class WebHDFSHook(BaseHook):
                 self.log.info('Read operation on namenode %s failed with error: %s', namenode, hdfs_error)
         return None
 
-    def _get_client(self, namenode: str, port: int, login: str, schema: str, extra_dejson: dict) -> Any:
+    def _get_client(
+        self, namenode: str, port: int, login: str, password: Optional[str], schema: str, extra_dejson: dict
+    ) -> Any:
         connection_str = f'http://{namenode}'
         session = requests.Session()
+        if password is not None:
+            session.auth = (login, password)
 
         if extra_dejson.get('use_ssl', 'False') == 'True' or extra_dejson.get('use_ssl', False):
             connection_str = f'https://{namenode}'
diff --git a/tests/providers/apache/hdfs/hooks/test_webhdfs.py b/tests/providers/apache/hdfs/hooks/test_webhdfs.py
index 0dfede42c5..473ef02595 100644
--- a/tests/providers/apache/hdfs/hooks/test_webhdfs.py
+++ b/tests/providers/apache/hdfs/hooks/test_webhdfs.py
@@ -112,6 +112,34 @@ class TestWebHDFSHook(unittest.TestCase):
         mock_insecure_client.return_value.status.assert_called_once_with('/')
         assert conn == mock_insecure_client.return_value
 
+    @patch('airflow.providers.apache.hdfs.hooks.webhdfs.requests.Session', create=True)  # mock Session to identify the session handed to InsecureClient; NOTE(review): create=True seems unnecessary, the hook module already uses requests.Session
+    @patch('airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient')  # records one client construction per namenode attempt
+    @patch(  # connection: two comma-separated hosts, a login and a password, no port or schema
+        'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',
+        return_value=Connection(host='host_1.com,host_2.com', login='user', password='password'),
+    )
+    @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")  # lets the test control the namenode port check
+    def test_get_conn_with_password_without_port_schema(
+        self, socket_mock, mock_get_connection, mock_insecure_client, mock_session
+    ):
+        mock_insecure_client.side_effect = [HdfsError('Error'), mock_insecure_client.return_value]  # first host fails, second returns the mock client
+        socket_mock.socket.return_value.connect_ex.return_value = 0  # connect_ex() == 0: port check succeeds for every host
+        conn = self.webhdfs_hook.get_conn()
+        connection = mock_get_connection.return_value
+        hosts = connection.host.split(',')
+        mock_insecure_client.assert_has_calls(  # one InsecureClient per host, in order, all sharing the mocked session
+            [
+                call(
+                    f'http://{host}',
+                    user=connection.login,
+                    session=mock_session.return_value,
+                )
+                for host in hosts
+            ]
+        )
+        mock_insecure_client.return_value.status.assert_called_once_with('/')  # the returned client was probed exactly once
+        assert conn == mock_insecure_client.return_value  # NOTE(review): nothing asserts mock_session.return_value.auth == ('user', 'password'); the new auth wiring itself is untested
+
     @patch('airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient', side_effect=HdfsError('Error'))
     @patch(
         'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',