You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@libcloud.apache.org by to...@apache.org on 2015/08/03 11:16:15 UTC

[2/4] libcloud git commit: Consume stdout, stderr streams on exit_status_ready

Consume stdout, stderr streams on exit_status_ready

* In some cases such as executing commands on localhost
via SSH, exit_status_ready gets set super fast. We don't
seem to consume them if this happens. The problem
manifest when you wrap the ParamikoSSHClient inside
something like an evenlet pool. This PR fixes that issue.

Closes #558

Signed-off-by: Tomaz Muraus <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/libcloud/repo
Commit: http://git-wip-us.apache.org/repos/asf/libcloud/commit/c90d63d6
Tree: http://git-wip-us.apache.org/repos/asf/libcloud/tree/c90d63d6
Diff: http://git-wip-us.apache.org/repos/asf/libcloud/diff/c90d63d6

Branch: refs/heads/trunk
Commit: c90d63d6e0d51479b27ed79de56e96b6f3e3fb64
Parents: e689332
Author: Lakshmi Kannan <la...@lakshmikannan.me>
Authored: Sun Aug 2 14:54:09 2015 -0700
Committer: Tomaz Muraus <to...@apache.org>
Committed: Mon Aug 3 10:55:23 2015 +0200

----------------------------------------------------------------------
 libcloud/compute/ssh.py                  | 69 ++++++++++++++++++---------
 libcloud/test/compute/test_ssh_client.py |  6 ++-
 2 files changed, 51 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/libcloud/blob/c90d63d6/libcloud/compute/ssh.py
----------------------------------------------------------------------
diff --git a/libcloud/compute/ssh.py b/libcloud/compute/ssh.py
index 427e133..12b6f8e 100644
--- a/libcloud/compute/ssh.py
+++ b/libcloud/compute/ssh.py
@@ -352,6 +352,10 @@ class ParamikoSSHClient(BaseSSHClient):
         # which is not ready will block for indefinitely.
         exit_status_ready = chan.exit_status_ready()
 
+        if exit_status_ready:
+            stdout.write(self._consume_stdout(chan).getvalue())
+            stderr.write(self._consume_stderr(chan).getvalue())
+
         while not exit_status_ready:
             current_time = time.time()
             elapsed_time = (current_time - start_time)
@@ -362,29 +366,8 @@ class ParamikoSSHClient(BaseSSHClient):
 
                 raise SSHCommandTimeoutError(cmd=cmd, timeout=timeout)
 
-            if chan.recv_ready():
-                data = chan.recv(self.CHUNK_SIZE)
-
-                while data:
-                    stdout.write(b(data).decode('utf-8'))
-                    ready = chan.recv_ready()
-
-                    if not ready:
-                        break
-
-                    data = chan.recv(self.CHUNK_SIZE)
-
-            if chan.recv_stderr_ready():
-                data = chan.recv_stderr(self.CHUNK_SIZE)
-
-                while data:
-                    stderr.write(b(data).decode('utf-8'))
-                    ready = chan.recv_stderr_ready()
-
-                    if not ready:
-                        break
-
-                    data = chan.recv_stderr(self.CHUNK_SIZE)
+            stdout.write(self._consume_stdout(chan).getvalue())
+            stderr.write(self._consume_stderr(chan).getvalue())
 
             # We need to check the exist status here, because the command could
             # print some output and exit during this sleep bellow.
@@ -413,6 +396,46 @@ class ParamikoSSHClient(BaseSSHClient):
         self.client.close()
         return True
 
+    def _consume_stdout(self, chan):
+        """
+        Try to consume stdout data from chan if it's receive ready.
+        """
+
+        stdout = StringIO()
+        if chan.recv_ready():
+            data = chan.recv(self.CHUNK_SIZE)
+
+            while data:
+                stdout.write(b(data).decode('utf-8'))
+                ready = chan.recv_ready()
+
+                if not ready:
+                    break
+
+                data = chan.recv(self.CHUNK_SIZE)
+
+        return stdout
+
+    def _consume_stderr(self, chan):
+        """
+        Try to consume stderr data from chan if it's receive ready.
+        """
+
+        stderr = StringIO()
+        if chan.recv_stderr_ready():
+            data = chan.recv_stderr(self.CHUNK_SIZE)
+
+            while data:
+                stderr.write(b(data).decode('utf-8'))
+                ready = chan.recv_stderr_ready()
+
+                if not ready:
+                    break
+
+                data = chan.recv_stderr(self.CHUNK_SIZE)
+
+        return stderr
+
     def _get_pkey_object(self, key):
         """
         Try to detect private key type and return paramiko.PKey object.

http://git-wip-us.apache.org/repos/asf/libcloud/blob/c90d63d6/libcloud/test/compute/test_ssh_client.py
----------------------------------------------------------------------
diff --git a/libcloud/test/compute/test_ssh_client.py b/libcloud/test/compute/test_ssh_client.py
index 857f4e2..772e175 100644
--- a/libcloud/test/compute/test_ssh_client.py
+++ b/libcloud/test/compute/test_ssh_client.py
@@ -30,7 +30,7 @@ from libcloud.compute.ssh import have_paramiko
 
 from libcloud.utils.py3 import StringIO
 
-from mock import patch, Mock
+from mock import patch, Mock, MagicMock
 
 if not have_paramiko:
     ParamikoSSHClient = None  # NOQA
@@ -192,6 +192,10 @@ class ParamikoSSHClientTests(LibcloudTestCase):
                          'port': 22}
         mock.client.connect.assert_called_once_with(**expected_conn)
 
+    @patch.object(ParamikoSSHClient, '_consume_stdout',
+                  MagicMock(return_value=StringIO('')))
+    @patch.object(ParamikoSSHClient, '_consume_stderr',
+                  MagicMock(return_value=StringIO('')))
     def test_basic_usage_absolute_path(self):
         """
         Basic execution.