You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/03/25 16:25:55 UTC

[GitHub] [airflow] GuzikJakub commented on a change in pull request #7454: [AIRFLOW-6833] HA for webhdfs connection

GuzikJakub commented on a change in pull request #7454: [AIRFLOW-6833] HA for webhdfs connection
URL: https://github.com/apache/airflow/pull/7454#discussion_r397991670
 
 

 ##########
 File path: airflow/providers/apache/hdfs/hooks/webhdfs.py
 ##########
 @@ -56,27 +57,35 @@ def __init__(self, webhdfs_conn_id='webhdfs_default', proxy_user=None):
     def get_conn(self):
         """
         Establishes a connection depending on the security mode set via config or environment variable.
-
         :return: a hdfscli InsecureClient or KerberosClient object.
         :rtype: hdfs.InsecureClient or hdfs.ext.kerberos.KerberosClient
         """
-        connections = self.get_connections(self.webhdfs_conn_id)
+        connection = self._find_valid_server()
+        if connection is None:
+            raise AirflowWebHDFSHookException("Failed to locate the valid server.")
+        return connection
 
+    def _find_valid_server(self):
+        connections = self.get_connections(self.webhdfs_conn_id)
         for connection in connections:
+            host_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            self.log.info("Trying to connect to %s:%s", connection.host, connection.port)
             try:
-                self.log.debug('Trying namenode %s', connection.host)
-                client = self._get_client(connection)
-                client.status('/')
-                self.log.debug('Using namenode %s for hook', connection.host)
-                return client
+                conn_check = host_socket.connect_ex((connection.host, connection.port))
+                if conn_check == 0:
+                    self.log.info('Trying namenode %s', connection.host)
+                    client = self._get_client(connection)
+                    client.status('/')
+                    self.log.info('Using namenode %s for hook', connection.host)
+                    host_socket.close()
+                    return client
+                else:
+                    self.log.info("Could not connect to %s:%s", connection.host, connection.port)
+                host_socket.close()
             except HdfsError as hdfs_error:
-                self.log.debug('Read operation on namenode %s failed with error: %s',
-                               connection.host, hdfs_error)
-
-        hosts = [connection.host for connection in connections]
-        error_message = 'Read operations failed on the namenodes below:\n{hosts}'.format(
-            hosts='\n'.join(hosts))
-        raise AirflowWebHDFSHookException(error_message)
+                self.log.info('Read operation on namenode %s failed with error: %s',
 
 Review comment:
   > Wait, if there is an HDFS error, doesn't the host_socket need to be closed? @GuzikJakub
   
   We don't need to close the host_socket explicitly here. The connection is only open inside the try block.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services