You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/04/08 08:21:23 UTC

[GitHub] [airflow] feluelle commented on a change in pull request #5015: [AIRFLOW-4211] Add tests for WebHDFSHook

feluelle commented on a change in pull request #5015: [AIRFLOW-4211] Add tests for WebHDFSHook
URL: https://github.com/apache/airflow/pull/5015#discussion_r272931511
 
 

 ##########
 File path: airflow/hooks/webhdfs_hook.py
 ##########
 @@ -42,70 +41,91 @@ class AirflowWebHDFSHookException(AirflowException):
 class WebHDFSHook(BaseHook):
     """
     Interact with HDFS. This class is a wrapper around the hdfscli library.
+
+    :param webhdfs_conn_id: The connection id for the webhdfs client to connect to.
+    :type webhdfs_conn_id: str
+    :param proxy_user: The user used to authenticate.
+    :type proxy_user: str
     """
+
     def __init__(self, webhdfs_conn_id='webhdfs_default', proxy_user=None):
+        super(WebHDFSHook, self).__init__(webhdfs_conn_id)
         self.webhdfs_conn_id = webhdfs_conn_id
         self.proxy_user = proxy_user
 
     def get_conn(self):
         """
-        Returns a hdfscli InsecureClient object.
+        Establishes a connection depending on the security mode set via config or environment variable.
+
+        :return: a hdfscli InsecureClient or KerberosClient object.
+        :rtype: hdfs.InsecureClient or hdfs.ext.kerberos.KerberosClient
         """
-        nn_connections = self.get_connections(self.webhdfs_conn_id)
-        for nn in nn_connections:
+        connections = self.get_connections(self.webhdfs_conn_id)
+
+        for connection in connections:
             try:
-                self.log.debug('Trying namenode %s', nn.host)
-                connection_str = 'http://{nn.host}:{nn.port}'.format(nn=nn)
-                if _kerberos_security_mode:
-                    client = KerberosClient(connection_str)
-                else:
-                    proxy_user = self.proxy_user or nn.login
-                    client = InsecureClient(connection_str, user=proxy_user)
+                self.log.debug('Trying namenode %s', connection.host)
+                client = self._get_client(connection)
                 client.status('/')
-                self.log.debug('Using namenode %s for hook', nn.host)
+                self.log.debug('Using namenode %s for hook', connection.host)
                 return client
-            except HdfsError as e:
-                self.log.debug(
-                    "Read operation on namenode %s "
-                    "failed with error: %s", nn.host, e
-                )
-        nn_hosts = [c.host for c in nn_connections]
-        no_nn_error = "Read operations failed " \
-                      "on the namenodes below:\n{}".format("\n".join(nn_hosts))
-        raise AirflowWebHDFSHookException(no_nn_error)
+            except HdfsError as hdfs_error:
+                self.log.debug('Read operation on namenode %s failed with error: %s',
+                               connection.host, hdfs_error)
+
+        hosts = [connection.host for connection in connections]
+        error_message = 'Read operations failed on the namenodes below:\n{hosts}'.format(
+            hosts='\n'.join(hosts))
+        raise AirflowWebHDFSHookException(error_message)
+
+    def _get_client(self, connection):
+        connection_str = 'http://{host}:{port}'.format(host=connection.host, port=connection.port)
+
+        if _kerberos_security_mode:
+            client = KerberosClient(connection_str)
+        else:
+            proxy_user = self.proxy_user or connection.login
+            client = InsecureClient(connection_str, user=proxy_user)
+
+        return client
 
     def check_for_path(self, hdfs_path):
         """
         Check for the existence of a path in HDFS by querying FileStatus.
+
+        :param hdfs_path: The path to check.
+        :type hdfs_path: str
+        :return: True if the path exists and False if not.
+        :rtype: bool
         """
-        c = self.get_conn()
-        return bool(c.status(hdfs_path, strict=False))
+        conn = self.get_conn()
+
+        status = conn.status(hdfs_path, strict=False)
+        return bool(status)
 
-    def load_file(self, source, destination, overwrite=True, parallelism=1,
-                  **kwargs):
+    def load_file(self, source, destination, overwrite=True, parallelism=1, **kwargs):
         r"""
-        Uploads a file to HDFS
+        Uploads a file to HDFS.
 
-        :param source: Local path to file or folder. If a folder, all the files
-          inside of it will be uploaded (note that this implies that folders empty
-          of files will not be created remotely).
+        :param source: Local path to file or folder.
+            If it's a folder, all the files inside of it will be uploaded.
+            .. note:: That this implies that folders empty of files will not be created remotely.
 
 Review comment:
   True. Thank you @mik-laj :)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services