Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/02/04 13:57:05 UTC

[GitHub] [airflow] ashb opened a new pull request #21326: Refactor SSH tests to not use SSH server in operator tests

ashb opened a new pull request #21326:
URL: https://github.com/apache/airflow/pull/21326


   This required a slight refactor to the SSHOperator (moving
   `exec_ssh_client_command` "down" into the Hook), but the SSH _Operator_
   tests now just use stubbing, and the only place that connects to a real
   SSH server is the one `test_exec_ssh_client_command` test for SSHHook.
   
   This is both better structured and, hopefully, produces fewer (or ideally
   no) random failures in our tests.
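   The stubbing approach described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code from the PR: `FakeClient`, `FakeHook`, `FakeOperator` and `run_stubbed_operator` are hypothetical stand-ins for paramiko's `SSHClient`, `SSHHook`, `SSHOperator` and a test body, so that the sketch runs without Airflow or an SSH server.

```python
from unittest import mock


class FakeClient:
    """Hypothetical stand-in for paramiko's SSHClient (context manager only)."""

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        return False


class FakeHook:
    """Hypothetical stand-in for SSHHook; both methods would need a real server."""

    def get_conn(self):
        raise RuntimeError("would open a real SSH connection")

    def exec_ssh_client_command(self, ssh_client, command):
        raise RuntimeError("would run a command over SSH")


class FakeOperator:
    """Hypothetical stand-in for SSHOperator: all SSH work is delegated to the hook."""

    def __init__(self, hook, command):
        self.hook = hook
        self.command = command

    def execute(self):
        # The client is used as a context manager, so it is closed on exit.
        with self.hook.get_conn() as ssh_client:
            exit_status, stdout, _stderr = self.hook.exec_ssh_client_command(ssh_client, self.command)
        if exit_status != 0:
            raise RuntimeError(f"error running cmd: {self.command}")
        return stdout


def run_stubbed_operator():
    hook = FakeHook()
    # Autospec'd client: `with ssh_client` should return the client itself.
    ssh_client = mock.create_autospec(FakeClient, instance=True)
    ssh_client.__enter__.return_value = ssh_client
    hook.get_conn = mock.MagicMock(return_value=ssh_client)
    # Stub the one method that would otherwise talk to a server.
    with mock.patch.object(hook, "exec_ssh_client_command") as exec_cmd:
        exec_cmd.return_value = (0, b"airflow", b"")
        result = FakeOperator(hook, "echo airflow").execute()
    return result, exec_cmd, ssh_client
```

   With the hook method stubbed, the operator-level test only checks how the operator calls the hook and what it does with the returned tuple; the real command execution is left to a single hook-level test.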


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on a change in pull request #21326: Refactor SSH tests to not use SSH server in operator tests

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #21326:
URL: https://github.com/apache/airflow/pull/21326#discussion_r799644711



##########
File path: airflow/providers/ssh/operators/ssh.py
##########
@@ -128,78 +131,32 @@ def get_hook(self) -> SSHHook:
 
         return self.ssh_hook
 
-    def get_ssh_client(self) -> SSHClient:
+    def get_ssh_client(self) -> "SSHClient":
         # Remember to use context manager or call .close() on this when done
         self.log.info('Creating ssh_client')
         return self.get_hook().get_conn()
 
-    def exec_ssh_client_command(self, ssh_client: SSHClient, command: str) -> Tuple[int, bytes, bytes]:
-        self.log.info("Running command: %s", command)
-
-        # set timeout taken as params
-        stdin, stdout, stderr = ssh_client.exec_command(
-            command=command,
-            get_pty=self.get_pty,
-            timeout=self.timeout,
-            environment=self.environment,
+    def exec_ssh_client_command(self, ssh_client: "SSHClient", command: str):
+        warnings.warn(
+            'exec_ssh_client_command method on SSHOperator is deprecated, call '
+            '`ssh_hook.exec_ssh_client_command` instead',
+            DeprecationWarning,
+        )
+        assert self.ssh_hook
+        return self.ssh_hook.exec_ssh_client_command(
+            ssh_client, command, timeout=self.timeout, environment=self.environment, get_pty=self.get_pty
         )
-        # get channels
-        channel = stdout.channel
-
-        # closing stdin
-        stdin.close()
-        channel.shutdown_write()
-
-        agg_stdout = b''
-        agg_stderr = b''
-
-        # capture any initial output in case channel is closed already
-        stdout_buffer_length = len(stdout.channel.in_buffer)
-
-        if stdout_buffer_length > 0:
-            agg_stdout += stdout.channel.recv(stdout_buffer_length)
-
-        # read from both stdout and stderr
-        while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready():
-            readq, _, _ = select([channel], [], [], self.cmd_timeout)
-            for recv in readq:
-                if recv.recv_ready():
-                    line = stdout.channel.recv(len(recv.in_buffer))
-                    agg_stdout += line
-                    self.log.info(line.decode('utf-8', 'replace').strip('\n'))
-                if recv.recv_stderr_ready():
-                    line = stderr.channel.recv_stderr(len(recv.in_stderr_buffer))
-                    agg_stderr += line
-                    self.log.warning(line.decode('utf-8', 'replace').strip('\n'))
-            if (
-                stdout.channel.exit_status_ready()
-                and not stderr.channel.recv_stderr_ready()
-                and not stdout.channel.recv_ready()
-            ):
-                stdout.channel.shutdown_read()
-                try:
-                    stdout.channel.close()
-                except Exception:
-                    # there is a race that when shutdown_read has been called and when
-                    # you try to close the connection, the socket is already closed
-                    # We should ignore such errors (but we should log them with warning)
-                    self.log.warning("Ignoring exception on close", exc_info=True)
-                break
-
-        stdout.close()
-        stderr.close()
-
-        exit_status = stdout.channel.recv_exit_status()
-
-        return exit_status, agg_stdout, agg_stderr
 
     def raise_for_status(self, exit_status: int, stderr: bytes) -> None:
         if exit_status != 0:
             error_msg = stderr.decode('utf-8')
             raise AirflowException(f"error running cmd: {self.command}, error: {error_msg}")
 
-    def run_ssh_client_command(self, ssh_client: SSHClient, command: str) -> bytes:
-        exit_status, agg_stdout, agg_stderr = self.exec_ssh_client_command(ssh_client, command)
+    def run_ssh_client_command(self, ssh_client: "SSHClient", command: str) -> bytes:
+        assert self.ssh_hook

Review comment:
       I am very cool with that :)







[GitHub] [airflow] potiuk commented on a change in pull request #21326: Refactor SSH tests to not use SSH server in operator tests

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #21326:
URL: https://github.com/apache/airflow/pull/21326#discussion_r799638037



##########
File path: airflow/providers/ssh/operators/ssh.py
##########
@@ -128,78 +131,32 @@ def get_hook(self) -> SSHHook:
 
         return self.ssh_hook
 
-    def get_ssh_client(self) -> SSHClient:
+    def get_ssh_client(self) -> "SSHClient":
         # Remember to use context manager or call .close() on this when done
         self.log.info('Creating ssh_client')
         return self.get_hook().get_conn()
 
-    def exec_ssh_client_command(self, ssh_client: SSHClient, command: str) -> Tuple[int, bytes, bytes]:
-        self.log.info("Running command: %s", command)
-
-        # set timeout taken as params
-        stdin, stdout, stderr = ssh_client.exec_command(
-            command=command,
-            get_pty=self.get_pty,
-            timeout=self.timeout,
-            environment=self.environment,
+    def exec_ssh_client_command(self, ssh_client: "SSHClient", command: str):
+        warnings.warn(
+            'exec_ssh_client_command method on SSHOperator is deprecated, call '
+            '`ssh_hook.exec_ssh_client_command` instead',
+            DeprecationWarning,
+        )
+        assert self.ssh_hook
+        return self.ssh_hook.exec_ssh_client_command(
+            ssh_client, command, timeout=self.timeout, environment=self.environment, get_pty=self.get_pty
         )
-        # get channels
-        channel = stdout.channel
-
-        # closing stdin
-        stdin.close()
-        channel.shutdown_write()
-
-        agg_stdout = b''
-        agg_stderr = b''
-
-        # capture any initial output in case channel is closed already
-        stdout_buffer_length = len(stdout.channel.in_buffer)
-
-        if stdout_buffer_length > 0:
-            agg_stdout += stdout.channel.recv(stdout_buffer_length)
-
-        # read from both stdout and stderr
-        while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready():
-            readq, _, _ = select([channel], [], [], self.cmd_timeout)
-            for recv in readq:
-                if recv.recv_ready():
-                    line = stdout.channel.recv(len(recv.in_buffer))
-                    agg_stdout += line
-                    self.log.info(line.decode('utf-8', 'replace').strip('\n'))
-                if recv.recv_stderr_ready():
-                    line = stderr.channel.recv_stderr(len(recv.in_stderr_buffer))
-                    agg_stderr += line
-                    self.log.warning(line.decode('utf-8', 'replace').strip('\n'))
-            if (
-                stdout.channel.exit_status_ready()
-                and not stderr.channel.recv_stderr_ready()
-                and not stdout.channel.recv_ready()
-            ):
-                stdout.channel.shutdown_read()
-                try:
-                    stdout.channel.close()
-                except Exception:
-                    # there is a race that when shutdown_read has been called and when
-                    # you try to close the connection, the socket is already closed
-                    # We should ignore such errors (but we should log them with warning)
-                    self.log.warning("Ignoring exception on close", exc_info=True)
-                break
-
-        stdout.close()
-        stderr.close()
-
-        exit_status = stdout.channel.recv_exit_status()
-
-        return exit_status, agg_stdout, agg_stderr
 
     def raise_for_status(self, exit_status: int, stderr: bytes) -> None:
         if exit_status != 0:
             error_msg = stderr.decode('utf-8')
             raise AirflowException(f"error running cmd: {self.command}, error: {error_msg}")
 
-    def run_ssh_client_command(self, ssh_client: SSHClient, command: str) -> bytes:
-        exit_status, agg_stdout, agg_stderr = self.exec_ssh_client_command(ssh_client, command)
+    def run_ssh_client_command(self, ssh_client: "SSHClient", command: str) -> bytes:
+        assert self.ssh_hook

Review comment:
       Are we cool with asserts now? I am cool with it in this case, BTW.

##########
File path: airflow/providers/ssh/hooks/ssh.py
##########
@@ -428,3 +429,70 @@ def _pkey_from_private_key(self, private_key: str, passphrase: Optional[str] = N
             'Ensure key provided is valid for one of the following'
             'key formats: RSA, DSS, ECDSA, or Ed25519'
         )
+

Review comment:
       Wow. Why was it in the operator?







[GitHub] [airflow] potiuk commented on a change in pull request #21326: Refactor SSH tests to not use SSH server in operator tests

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #21326:
URL: https://github.com/apache/airflow/pull/21326#discussion_r799638939



##########
File path: airflow/providers/ssh/operators/ssh.py
##########
@@ -18,15 +18,16 @@
 
 import warnings
 from base64 import b64encode
-from select import select
-from typing import Optional, Sequence, Tuple, Union
-
-from paramiko.client import SSHClient
+from typing import TYPE_CHECKING, Optional, Sequence, Union
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
-from airflow.providers.ssh.hooks.ssh import SSHHook
+
+if TYPE_CHECKING:
+    from paramiko.client import SSHClient
+
+    from airflow.providers.ssh.hooks.ssh import SSHHook

Review comment:
       Yeah. My question too.







[GitHub] [airflow] github-actions[bot] commented on pull request #21326: Refactor SSH tests to not use SSH server in operator tests

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #21326:
URL: https://github.com/apache/airflow/pull/21326#issuecomment-1030162580


   The PR is likely OK to be merged with just a subset of tests for the default Python and Database versions, without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full test matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] ashb commented on a change in pull request #21326: Refactor SSH tests to not use SSH server in operator tests

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21326:
URL: https://github.com/apache/airflow/pull/21326#discussion_r799640839



##########
File path: tests/providers/ssh/operators/test_ssh.py
##########
@@ -42,117 +42,69 @@ class SSHClientSideEffect:
     def __init__(self, hook):
         self.hook = hook
 
-    def __call__(self):
-        self.return_value = self.hook.get_conn()
-        return self.return_value
-
 
 class TestSSHOperator:
     def setup_method(self):
-        from airflow.providers.ssh.hooks.ssh import SSHHook
 
-        hook = SSHHook(ssh_conn_id='ssh_default', banner_timeout=100)
+        hook = SSHHook(ssh_conn_id='ssh_default')
         hook.no_host_key_check = True
-        self.dag = DAG('ssh_test', default_args={'start_date': DEFAULT_DATE})
+
+        ssh_client = mock.create_autospec(SSHClient)
+        # `with ssh_client` should return itself.
+        ssh_client.__enter__.return_value = ssh_client
+        hook.get_conn = mock.MagicMock(return_value=ssh_client)
         self.hook = hook
 
-    def test_hook_created_correctly_with_timeout(self):
-        timeout = 20
-        ssh_id = "ssh_default"
-        with self.dag:
-            task = SSHOperator(
-                task_id="test",
-                command=COMMAND,
-                timeout=timeout,
-                ssh_conn_id="ssh_default",
-                banner_timeout=100,
-            )
-        task.execute(None)
-        assert timeout == task.ssh_hook.conn_timeout
-        assert ssh_id == task.ssh_hook.ssh_conn_id
+    # Make sure nothing in this test actually connects to SSH -- that's for hook tests.
+    @pytest.fixture(autouse=True)
+    def _patch_exec_ssh_client(self):
+        with mock.patch.object(self.hook, 'exec_ssh_client_command') as exec_ssh_client_command:
+            self.exec_ssh_client_command = exec_ssh_client_command
+            exec_ssh_client_command.return_value = (0, b'airflow', '')
+            yield exec_ssh_client_command
 
     def test_hook_created_correctly(self):
         conn_timeout = 20
         cmd_timeout = 45
-        ssh_id = 'ssh_default'
-        with self.dag:
-            task = SSHOperator(
-                task_id="test",
-                command=COMMAND,
-                conn_timeout=conn_timeout,
-                cmd_timeout=cmd_timeout,
-                ssh_conn_id="ssh_default",
-                banner_timeout=100,
-            )
-        task.execute(None)
-        assert conn_timeout == task.ssh_hook.conn_timeout
-        assert ssh_id == task.ssh_hook.ssh_conn_id
-
-    @conf_vars({('core', 'enable_xcom_pickling'): 'False'})
-    def test_json_command_execution(self, create_task_instance_of_operator):
-        ti = create_task_instance_of_operator(
-            SSHOperator,
-            dag_id="unit_tests_ssh_test_op_json_command_execution",
-            task_id="test",
-            ssh_hook=self.hook,
-            command=COMMAND,
-            do_xcom_push=True,
-            banner_timeout=100,
-        )
-        ti.run()
-        assert ti.duration is not None
-        assert ti.xcom_pull(task_ids='test', key='return_value') == b64encode(b'airflow').decode('utf-8')
-
-    @conf_vars({('core', 'enable_xcom_pickling'): 'True'})
-    def test_pickle_command_execution(self, create_task_instance_of_operator):
-        ti = create_task_instance_of_operator(
-            SSHOperator,
-            dag_id="unit_tests_ssh_test_op_pickle_command_execution",
+        task = SSHOperator(
             task_id="test",
-            ssh_hook=self.hook,
             command=COMMAND,
-            do_xcom_push=True,
-            banner_timeout=100,
+            conn_timeout=conn_timeout,
+            cmd_timeout=cmd_timeout,
+            ssh_conn_id="ssh_default",
         )
-        ti.run()
-        assert ti.duration is not None
-        assert ti.xcom_pull(task_ids='test', key='return_value') == b'airflow'
+        ssh_hook = task.get_hook()
+        assert conn_timeout == ssh_hook.conn_timeout
+        assert "ssh_default" == ssh_hook.ssh_conn_id
 
-    @conf_vars({('core', 'enable_xcom_pickling'): 'True'})
-    def test_command_execution_with_env(self, create_task_instance_of_operator):
-        ti = create_task_instance_of_operator(
-            SSHOperator,
-            dag_id="unit_tests_ssh_test_op_command_execution_with_env",
+    @conf_vars({('core', 'enable_xcom_pickling'): 'False'})

Review comment:
       Probably -- I think I just "copied" it across without paying attention to the default.







[GitHub] [airflow] ashb merged pull request #21326: Refactor SSH tests to not use SSH server in operator tests

Posted by GitBox <gi...@apache.org>.
ashb merged pull request #21326:
URL: https://github.com/apache/airflow/pull/21326


   





[GitHub] [airflow] ashb commented on a change in pull request #21326: Refactor SSH tests to not use SSH server in operator tests

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21326:
URL: https://github.com/apache/airflow/pull/21326#discussion_r799659350



##########
File path: tests/providers/ssh/hooks/test_ssh.py
##########
@@ -739,6 +740,21 @@ def test_openssh_private_key(self):
             session.delete(conn)
             session.commit()
 
+    def test_exec_ssh_client_command(self):
+        hook = SSHHook(
+            ssh_conn_id='ssh_default',
+            conn_timeout=30,
+            banner_timeout=100,
+        )
 
-if __name__ == '__main__':
-    unittest.main()
+        for attempt in tenacity.Retrying(stop=tenacity.stop_after_attempt(5)):

Review comment:
       I don't think so? If it doesn't succeed, doesn't it raise the error?







[GitHub] [airflow] ashb commented on a change in pull request #21326: Refactor SSH tests to not use SSH server in operator tests

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21326:
URL: https://github.com/apache/airflow/pull/21326#discussion_r799640218



##########
File path: airflow/providers/ssh/operators/ssh.py
##########
@@ -128,78 +131,32 @@ def get_hook(self) -> SSHHook:
 
         return self.ssh_hook
 
-    def get_ssh_client(self) -> SSHClient:
+    def get_ssh_client(self) -> "SSHClient":
         # Remember to use context manager or call .close() on this when done
         self.log.info('Creating ssh_client')
         return self.get_hook().get_conn()
 
-    def exec_ssh_client_command(self, ssh_client: SSHClient, command: str) -> Tuple[int, bytes, bytes]:
-        self.log.info("Running command: %s", command)
-
-        # set timeout taken as params
-        stdin, stdout, stderr = ssh_client.exec_command(
-            command=command,
-            get_pty=self.get_pty,
-            timeout=self.timeout,
-            environment=self.environment,
+    def exec_ssh_client_command(self, ssh_client: "SSHClient", command: str):
+        warnings.warn(
+            'exec_ssh_client_command method on SSHOperator is deprecated, call '
+            '`ssh_hook.exec_ssh_client_command` instead',
+            DeprecationWarning,
+        )
+        assert self.ssh_hook
+        return self.ssh_hook.exec_ssh_client_command(
+            ssh_client, command, timeout=self.timeout, environment=self.environment, get_pty=self.get_pty
         )
-        # get channels
-        channel = stdout.channel
-
-        # closing stdin
-        stdin.close()
-        channel.shutdown_write()
-
-        agg_stdout = b''
-        agg_stderr = b''
-
-        # capture any initial output in case channel is closed already
-        stdout_buffer_length = len(stdout.channel.in_buffer)
-
-        if stdout_buffer_length > 0:
-            agg_stdout += stdout.channel.recv(stdout_buffer_length)
-
-        # read from both stdout and stderr
-        while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready():
-            readq, _, _ = select([channel], [], [], self.cmd_timeout)
-            for recv in readq:
-                if recv.recv_ready():
-                    line = stdout.channel.recv(len(recv.in_buffer))
-                    agg_stdout += line
-                    self.log.info(line.decode('utf-8', 'replace').strip('\n'))
-                if recv.recv_stderr_ready():
-                    line = stderr.channel.recv_stderr(len(recv.in_stderr_buffer))
-                    agg_stderr += line
-                    self.log.warning(line.decode('utf-8', 'replace').strip('\n'))
-            if (
-                stdout.channel.exit_status_ready()
-                and not stderr.channel.recv_stderr_ready()
-                and not stdout.channel.recv_ready()
-            ):
-                stdout.channel.shutdown_read()
-                try:
-                    stdout.channel.close()
-                except Exception:
-                    # there is a race that when shutdown_read has been called and when
-                    # you try to close the connection, the socket is already closed
-                    # We should ignore such errors (but we should log them with warning)
-                    self.log.warning("Ignoring exception on close", exc_info=True)
-                break
-
-        stdout.close()
-        stderr.close()
-
-        exit_status = stdout.channel.recv_exit_status()
-
-        return exit_status, agg_stdout, agg_stderr
 
     def raise_for_status(self, exit_status: int, stderr: bytes) -> None:
         if exit_status != 0:
             error_msg = stderr.decode('utf-8')
             raise AirflowException(f"error running cmd: {self.command}, error: {error_msg}")
 
-    def run_ssh_client_command(self, ssh_client: SSHClient, command: str) -> bytes:
-        exit_status, agg_stdout, agg_stderr = self.exec_ssh_client_command(ssh_client, command)
+    def run_ssh_client_command(self, ssh_client: "SSHClient", command: str) -> bytes:
+        assert self.ssh_hook

Review comment:
       I took this to be the case of "this is just for typing, to protect against None, and does no harm if it doesn't exist".
   
   I'll update the guidance (possibly to say "don't use asserts for anything that wouldn't fail in other ways").
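   The distinction drawn in this comment can be illustrated with a small sketch. It assumes nothing about the actual Airflow classes: `Hook` and `Operator` here are hypothetical, and the point is only that a bare `assert` on an `Optional` attribute narrows the type for a checker, while the code would fail on `None` anyway if asserts were stripped (for example under `python -O`).

```python
from typing import Optional


class Hook:
    """Hypothetical hook with a single method."""

    def run(self, command: str) -> str:
        return f"ran {command}"


class Operator:
    """Hypothetical operator whose hook may not be set yet."""

    def __init__(self, hook: Optional[Hook] = None) -> None:
        self.hook = hook

    def run_with_assert(self, command: str) -> str:
        # Narrows Optional[Hook] to Hook for the type checker. If the assert
        # were stripped, the next line would still fail on None, with an
        # AttributeError instead of an AssertionError.
        assert self.hook
        return self.hook.run(command)

    def run_with_check(self, command: str) -> str:
        # An explicit check is the safer choice when nothing else would fail.
        if self.hook is None:
            raise ValueError("hook is not set")
        return self.hook.run(command)
```

   In the first method the assert only changes which exception is raised; in a case where skipping the check would let bad data through silently, the explicit check in the second method keeps working regardless of interpreter flags.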







[GitHub] [airflow] ashb commented on a change in pull request #21326: Refactor SSH tests to not use SSH server in operator tests

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21326:
URL: https://github.com/apache/airflow/pull/21326#discussion_r799642662



##########
File path: tests/providers/ssh/operators/test_ssh.py
##########
@@ -42,117 +42,69 @@ class SSHClientSideEffect:
     def __init__(self, hook):
         self.hook = hook
 
-    def __call__(self):
-        self.return_value = self.hook.get_conn()
-        return self.return_value
-
 
 class TestSSHOperator:
     def setup_method(self):
-        from airflow.providers.ssh.hooks.ssh import SSHHook
 
-        hook = SSHHook(ssh_conn_id='ssh_default', banner_timeout=100)
+        hook = SSHHook(ssh_conn_id='ssh_default')
         hook.no_host_key_check = True
-        self.dag = DAG('ssh_test', default_args={'start_date': DEFAULT_DATE})
+
+        ssh_client = mock.create_autospec(SSHClient)
+        # `with ssh_client` should return itself.
+        ssh_client.__enter__.return_value = ssh_client
+        hook.get_conn = mock.MagicMock(return_value=ssh_client)
         self.hook = hook
 
-    def test_hook_created_correctly_with_timeout(self):
-        timeout = 20
-        ssh_id = "ssh_default"
-        with self.dag:
-            task = SSHOperator(
-                task_id="test",
-                command=COMMAND,
-                timeout=timeout,
-                ssh_conn_id="ssh_default",
-                banner_timeout=100,
-            )
-        task.execute(None)
-        assert timeout == task.ssh_hook.conn_timeout
-        assert ssh_id == task.ssh_hook.ssh_conn_id
+    # Make sure nothing in this test actually connects to SSH -- that's for hook tests.
+    @pytest.fixture(autouse=True)
+    def _patch_exec_ssh_client(self):
+        with mock.patch.object(self.hook, 'exec_ssh_client_command') as exec_ssh_client_command:
+            self.exec_ssh_client_command = exec_ssh_client_command
+            exec_ssh_client_command.return_value = (0, b'airflow', '')
+            yield exec_ssh_client_command
 
     def test_hook_created_correctly(self):
         conn_timeout = 20
         cmd_timeout = 45
-        ssh_id = 'ssh_default'
-        with self.dag:
-            task = SSHOperator(
-                task_id="test",
-                command=COMMAND,
-                conn_timeout=conn_timeout,
-                cmd_timeout=cmd_timeout,
-                ssh_conn_id="ssh_default",
-                banner_timeout=100,
-            )
-        task.execute(None)
-        assert conn_timeout == task.ssh_hook.conn_timeout
-        assert ssh_id == task.ssh_hook.ssh_conn_id
-
-    @conf_vars({('core', 'enable_xcom_pickling'): 'False'})
-    def test_json_command_execution(self, create_task_instance_of_operator):
-        ti = create_task_instance_of_operator(
-            SSHOperator,
-            dag_id="unit_tests_ssh_test_op_json_command_execution",
-            task_id="test",
-            ssh_hook=self.hook,
-            command=COMMAND,
-            do_xcom_push=True,
-            banner_timeout=100,
-        )
-        ti.run()
-        assert ti.duration is not None
-        assert ti.xcom_pull(task_ids='test', key='return_value') == b64encode(b'airflow').decode('utf-8')
-
-    @conf_vars({('core', 'enable_xcom_pickling'): 'True'})
-    def test_pickle_command_execution(self, create_task_instance_of_operator):
-        ti = create_task_instance_of_operator(
-            SSHOperator,
-            dag_id="unit_tests_ssh_test_op_pickle_command_execution",
+        task = SSHOperator(
             task_id="test",
-            ssh_hook=self.hook,
             command=COMMAND,
-            do_xcom_push=True,
-            banner_timeout=100,
+            conn_timeout=conn_timeout,
+            cmd_timeout=cmd_timeout,
+            ssh_conn_id="ssh_default",
         )
-        ti.run()
-        assert ti.duration is not None
-        assert ti.xcom_pull(task_ids='test', key='return_value') == b'airflow'
+        ssh_hook = task.get_hook()
+        assert conn_timeout == ssh_hook.conn_timeout
+        assert "ssh_default" == ssh_hook.ssh_conn_id
 
-    @conf_vars({('core', 'enable_xcom_pickling'): 'True'})
-    def test_command_execution_with_env(self, create_task_instance_of_operator):
-        ti = create_task_instance_of_operator(
-            SSHOperator,
-            dag_id="unit_tests_ssh_test_op_command_execution_with_env",
+    @conf_vars({('core', 'enable_xcom_pickling'): 'False'})

Review comment:
       Oh yeah, bad refactor entirely!







[GitHub] [airflow] ashb commented on a change in pull request #21326: Refactor SSH tests to not use SSH server in operator tests

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21326:
URL: https://github.com/apache/airflow/pull/21326#discussion_r799529198



##########
File path: tests/providers/ssh/operators/test_ssh.py
##########
@@ -204,104 +141,51 @@ def test_arg_checking(self):
             ssh_hook=self.hook,
             ssh_conn_id=TEST_CONN_ID,
             command=COMMAND,
-            timeout=TIMEOUT,
-            dag=self.dag,
             remote_host='operator_remote_host',
-            banner_timeout=100,
         )
-        try:
-            task_4.execute(None)
-        except Exception:
-            pass
+        task_4.execute(None)
         assert task_4.ssh_hook.ssh_conn_id == self.hook.ssh_conn_id
         assert task_4.ssh_hook.remote_host == 'operator_remote_host'
 
+        with pytest.raises(
+            AirflowException, match="SSH operator error: SSH command not specified. Aborting."
+        ):
+            SSHOperator(
+                task_id="test_4",
+                ssh_hook=self.hook,
+                command=None,
+            ).execute(None)
+            task_0.execute(None)
+
     @pytest.mark.parametrize(
         "command, get_pty_in, get_pty_out",
         [
             (COMMAND, False, False),
             (COMMAND, True, True),
             (COMMAND_WITH_SUDO, False, True),
             (COMMAND_WITH_SUDO, True, True),
-            (None, True, True),
         ],
     )
     def test_get_pyt_set_correctly(self, command, get_pty_in, get_pty_out):
         task = SSHOperator(
             task_id="test",
             ssh_hook=self.hook,
             command=command,
-            conn_timeout=TIMEOUT,
-            cmd_timeout=TIMEOUT,
             get_pty=get_pty_in,
-            dag=self.dag,
-            banner_timeout=100,
         )
-        if command is None:
-            with pytest.raises(AirflowException) as ctx:
-                task.execute(None)
-            assert str(ctx.value) == "SSH operator error: SSH command not specified. Aborting."
-        else:
-            task.execute(None)
-            assert task.get_pty == get_pty_out
+        task.execute(None)
+        assert task.get_pty == get_pty_out
 
     def test_ssh_client_managed_correctly(self):
-        # Ensure ssh_client gets created once
-        # Ensure connection gets closed once
+        # Ensure connection gets closed once (via context_manager)
         task = SSHOperator(
             task_id="test",
             ssh_hook=self.hook,
             command="ls",
-            dag=self.dag,
-            banner_timeout=100,
         )
-
-        se = SSHClientSideEffect(self.hook)
-        with unittest.mock.patch.object(task, 'get_ssh_client') as mock_get, unittest.mock.patch(
-            'paramiko.client.SSHClient.close'
-        ) as mock_close:
-            mock_get.side_effect = se
-            task.execute()
-            mock_get.assert_called_once()
-            mock_close.assert_called_once()
-
-    def test_one_ssh_client_many_commands(self):

Review comment:
       I deleted this test entirely -- it was only testing code that lived in the test file, or was already tested, so has no value.







[GitHub] [airflow] ashb commented on a change in pull request #21326: Refactor SSH tests to not use SSH server in operator tests

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21326:
URL: https://github.com/apache/airflow/pull/21326#discussion_r799639484



##########
File path: airflow/providers/ssh/hooks/ssh.py
##########
@@ -428,3 +429,70 @@ def _pkey_from_private_key(self, private_key: str, passphrase: Optional[str] = N
             'Ensure key provided is valid for one of the following'
             'key formats: RSA, DSS, ECDSA, or Ed25519'
         )
+

Review comment:
       Very good question.







[GitHub] [airflow] ashb commented on a change in pull request #21326: Refactor SSH tests to not use SSH server in operator tests

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21326:
URL: https://github.com/apache/airflow/pull/21326#discussion_r799529198



##########
File path: tests/providers/ssh/operators/test_ssh.py
##########
@@ -204,104 +141,51 @@ def test_arg_checking(self):
             ssh_hook=self.hook,
             ssh_conn_id=TEST_CONN_ID,
             command=COMMAND,
-            timeout=TIMEOUT,
-            dag=self.dag,
             remote_host='operator_remote_host',
-            banner_timeout=100,
         )
-        try:
-            task_4.execute(None)
-        except Exception:
-            pass
+        task_4.execute(None)
         assert task_4.ssh_hook.ssh_conn_id == self.hook.ssh_conn_id
         assert task_4.ssh_hook.remote_host == 'operator_remote_host'
 
+        with pytest.raises(
+            AirflowException, match="SSH operator error: SSH command not specified. Aborting."
+        ):
+            SSHOperator(
+                task_id="test_4",
+                ssh_hook=self.hook,
+                command=None,
+            ).execute(None)
+            task_0.execute(None)
+
     @pytest.mark.parametrize(
         "command, get_pty_in, get_pty_out",
         [
             (COMMAND, False, False),
             (COMMAND, True, True),
             (COMMAND_WITH_SUDO, False, True),
             (COMMAND_WITH_SUDO, True, True),
-            (None, True, True),
         ],
     )
     def test_get_pyt_set_correctly(self, command, get_pty_in, get_pty_out):
         task = SSHOperator(
             task_id="test",
             ssh_hook=self.hook,
             command=command,
-            conn_timeout=TIMEOUT,
-            cmd_timeout=TIMEOUT,
             get_pty=get_pty_in,
-            dag=self.dag,
-            banner_timeout=100,
         )
-        if command is None:
-            with pytest.raises(AirflowException) as ctx:
-                task.execute(None)
-            assert str(ctx.value) == "SSH operator error: SSH command not specified. Aborting."
-        else:
-            task.execute(None)
-            assert task.get_pty == get_pty_out
+        task.execute(None)
+        assert task.get_pty == get_pty_out
 
     def test_ssh_client_managed_correctly(self):
-        # Ensure ssh_client gets created once
-        # Ensure connection gets closed once
+        # Ensure connection gets closed once (via context_manager)
         task = SSHOperator(
             task_id="test",
             ssh_hook=self.hook,
             command="ls",
-            dag=self.dag,
-            banner_timeout=100,
         )
-
-        se = SSHClientSideEffect(self.hook)
-        with unittest.mock.patch.object(task, 'get_ssh_client') as mock_get, unittest.mock.patch(
-            'paramiko.client.SSHClient.close'
-        ) as mock_close:
-            mock_get.side_effect = se
-            task.execute()
-            mock_get.assert_called_once()
-            mock_close.assert_called_once()
-
-    def test_one_ssh_client_many_commands(self):

Review comment:
       I deleted this test entirely -- it was only testing code that lived in the test file, or was already tested, so has no value.







[GitHub] [airflow] dstandish commented on a change in pull request #21326: Refactor SSH tests to not use SSH server in operator tests

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #21326:
URL: https://github.com/apache/airflow/pull/21326#discussion_r799647483



##########
File path: tests/providers/ssh/hooks/test_ssh.py
##########
@@ -739,6 +740,21 @@ def test_openssh_private_key(self):
             session.delete(conn)
             session.commit()
 
+    def test_exec_ssh_client_command(self):
+        hook = SSHHook(
+            ssh_conn_id='ssh_default',
+            conn_timeout=30,
+            banner_timeout=100,
+        )
 
-if __name__ == '__main__':
-    unittest.main()
+        for attempt in tenacity.Retrying(stop=tenacity.stop_after_attempt(5)):

Review comment:
       could this always fail silently and if so do we care about that?
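
       On the silent-failure question, a minimal stdlib-only sketch of a retry loop that re-raises once its attempts are exhausted. The `retry` helper and `flaky` function are hypothetical stand-ins, not tenacity's API and not code from this PR; the point is only that a bounded retry can still surface a persistent failure to the test runner.

```python
import time


def retry(func, attempts=5, delay=0.0, exceptions=(Exception,)):
    """Call func until it succeeds or the attempts run out.

    Hypothetical helper for illustration only. The last exception is
    re-raised, so a persistent failure is never swallowed.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except exceptions as err:
            last_error = err
            if attempt < attempts:
                time.sleep(delay)  # wait before the next attempt
    raise last_error


# A transient failure: fails twice, then succeeds.
calls = {"n": 0}


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"


def always_failing():
    raise ConnectionError("server is down")
```

       With this shape, `retry(flaky)` returns normally after the transient errors, while `retry(always_failing)` raises `ConnectionError` after the fifth attempt, so the test would fail loudly rather than pass.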







[GitHub] [airflow] jedcunningham commented on a change in pull request #21326: Refactor SSH tests to not use SSH server in operator tests

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #21326:
URL: https://github.com/apache/airflow/pull/21326#discussion_r799635055



##########
File path: airflow/providers/ssh/operators/ssh.py
##########
@@ -18,15 +18,16 @@
 
 import warnings
 from base64 import b64encode
-from select import select
-from typing import Optional, Sequence, Tuple, Union
-
-from paramiko.client import SSHClient
+from typing import TYPE_CHECKING, Optional, Sequence, Union
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
-from airflow.providers.ssh.hooks.ssh import SSHHook
+
+if TYPE_CHECKING:
+    from paramiko.client import SSHClient
+
+    from airflow.providers.ssh.hooks.ssh import SSHHook

Review comment:
       For my own edification, why only import this when type checking?

##########
File path: tests/providers/ssh/operators/test_ssh.py
##########
@@ -42,117 +42,69 @@ class SSHClientSideEffect:
     def __init__(self, hook):
         self.hook = hook
 
-    def __call__(self):
-        self.return_value = self.hook.get_conn()
-        return self.return_value
-
 
 class TestSSHOperator:
     def setup_method(self):
-        from airflow.providers.ssh.hooks.ssh import SSHHook
 
-        hook = SSHHook(ssh_conn_id='ssh_default', banner_timeout=100)
+        hook = SSHHook(ssh_conn_id='ssh_default')
         hook.no_host_key_check = True
-        self.dag = DAG('ssh_test', default_args={'start_date': DEFAULT_DATE})
+
+        ssh_client = mock.create_autospec(SSHClient)
+        # `with ssh_client` should return itself.
+        ssh_client.__enter__.return_value = ssh_client
+        hook.get_conn = mock.MagicMock(return_value=ssh_client)
         self.hook = hook
 
-    def test_hook_created_correctly_with_timeout(self):
-        timeout = 20
-        ssh_id = "ssh_default"
-        with self.dag:
-            task = SSHOperator(
-                task_id="test",
-                command=COMMAND,
-                timeout=timeout,
-                ssh_conn_id="ssh_default",
-                banner_timeout=100,
-            )
-        task.execute(None)
-        assert timeout == task.ssh_hook.conn_timeout
-        assert ssh_id == task.ssh_hook.ssh_conn_id
+    # Make sure nothing in this test actually connects to SSH -- that's for hook tests.
+    @pytest.fixture(autouse=True)
+    def _patch_exec_ssh_client(self):
+        with mock.patch.object(self.hook, 'exec_ssh_client_command') as exec_ssh_client_command:
+            self.exec_ssh_client_command = exec_ssh_client_command
+            exec_ssh_client_command.return_value = (0, b'airflow', '')
+            yield exec_ssh_client_command
 
     def test_hook_created_correctly(self):
         conn_timeout = 20
         cmd_timeout = 45
-        ssh_id = 'ssh_default'
-        with self.dag:
-            task = SSHOperator(
-                task_id="test",
-                command=COMMAND,
-                conn_timeout=conn_timeout,
-                cmd_timeout=cmd_timeout,
-                ssh_conn_id="ssh_default",
-                banner_timeout=100,
-            )
-        task.execute(None)
-        assert conn_timeout == task.ssh_hook.conn_timeout
-        assert ssh_id == task.ssh_hook.ssh_conn_id
-
-    @conf_vars({('core', 'enable_xcom_pickling'): 'False'})
-    def test_json_command_execution(self, create_task_instance_of_operator):
-        ti = create_task_instance_of_operator(
-            SSHOperator,
-            dag_id="unit_tests_ssh_test_op_json_command_execution",
-            task_id="test",
-            ssh_hook=self.hook,
-            command=COMMAND,
-            do_xcom_push=True,
-            banner_timeout=100,
-        )
-        ti.run()
-        assert ti.duration is not None
-        assert ti.xcom_pull(task_ids='test', key='return_value') == b64encode(b'airflow').decode('utf-8')
-
-    @conf_vars({('core', 'enable_xcom_pickling'): 'True'})
-    def test_pickle_command_execution(self, create_task_instance_of_operator):
-        ti = create_task_instance_of_operator(
-            SSHOperator,
-            dag_id="unit_tests_ssh_test_op_pickle_command_execution",
+        task = SSHOperator(
             task_id="test",
-            ssh_hook=self.hook,
             command=COMMAND,
-            do_xcom_push=True,
-            banner_timeout=100,
+            conn_timeout=conn_timeout,
+            cmd_timeout=cmd_timeout,
+            ssh_conn_id="ssh_default",
         )
-        ti.run()
-        assert ti.duration is not None
-        assert ti.xcom_pull(task_ids='test', key='return_value') == b'airflow'
+        ssh_hook = task.get_hook()
+        assert conn_timeout == ssh_hook.conn_timeout
+        assert "ssh_default" == ssh_hook.ssh_conn_id
 
-    @conf_vars({('core', 'enable_xcom_pickling'): 'True'})
-    def test_command_execution_with_env(self, create_task_instance_of_operator):
-        ti = create_task_instance_of_operator(
-            SSHOperator,
-            dag_id="unit_tests_ssh_test_op_command_execution_with_env",
+    @conf_vars({('core', 'enable_xcom_pickling'): 'False'})

Review comment:
       This is false by default, no? Plus we also set it in the test body?
   ```suggestion
   ```







[GitHub] [airflow] ashb commented on a change in pull request #21326: Refactor SSH tests to not use SSH server in operator tests

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21326:
URL: https://github.com/apache/airflow/pull/21326#discussion_r799639210



##########
File path: airflow/providers/ssh/operators/ssh.py
##########
@@ -18,15 +18,16 @@
 
 import warnings
 from base64 import b64encode
-from select import select
-from typing import Optional, Sequence, Tuple, Union
-
-from paramiko.client import SSHClient
+from typing import TYPE_CHECKING, Optional, Sequence, Union
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
-from airflow.providers.ssh.hooks.ssh import SSHHook
+
+if TYPE_CHECKING:
+    from paramiko.client import SSHClient
+
+    from airflow.providers.ssh.hooks.ssh import SSHHook

Review comment:
       Not strictly required; it just means DAG parsing has to import less.
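
       A minimal sketch of this pattern using only the standard library. `Fraction` stands in for a heavier dependency such as `paramiko`, and the `halve` and `make_fraction` names are hypothetical, not part of the PR. The guarded import is skipped at runtime, the annotations stay as plain strings, and the real import is deferred until a function actually needs it.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Seen only by static type checkers; never executed at runtime.
    from fractions import Fraction


def halve(value: "Fraction") -> "Fraction":
    # The quoted annotation is not evaluated at runtime, so the name
    # `Fraction` does not need to exist at module level.
    return value / 2


def make_fraction(numerator: int, denominator: int) -> "Fraction":
    # Deferred import: the cost is paid only when this function runs,
    # not when the module is first imported (e.g. during parsing).
    from fractions import Fraction

    return Fraction(numerator, denominator)
```

       The trade-off is that any code needing the class at runtime has to import it locally, as `make_fraction` does here.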







[GitHub] [airflow] potiuk commented on a change in pull request #21326: Refactor SSH tests to not use SSH server in operator tests

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #21326:
URL: https://github.com/apache/airflow/pull/21326#discussion_r799646075



##########
File path: airflow/providers/ssh/operators/ssh.py
##########
@@ -18,15 +18,16 @@
 
 import warnings
 from base64 import b64encode
-from select import select
-from typing import Optional, Sequence, Tuple, Union
-
-from paramiko.client import SSHClient
+from typing import TYPE_CHECKING, Optional, Sequence, Union
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
-from airflow.providers.ssh.hooks.ssh import SSHHook
+
+if TYPE_CHECKING:
+    from paramiko.client import SSHClient
+
+    from airflow.providers.ssh.hooks.ssh import SSHHook

Review comment:
       Hmm. Sounds like a new pattern in the making. That is an entirely separate discussion to have, but this touches on some interesting possibilities.







[GitHub] [airflow] ashb commented on a change in pull request #21326: Refactor SSH tests to not use SSH server in operator tests

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21326:
URL: https://github.com/apache/airflow/pull/21326#discussion_r799643262



##########
File path: tests/providers/ssh/hooks/test_ssh.py
##########
@@ -739,6 +740,21 @@ def test_openssh_private_key(self):
             session.delete(conn)
             session.commit()
 
+    def test_exec_ssh_client_command(self):
+        hook = SSHHook(
+            ssh_conn_id='ssh_default',
+            conn_timeout=30,
+            banner_timeout=100,
+        )
 
-if __name__ == '__main__':
-    unittest.main()
+        for attempt in tenacity.Retrying(stop=tenacity.stop_after_attempt(5)):

Review comment:
       This line (and isolating real SSH to this one function) is most of the test fix; the rest is just refactoring code to make this a function on the hook, not the operator.
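
       A rough sketch of the stubbing side of this split, using `unittest.mock` only. `FakeClient`, `FakeHook`, `FakeOperator`, `exec_command` and `run_with_stubs` are hypothetical stand-ins rather than the Airflow classes; the sketch assumes an operator that delegates both the connection and the command execution to its hook, so that replacing those two hook methods keeps the test off the network.

```python
from unittest import mock


class FakeClient:
    """Hypothetical client that can be used as a context manager."""

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()

    def close(self):
        pass


class FakeHook:
    """Hypothetical hook that owns the connection and command execution."""

    def get_conn(self):
        raise RuntimeError("would open a real network connection")

    def exec_command(self, client, command):
        raise RuntimeError("would run a command on a real server")


class FakeOperator:
    """Hypothetical operator that delegates all I/O to its hook."""

    def __init__(self, hook, command):
        self.hook = hook
        self.command = command

    def execute(self):
        with self.hook.get_conn() as client:
            status, stdout, _stderr = self.hook.exec_command(client, self.command)
        if status != 0:
            raise RuntimeError(f"command failed with exit status {status}")
        return stdout


def run_with_stubs(command):
    hook = FakeHook()
    client = mock.create_autospec(FakeClient, instance=True)
    # `with client` should yield the stub itself.
    client.__enter__.return_value = client
    hook.get_conn = mock.MagicMock(return_value=client)
    # Replace the only method that would talk to a server.
    with mock.patch.object(hook, "exec_command", return_value=(0, b"airflow", b"")) as stub:
        result = FakeOperator(hook, command).execute()
    return result, stub, client
```

       Calling `run_with_stubs("echo airflow")` exercises the operator logic end to end without a server; the returned stubs can then be inspected to check that the command was passed through and that the client's context manager was exited.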



