Posted to commits@zeppelin.apache.org by zj...@apache.org on 2019/09/11 09:20:35 UTC

[zeppelin] branch master updated: [ZEPPELIN-4090] Ipython queue performance

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b1a394  [ZEPPELIN-4090] Ipython queue performance
0b1a394 is described below

commit 0b1a39490b799f0649a4c44d74e540f8c8dca2e4
Author: marc hurabielle <ma...@gmail.com>
AuthorDate: Wed Sep 11 14:39:43 2019 +0900

    [ZEPPELIN-4090] Ipython queue performance
    
    ### What is this PR for?
    
    This PR fixes a bug that made the **ipython** queue listener overuse the CPU. After this fix, CPU usage should be much lower.
    It also refactors the listener to use a single queue, so that messages stay in order even with the added sleep.
    The refactor also reduces the number of lines and the amount of duplicated code.
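
A minimal sketch of the pattern described above, for illustration only. It assumes plain strings stand in for `ExecuteResponse` messages, and `produce` is a hypothetical stand-in for the kernel output hook. Every output type goes into one bounded queue, so arrival order is preserved. The removed code instead drained five type-specific queues one type at a time (text, then HTML, then errors, PNG, JPEG), which could reorder interleaved output, and its polling loop never slept. Here the consumer sleeps between polls instead of spinning.

```python
import queue
import threading
import time


def produce(replies, messages):
    # Hypothetical stand-in for the kernel output hook: every message,
    # whatever its type, goes onto the same queue in arrival order.
    for msg in messages:
        replies.put(msg)  # blocks while the bounded queue is full


def stream_replies(messages, poll_interval=0.05, maxsize=30):
    """Yield all messages in arrival order from a single bounded queue.

    The consumer sleeps between polls instead of busy-waiting on empty(),
    trading up to `poll_interval` seconds of latency for low CPU usage.
    """
    replies = queue.Queue(maxsize=maxsize)
    # daemon=True so an abandoned generator cannot keep the process alive
    worker = threading.Thread(target=produce, args=(replies, messages), daemon=True)
    worker.start()
    # Keep looping while the producer may still add items or items remain queued.
    while worker.is_alive() or not replies.empty():
        time.sleep(poll_interval)
        while not replies.empty():
            yield replies.get()
    worker.join()


if __name__ == "__main__":
    mixed = ["text: a", "error: b", "text: c", "png: d"]
    print(list(stream_replies(mixed, poll_interval=0.01)))
    # prints ['text: a', 'error: b', 'text: c', 'png: d']
```

The loop condition mirrors the patched `while (t.is_alive() and ...) or not stream_reply_queue.empty()`: an item put just before the producer exits is still drained on the next iteration.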
    
    ### What type of PR is it?
    Bug Fix / performance improvement
    
    ### Todos
    * [x] - Performance improvement
    
    ### What is the Jira issue?
    This PR addresses one part of the Jira issue.
    https://issues.apache.org/jira/browse/ZEPPELIN-4090
    
    ### How should this be tested?
    * First time? Setup Travis CI as described on https://zeppelin.apache.org/contribution/contributions.html#continuous-integration
    * Strongly recommended: add automated unit tests for any new or changed behavior
    * Outline any manual steps to test the PR here.
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: marc hurabielle <ma...@gmail.com>
    
    Closes #3337 from AyWa/fix/ipython-cpu and squashes the following commits:
    
    1abe50f25 [marc hurabielle] fix error message when kernel stop
    30ef7f7cb [marc hurabielle] fix indentation
    fe20923cf [marc hurabielle] fix rebase and fix lint
    e1d37b76e [marc hurabielle] increase queue size
    42f3ca097 [marc hurabielle] use one queue and use sleep to improve ipython performance
---
 .../main/resources/grpc/python/ipython_server.py   | 132 ++++++++-------------
 1 file changed, 49 insertions(+), 83 deletions(-)
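
To make the branch order of the new `_output_hook` in the diff below easier to follow, here is a standalone sketch of the same message-to-reply mapping. It is an illustration, not the patched code. Plain strings such as `'TEXT'` and `'ERROR'` stand in for the generated `ipython_pb2` enum values, and messages are plain dicts shaped like Jupyter IOPub messages (`header.msg_type`, `content`). `to_reply` and `MIME_PRIORITY` are hypothetical names. A lookup table replaces the `if`/`elif` chain but keeps its order.

```python
def _wrap_script(source):
    # JavaScript-like payloads are embedded in a <script> tag and sent as HTML.
    return '<script> ' + source + ' </script>\n'


# (MIME type, reply type, optional transform), checked in this order,
# matching the elif order of the new _output_hook.
MIME_PRIORITY = [
    ('image/jpeg', 'JPEG', None),
    ('image/png', 'PNG', None),
    ('text/plain', 'TEXT', None),
    ('text/html', 'HTML', None),
    ('application/javascript', 'HTML', _wrap_script),
    ('application/vnd.holoviews_load.v0+json', 'HTML', _wrap_script),
]


def to_reply(msg):
    """Return (status, type, output) for an IOPub message, or None when the
    message carries no supported output (so nothing would be queued)."""
    msg_type = msg['header']['msg_type']
    content = msg['content']
    if msg_type == 'stream':
        return ('SUCCESS', 'TEXT', content['text'])
    if msg_type in ('display_data', 'execute_result'):
        data = content['data']
        for mime, reply_type, transform in MIME_PRIORITY:
            if mime in data:
                output = data[mime]
                if transform is not None:
                    output = transform(output)
                return ('SUCCESS', reply_type, output)
        return None
    if msg_type == 'error':
        return ('ERROR', 'TEXT', '\n'.join(content['traceback']))
    return None
```

With this order, a display bundle that carries both `text/plain` and `text/html` is sent as TEXT, whereas the removed code checked `text/html` first.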

diff --git a/python/src/main/resources/grpc/python/ipython_server.py b/python/src/main/resources/grpc/python/ipython_server.py
index 47f67b7..31d7527 100644
--- a/python/src/main/resources/grpc/python/ipython_server.py
+++ b/python/src/main/resources/grpc/python/ipython_server.py
@@ -53,38 +53,49 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
         print("execute code:\n")
         print(request.code.encode('utf-8'))
         sys.stdout.flush()
-        stderr_queue = queue.Queue(maxsize = 10)
-        text_queue = queue.Queue(maxsize = 10)
-        png_queue = queue.Queue(maxsize = 5)
-        jpeg_queue = queue.Queue(maxsize = 5)
-        html_queue = queue.Queue(maxsize = 10)
-
+        stream_reply_queue = queue.Queue(maxsize = 30)
+        payload_reply = []
         def _output_hook(msg):
             msg_type = msg['header']['msg_type']
             content = msg['content']
             print("******************* CONTENT ******************")
             print(str(content)[:400])
+            outStatus, outType, output = ipython_pb2.SUCCESS, None, None
+            # prepare the reply
             if msg_type == 'stream':
-                text_queue.put(content['text'])
+                outType = ipython_pb2.TEXT
+                output = content['text']
             elif msg_type in ('display_data', 'execute_result'):
-                if 'text/html' in content['data']:
-                    html_queue.put(content['data']['text/html'])
+                if 'image/jpeg' in content['data']:
+                    outType = ipython_pb2.JPEG
+                    output = content['data']['image/jpeg']
                 elif 'image/png' in content['data']:
-                    png_queue.put(content['data']['image/png'])
-                elif 'image/jpeg' in content['data']:
-                    jpeg_queue.put(content['data']['image/jpeg'])
+                    outType = ipython_pb2.PNG
+                    output = content['data']['image/png']
                 elif 'text/plain' in content['data']:
-                    text_queue.put(content['data']['text/plain'])
+                    outType = ipython_pb2.TEXT
+                    output = content['data']['text/plain']
+                elif 'text/html' in content['data']:
+                    outType = ipython_pb2.HTML
+                    output = content['data']['text/html']
                 elif 'application/javascript' in content['data']:
-                    print('add to html queue: ' + str(content)[:100])
-                    html_queue.put('<script> ' + content['data']['application/javascript'] + ' </script>\n')
+                    outType = ipython_pb2.HTML
+                    output = '<script> ' + content['data']['application/javascript'] + ' </script>\n'
+                    print('add to html output: ' + str(content)[:100])
                 elif 'application/vnd.holoviews_load.v0+json' in content['data']:
-                    html_queue.put('<script> ' + content['data']['application/vnd.holoviews_load.v0+json'] + ' </script>\n')
-
+                    outType = ipython_pb2.HTML
+                    output = '<script> ' + content['data']['application/vnd.holoviews_load.v0+json'] + ' </script>\n'
             elif msg_type == 'error':
-                stderr_queue.put('\n'.join(content['traceback']))
-
-        payload_reply = []
+                outStatus = ipython_pb2.ERROR
+                outType = ipython_pb2.TEXT
+                output = '\n'.join(content['traceback'])
+
+            # send reply if we supported the output type
+            if outType is not None:
+                stream_reply_queue.put(
+                    ipython_pb2.ExecuteResponse(status=outStatus,
+                                                type=outType,
+                                                output=output))
         def execute_worker():
             reply = self._kc.execute_interactive(request.code,
                                           output_hook=_output_hook,
@@ -94,69 +105,24 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
         t = threading.Thread(name="ConsumerThread", target=execute_worker)
         with self._lock:
             t.start()
-            # We want to ensure that the kernel is alive because in case of OOM or other errors
-            # Execution might be stuck there:
-            # https://github.com/jupyter/jupyter_client/blob/master/jupyter_client/blocking/client.py#L32
-            while t.is_alive() and self.isKernelAlive():
-                while not text_queue.empty():
-                    output = text_queue.get()
-                    yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
-                                                      type=ipython_pb2.TEXT,
-                                                      output=output)
-                while not html_queue.empty():
-                    output = html_queue.get()
-                    yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
-                                                      type=ipython_pb2.HTML,
-                                                      output=output)
-                while not stderr_queue.empty():
-                    output = stderr_queue.get()
-                    yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
-                                                      type=ipython_pb2.TEXT,
-                                                      output=output)
-                while not png_queue.empty():
-                    output = png_queue.get()
-                    yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
-                                                      type=ipython_pb2.PNG,
-                                                      output=output)
-                while not jpeg_queue.empty():
-                    output = jpeg_queue.get()
-                    yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
-                                                      type=ipython_pb2.JPEG,
-                                                      output=output)
-
-        # if kernel is not alive (should be same as thread is still alive), means that we face
-        # an unexpected issue.
-        if not self.isKernelAlive() or t.is_alive():
-            yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
-                                                type=ipython_pb2.TEXT,
-                                                output="Ipython kernel has been stopped. Please check logs. It might be because of an out of memory issue.")
-            return
-
-        while not text_queue.empty():
-            output = text_queue.get()
-            yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
-                                              type=ipython_pb2.TEXT,
-                                              output=output)
-        while not html_queue.empty():
-            output = html_queue.get()
-            yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
-                                              type=ipython_pb2.HTML,
-                                              output=output)
-        while not stderr_queue.empty():
-            output = stderr_queue.get()
-            yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
-                                              type=ipython_pb2.TEXT,
-                                              output=output)
-        while not png_queue.empty():
-            output = png_queue.get()
-            yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
-                                              type=ipython_pb2.PNG,
-                                              output=output)
-        while not jpeg_queue.empty():
-            output = jpeg_queue.get()
-            yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
-                                              type=ipython_pb2.JPEG,
-                                              output=output)
+            # Wait for the execution to end and for the queue to be drained.
+            # Once the thread is no longer alive, the execution is complete.
+            # We also check that the kernel is alive, because on OOM or other errors
+            # execution might get stuck here (an issue might be opened on jupyter_client):
+            # https://github.com/jupyter/jupyter_client/blob/master/jupyter_client/blocking/client.py#L323
+            while (t.is_alive() and self.isKernelAlive()) or not stream_reply_queue.empty():
+                # Sleep between polls to reduce CPU usage.
+                # At worst this adds a 0.05 s delay to a batch of messages.
+                # Overall this improves performance.
+                time.sleep(0.05)
+                while not stream_reply_queue.empty():
+                    yield stream_reply_queue.get()
+
+            # If the kernel is dead or the thread is still alive, something went wrong.
+            if not self.isKernelAlive() or t.is_alive():
+                yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
+                                                  type=ipython_pb2.TEXT,
+                                                  output="Ipython kernel has been stopped. Please check logs. It might be because of an out of memory issue.")
         if payload_reply:
             result = []
             for payload in payload_reply[0]['content']['payload']: