You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/08/14 10:23:25 UTC

[GitHub] [airflow] aladinoss commented on a change in pull request #16338: SFTP hook to prefer the SSH paramiko key over the key file path

aladinoss commented on a change in pull request #16338:
URL: https://github.com/apache/airflow/pull/16338#discussion_r688912522



##########
File path: airflow/providers/sftp/hooks/sftp.py
##########
@@ -135,8 +135,13 @@ def get_conn(self) -> pysftp.Connection:
             }
             if self.password and self.password.strip():
                 conn_params['password'] = self.password
-            if self.key_file:
+
+            # Try to use the paramiko key from the SSH hook

Review comment:
       @potiuk & @hx-markterry, what is the benefit of using a separate hook for the SFTP sensor and operator?
   Would it be better to change the documentation and use the SSH hook for both SFTP and SSH?
   Example:
   
   '''
   import logging
   from datetime import datetime
   from paramiko import SFTP_NO_SUCH_FILE
   from airflow.contrib.hooks.ssh_hook import SSHHook
   from airflow.operators.sensors import BaseSensorOperator
   from airflow.utils.decorators import apply_defaults
   from airflow.plugins_manager import AirflowPlugin
   
   log = logging.getLogger(name=__name__)  # logger named after this module
   
   
   class SgSFTPSensor(BaseSensorOperator):
       """
       Waits for a file or directory to be present on SFTP.

       The check opens an SFTP channel on top of the SSH connection returned by
       ``SSHHook``, so no separate SFTP hook is required.

       :param path: Remote file or directory path
       :type path: str
       :param shh_conn_id: The SSH connection to run the sensor against
           (the parameter keeps its original spelling for backward compatibility)
       :type shh_conn_id: str
       """
       template_fields = ('path',)

       @apply_defaults
       def __init__(self, path, shh_conn_id='ssh_default', *args, **kwargs):
           super(SgSFTPSensor, self).__init__(*args, **kwargs)
           self.path = path
           # Kept for backward compatibility; not read anywhere in this class.
           self.hook = None
           self.shh_conn_id = shh_conn_id

       def get_mod_time(self, path):
           """
           Return the remote modification time of ``path``.

           The result is a ``%Y%m%d%H%M%S`` string in local time, produced by
           ``datetime.fromtimestamp``.

           :param path: Remote file or directory path to ``stat``.
           :type path: str
           :raises IOError: propagated from the SFTP ``stat`` call; ``poke``
               treats ``errno == SFTP_NO_SUCH_FILE`` as "not found yet".
           """
           ssh_hook = SSHHook(ssh_conn_id=self.shh_conn_id)
           ssh_client = ssh_hook.get_conn()
           try:
               sftp_client = ssh_client.open_sftp()
               try:
                   ftp_mdtm = sftp_client.stat(path).st_mtime
               finally:
                   sftp_client.close()
           finally:
               # A new SSH session is opened on every poke. Close it even when
               # stat() fails, otherwise a long-running sensor leaks connections.
               ssh_client.close()
           return datetime.fromtimestamp(ftp_mdtm).strftime('%Y%m%d%H%M%S')

       def poke(self, context):
           """
           Return True once ``self.path`` exists on the remote host.

           A missing path (``SFTP_NO_SUCH_FILE``) returns False so the sensor
           retries. Any other IOError is re-raised unchanged.
           """
           self.log.info('Poking for %s', self.path)
           try:
               self.get_mod_time(self.path)
           except IOError as e:
               if e.errno != SFTP_NO_SUCH_FILE:
                   # Bare raise keeps the original traceback.
                   raise
               self.log.info('%s NOT FOUND, sensor will retry!', self.path)
               return False
           self.log.info('%s FOUND, sensor will exit now.', self.path)
           return True
   
   
   class SgSFTPPlugin(AirflowPlugin):
       """Expose ``SgSFTPSensor`` to Airflow through the plugin mechanism."""

       name = "sgsftpplugin"
       sensors = [SgSFTPSensor]
   '''




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org