Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/08/02 12:39:52 UTC

[GitHub] [airflow] baolsen opened a new pull request #17378: Refactor SSHOperator so a subclass can run many commands (#10874)

baolsen opened a new pull request #17378:
URL: https://github.com/apache/airflow/pull/17378


   Created to aid discussion on #10874; details are there.
   
   closes: #10874
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] baolsen commented on a change in pull request #17378: Refactor SSHOperator so a subclass can run many commands (#10874)

Posted by GitBox <gi...@apache.org>.
baolsen commented on a change in pull request #17378:
URL: https://github.com/apache/airflow/pull/17378#discussion_r699137347



##########
File path: airflow/providers/ssh/operators/ssh.py
##########
@@ -80,103 +82,120 @@ def __init__(
         self.environment = environment
         self.get_pty = (self.command.startswith('sudo') or get_pty) if self.command else get_pty
 
-    def execute(self, context) -> Union[bytes, str, bool]:
+    def get_hook(self) -> SSHHook:
+        if self.ssh_conn_id:
+            if self.ssh_hook and isinstance(self.ssh_hook, SSHHook):
+                self.log.info("ssh_conn_id is ignored when ssh_hook is provided.")
+            else:
+                self.log.info("ssh_hook is not provided or invalid. Trying ssh_conn_id to create SSHHook.")
+                self.ssh_hook = SSHHook(ssh_conn_id=self.ssh_conn_id, timeout=self.timeout)
+        if not self.ssh_hook:
+            raise AirflowException("Cannot operate without ssh_hook or ssh_conn_id.")
+
+        if self.remote_host is not None:
+            self.log.info(
+                "remote_host is provided explicitly. "
+                "It will replace the remote_host which was defined "
+                "in ssh_hook or predefined in connection of ssh_conn_id."
+            )
+            self.ssh_hook.remote_host = self.remote_host
+
+        return self.ssh_hook
+
+    def get_ssh_client(self) -> SSHClient:
+        # Remember to call close_ssh_client on this when done!
+        self.log.info('Creating ssh_client')
+        return self.get_hook().get_conn()
+
+    def exec_ssh_client_command(self, ssh_client: SSHClient, command: str) -> Tuple[int, bytes, bytes]:

Review comment:
       Hey @potiuk, thanks for the feedback.
   Hey @uranusjr, please review again and let me know :)







[GitHub] [airflow] github-actions[bot] commented on pull request #17378: Refactor SSHOperator so a subclass can run many commands (#10874)

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #17378:
URL: https://github.com/apache/airflow/pull/17378#issuecomment-940577086


   The PR is likely OK to be merged with just a subset of tests for the default Python and database versions, without running the full test matrix, because it does not modify the core of Airflow. If the committers decide that the full test matrix is needed, they will add the label 'full tests needed'. You should then rebase onto the latest main or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] uranusjr commented on a change in pull request #17378: Refactor SSHOperator so a subclass can run many commands (#10874)

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17378:
URL: https://github.com/apache/airflow/pull/17378#discussion_r686661494



##########
File path: airflow/providers/ssh/operators/ssh.py
##########
@@ -80,103 +82,120 @@ def __init__(
         self.environment = environment
         self.get_pty = (self.command.startswith('sudo') or get_pty) if self.command else get_pty
 
-    def execute(self, context) -> Union[bytes, str, bool]:
+    def get_hook(self) -> SSHHook:
+        if self.ssh_conn_id:
+            if self.ssh_hook and isinstance(self.ssh_hook, SSHHook):
+                self.log.info("ssh_conn_id is ignored when ssh_hook is provided.")
+            else:
+                self.log.info("ssh_hook is not provided or invalid. Trying ssh_conn_id to create SSHHook.")
+                self.ssh_hook = SSHHook(ssh_conn_id=self.ssh_conn_id, timeout=self.timeout)
+        if not self.ssh_hook:
+            raise AirflowException("Cannot operate without ssh_hook or ssh_conn_id.")
+
+        if self.remote_host is not None:
+            self.log.info(
+                "remote_host is provided explicitly. "
+                "It will replace the remote_host which was defined "
+                "in ssh_hook or predefined in connection of ssh_conn_id."
+            )
+            self.ssh_hook.remote_host = self.remote_host
+
+        return self.ssh_hook
+
+    def get_ssh_client(self) -> SSHClient:
+        # Remember to call close_ssh_client on this when done!
+        self.log.info('Creating ssh_client')
+        return self.get_hook().get_conn()
+
+    def exec_ssh_client_command(self, ssh_client: SSHClient, command: str) -> Tuple[int, bytes, bytes]:

Review comment:
       This feels like the wrong abstraction to me, since `self` here is only used for logging, and it’s entirely up to the caller to pass in the correct SSHClient instance, which the operator should be able to manage itself.
   
   Would something like this make more sense?
   
   ```python
   @property
   def client(self):
       if self._client is None:
           raise RuntimeError("Outside of a create_ssh_client() context")
       return self._client
   
   def execute(self, context=None) -> Union[bytes, str]:
       with self.create_ssh_client():  # This sets self._client so it can be used by other methods.
           self.run_remote_command(self.command)  # Uses self.client internally.
           # On exit, close self._client and set self._client to None.
       # Error handling and serialization etc. afterward omitted for brevity.
   ```
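
To make the suggestion above concrete, here is a minimal, self-contained sketch of the stateful approach. It is not the code from this PR: `FakeSSHClient` and `StatefulOperator` are hypothetical stand-ins so the example runs without Airflow or an SSH server, and `create_ssh_client()` / `run_remote_command()` are only the names used in the snippet above.

```python
from contextlib import contextmanager
from typing import Iterator, List, Optional


class FakeSSHClient:
    """Hypothetical stand-in for an SSH client; records commands instead of running them."""

    def __init__(self) -> None:
        self.closed = False
        self.commands: List[str] = []

    def run(self, command: str) -> str:
        self.commands.append(command)
        return f"ran: {command}"

    def close(self) -> None:
        self.closed = True


class StatefulOperator:
    """Hypothetical operator that owns its client for the duration of a context."""

    def __init__(self, command: str) -> None:
        self.command = command
        self._client: Optional[FakeSSHClient] = None

    @property
    def client(self) -> FakeSSHClient:
        # Fail loudly if a helper is used without an open connection.
        if self._client is None:
            raise RuntimeError("Outside of a create_ssh_client() context")
        return self._client

    @contextmanager
    def create_ssh_client(self) -> Iterator[FakeSSHClient]:
        # Sets self._client on entry; closes and clears it on exit, even on error.
        self._client = FakeSSHClient()
        try:
            yield self._client
        finally:
            self._client.close()
            self._client = None

    def run_remote_command(self, command: str) -> str:
        # No client argument: the operator manages the client itself.
        return self.client.run(command)

    def execute(self, context=None) -> str:
        with self.create_ssh_client():
            return self.run_remote_command(self.command)
```

With this shape, a helper called outside the `with` block raises immediately instead of silently using a stale or missing client.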
   
   







[GitHub] [airflow] baolsen commented on a change in pull request #17378: Refactor SSHOperator so a subclass can run many commands (#10874)

Posted by GitBox <gi...@apache.org>.
baolsen commented on a change in pull request #17378:
URL: https://github.com/apache/airflow/pull/17378#discussion_r689560435



##########
File path: airflow/providers/ssh/operators/ssh.py
##########
@@ -80,103 +82,120 @@ def __init__(
         self.environment = environment
         self.get_pty = (self.command.startswith('sudo') or get_pty) if self.command else get_pty
 
-    def execute(self, context) -> Union[bytes, str, bool]:
+    def get_hook(self) -> SSHHook:
+        if self.ssh_conn_id:
+            if self.ssh_hook and isinstance(self.ssh_hook, SSHHook):
+                self.log.info("ssh_conn_id is ignored when ssh_hook is provided.")
+            else:
+                self.log.info("ssh_hook is not provided or invalid. Trying ssh_conn_id to create SSHHook.")
+                self.ssh_hook = SSHHook(ssh_conn_id=self.ssh_conn_id, timeout=self.timeout)
+        if not self.ssh_hook:
+            raise AirflowException("Cannot operate without ssh_hook or ssh_conn_id.")
+
+        if self.remote_host is not None:
+            self.log.info(
+                "remote_host is provided explicitly. "
+                "It will replace the remote_host which was defined "
+                "in ssh_hook or predefined in connection of ssh_conn_id."
+            )
+            self.ssh_hook.remote_host = self.remote_host
+
+        return self.ssh_hook
+
+    def get_ssh_client(self) -> SSHClient:
+        # Remember to call close_ssh_client on this when done!
+        self.log.info('Creating ssh_client')
+        return self.get_hook().get_conn()
+
+    def exec_ssh_client_command(self, ssh_client: SSHClient, command: str) -> Tuple[int, bytes, bytes]:

Review comment:
       Thanks for the feedback @uranusjr 
   
   I've pushed another commit along these lines, please take a look.
   (I know the build is currently failing; please ignore that, I'll work on it.)
   
   The thing is, we don't want to have to call `super().execute()` from a subclass.
   So I moved the error handling etc. out of `execute()`, so that a subclass can reuse it when needed.
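
A rough sketch of the subclassing pattern described above, under stated assumptions: `get_ssh_client` and `exec_ssh_client_command` mirror the method names in the diff, but their bodies here are fakes, and `BaseOperatorSketch`, `FakeSSHClient`, `check_exit_status` and `ManyCommandsOperator` are hypothetical names used only for illustration.

```python
from typing import Dict, List, Tuple

Result = Tuple[int, bytes, bytes]  # (exit status, stdout, stderr)


class FakeSSHClient:
    """Hypothetical client that looks results up in a dict instead of using SSH."""

    def __init__(self, results: Dict[str, Result]) -> None:
        self.results = results
        self.closed = False

    def close(self) -> None:
        self.closed = True


class BaseOperatorSketch:
    """Stand-in for the refactored operator: small helpers plus a thin execute()."""

    def __init__(self, command: str, results: Dict[str, Result]) -> None:
        self.command = command
        self._results = results
        self.opened = 0  # counts connections, to show reuse in the subclass

    def get_ssh_client(self) -> FakeSSHClient:
        self.opened += 1
        return FakeSSHClient(self._results)

    def exec_ssh_client_command(self, ssh_client: FakeSSHClient, command: str) -> Result:
        return ssh_client.results.get(command, (127, b"", b"command not found"))

    def check_exit_status(self, exit_status: int, stderr: bytes) -> None:
        # Error handling lives outside execute() so subclasses can reuse it.
        if exit_status != 0:
            raise RuntimeError(f"exit status {exit_status}: {stderr.decode()}")

    def execute(self, context=None) -> bytes:
        client = self.get_ssh_client()
        try:
            status, out, err = self.exec_ssh_client_command(client, self.command)
            self.check_exit_status(status, err)
            return out
        finally:
            client.close()


class ManyCommandsOperator(BaseOperatorSketch):
    """Runs several commands over one connection without calling super().execute()."""

    def __init__(self, commands: List[str], results: Dict[str, Result]) -> None:
        super().__init__(command="", results=results)
        self.commands = commands

    def execute(self, context=None) -> List[bytes]:
        client = self.get_ssh_client()
        outputs: List[bytes] = []
        try:
            for command in self.commands:
                status, out, err = self.exec_ssh_client_command(client, command)
                self.check_exit_status(status, err)
                outputs.append(out)
        finally:
            client.close()
        return outputs
```

The subclass opens a single connection, loops over its commands, and reuses the shared error check for each one.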







[GitHub] [airflow] baolsen commented on a change in pull request #17378: Refactor SSHOperator so a subclass can run many commands (#10874)

Posted by GitBox <gi...@apache.org>.
baolsen commented on a change in pull request #17378:
URL: https://github.com/apache/airflow/pull/17378#discussion_r691183619



##########
File path: airflow/providers/ssh/operators/ssh.py
##########
@@ -80,103 +82,120 @@ def __init__(
         self.environment = environment
         self.get_pty = (self.command.startswith('sudo') or get_pty) if self.command else get_pty
 
-    def execute(self, context) -> Union[bytes, str, bool]:
+    def get_hook(self) -> SSHHook:
+        if self.ssh_conn_id:
+            if self.ssh_hook and isinstance(self.ssh_hook, SSHHook):
+                self.log.info("ssh_conn_id is ignored when ssh_hook is provided.")
+            else:
+                self.log.info("ssh_hook is not provided or invalid. Trying ssh_conn_id to create SSHHook.")
+                self.ssh_hook = SSHHook(ssh_conn_id=self.ssh_conn_id, timeout=self.timeout)
+        if not self.ssh_hook:
+            raise AirflowException("Cannot operate without ssh_hook or ssh_conn_id.")
+
+        if self.remote_host is not None:
+            self.log.info(
+                "remote_host is provided explicitly. "
+                "It will replace the remote_host which was defined "
+                "in ssh_hook or predefined in connection of ssh_conn_id."
+            )
+            self.ssh_hook.remote_host = self.remote_host
+
+        return self.ssh_hook
+
+    def get_ssh_client(self) -> SSHClient:
+        # Remember to call close_ssh_client on this when done!
+        self.log.info('Creating ssh_client')
+        return self.get_hook().get_conn()
+
+    def exec_ssh_client_command(self, ssh_client: SSHClient, command: str) -> Tuple[int, bytes, bytes]:

Review comment:
       The build has passed now. The earlier failure was flaky CI.













[GitHub] [airflow] potiuk commented on a change in pull request #17378: Refactor SSHOperator so a subclass can run many commands (#10874)

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #17378:
URL: https://github.com/apache/airflow/pull/17378#discussion_r691634877



##########
File path: airflow/providers/ssh/operators/ssh.py
##########
@@ -80,103 +82,120 @@ def __init__(
         self.environment = environment
         self.get_pty = (self.command.startswith('sudo') or get_pty) if self.command else get_pty
 
-    def execute(self, context) -> Union[bytes, str, bool]:
+    def get_hook(self) -> SSHHook:
+        if self.ssh_conn_id:
+            if self.ssh_hook and isinstance(self.ssh_hook, SSHHook):
+                self.log.info("ssh_conn_id is ignored when ssh_hook is provided.")
+            else:
+                self.log.info("ssh_hook is not provided or invalid. Trying ssh_conn_id to create SSHHook.")
+                self.ssh_hook = SSHHook(ssh_conn_id=self.ssh_conn_id, timeout=self.timeout)
+        if not self.ssh_hook:
+            raise AirflowException("Cannot operate without ssh_hook or ssh_conn_id.")
+
+        if self.remote_host is not None:
+            self.log.info(
+                "remote_host is provided explicitly. "
+                "It will replace the remote_host which was defined "
+                "in ssh_hook or predefined in connection of ssh_conn_id."
+            )
+            self.ssh_hook.remote_host = self.remote_host
+
+        return self.ssh_hook
+
+    def get_ssh_client(self) -> SSHClient:
+        # Remember to call close_ssh_client on this when done!
+        self.log.info('Creating ssh_client')
+        return self.get_hook().get_conn()
+
+    def exec_ssh_client_command(self, ssh_client: SSHClient, command: str) -> Tuple[int, bytes, bytes]:

Review comment:
       I am OK with keeping it as is (re: abstraction) and passing the client in. We already have some hooks that take that stateless approach (and some that keep the state of the connection internally).
   
   I have no strong opinion on which is better. The stateful approach is better from an OO perspective and gives more meaning to the Hook as also being a 'session'. But this is not really necessary. A Hook (and it is a bad name) is more of a "nice API" for the operator to (re-)use, one that understands a "connection" and reads credentials from it.
   
   I think we never agreed on whether a Hook should map 1<->1 to a session/client, and maybe it does not really matter. I think the most important capabilities of a Hook are mapping a connection to credentials and offering a simple Python API that is easy to use from an Operator.
   
   But adding `_client` as a field is also OK for me.
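
To illustrate the stateless reading of a Hook described above, here is a small sketch. Everything in it is an assumption made for illustration: `Credentials`, `CONNECTIONS`, `FakeClient` and `StatelessHook` are hypothetical and are not Airflow classes; only the `get_conn()` method name is taken from the diff.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class Credentials:
    host: str
    username: str
    port: int = 22


# Hypothetical connection store keyed by connection ID.
CONNECTIONS: Dict[str, Credentials] = {
    "ssh_default": Credentials(host="example.invalid", username="airflow"),
}


class FakeClient:
    """Hypothetical client; a real one would open a network connection."""

    def __init__(self, credentials: Credentials) -> None:
        self.credentials = credentials


class StatelessHook:
    """Maps a connection ID to credentials and hands out clients without keeping one."""

    def __init__(self, conn_id: str) -> None:
        self.conn_id = conn_id

    def get_credentials(self) -> Credentials:
        return CONNECTIONS[self.conn_id]

    def get_conn(self) -> FakeClient:
        # A new client per call: the hook holds no session state,
        # so the caller is responsible for the client's lifetime.
        return FakeClient(self.get_credentials())
```

In this shape the hook is purely an API over the connection's credentials, and session ownership stays with whoever called `get_conn()`.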







[GitHub] [airflow] potiuk commented on pull request #17378: Refactor SSHOperator so a subclass can run many commands (#10874)

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #17378:
URL: https://github.com/apache/airflow/pull/17378#issuecomment-934631610


   I will take a closer look tomorrow. Could you please rebase, though? There have been some changes in the SSH hook since the last round.











[GitHub] [airflow] baolsen commented on a change in pull request #17378: Refactor SSHOperator so a subclass can run many commands (#10874)

Posted by GitBox <gi...@apache.org>.
baolsen commented on a change in pull request #17378:
URL: https://github.com/apache/airflow/pull/17378#discussion_r721932546



##########
File path: airflow/providers/ssh/operators/ssh.py
##########
@@ -80,103 +82,120 @@ def __init__(
         self.environment = environment
         self.get_pty = (self.command.startswith('sudo') or get_pty) if self.command else get_pty
 
-    def execute(self, context) -> Union[bytes, str, bool]:
+    def get_hook(self) -> SSHHook:
+        if self.ssh_conn_id:
+            if self.ssh_hook and isinstance(self.ssh_hook, SSHHook):
+                self.log.info("ssh_conn_id is ignored when ssh_hook is provided.")
+            else:
+                self.log.info("ssh_hook is not provided or invalid. Trying ssh_conn_id to create SSHHook.")
+                self.ssh_hook = SSHHook(ssh_conn_id=self.ssh_conn_id, timeout=self.timeout)
+        if not self.ssh_hook:
+            raise AirflowException("Cannot operate without ssh_hook or ssh_conn_id.")
+
+        if self.remote_host is not None:
+            self.log.info(
+                "remote_host is provided explicitly. "
+                "It will replace the remote_host which was defined "
+                "in ssh_hook or predefined in connection of ssh_conn_id."
+            )
+            self.ssh_hook.remote_host = self.remote_host
+
+        return self.ssh_hook
+
+    def get_ssh_client(self) -> SSHClient:
+        # Remember to call close_ssh_client on this when done!
+        self.log.info('Creating ssh_client')
+        return self.get_hook().get_conn()
+
+    def exec_ssh_client_command(self, ssh_client: SSHClient, command: str) -> Tuple[int, bytes, bytes]:

Review comment:
       Hey @potiuk and @uranusjr, any feedback? :)







[GitHub] [airflow] potiuk merged pull request #17378: Refactor SSHOperator so a subclass can run many commands (#10874)

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #17378:
URL: https://github.com/apache/airflow/pull/17378


   





[GitHub] [airflow] baolsen commented on pull request #17378: Refactor SSHOperator so a subclass can run many commands (#10874)

Posted by GitBox <gi...@apache.org>.
baolsen commented on pull request #17378:
URL: https://github.com/apache/airflow/pull/17378#issuecomment-896524343


   Hey @potiuk, I saw you made a recent change to the SSH operator. I've rebased and included it.
   I was wondering if you'd like to review this change as well, or if you know others who may be interested :)

