You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2019/09/11 09:20:35 UTC
[zeppelin] branch master updated: [ZEPPELIN-4090] Ipython queue
performance
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 0b1a394 [ZEPPELIN-4090] Ipython queue performance
0b1a394 is described below
commit 0b1a39490b799f0649a4c44d74e540f8c8dca2e4
Author: marc hurabielle <ma...@gmail.com>
AuthorDate: Wed Sep 11 14:39:43 2019 +0900
[ZEPPELIN-4090] Ipython queue performance
### What is this PR for?
This PR fixes a bug that caused the **ipython** queue listener to overuse the CPU. After this fix, CPU usage should be much lower.
There is also some refactoring to use only one queue, which ensures messages stay ordered even with a sleep.
The refactor also reduces the number of lines and the amount of code duplication.
### What type of PR is it?
Bug Fix / performance improvement
### Todos
* [x] - Performance improvement
### What is the Jira issue?
It is one part of the jira issue.
https://issues.apache.org/jira/browse/ZEPPELIN-4090
### How should this be tested?
* First time? Setup Travis CI as described on https://zeppelin.apache.org/contribution/contributions.html#continuous-integration
* Strongly recommended: add automated unit tests for any new or changed behavior
* Outline any manual steps to test the PR here.
### Screenshots (if appropriate)
### Questions:
* Do the license files need updating? no
* Are there breaking changes for older versions? no
* Does this need documentation? no
Author: marc hurabielle <ma...@gmail.com>
Closes #3337 from AyWa/fix/ipython-cpu and squashes the following commits:
1abe50f25 [marc hurabielle] fix error message when kernel stop
30ef7f7cb [marc hurabielle] fix identation
fe20923cf [marc hurabielle] fix rebase and fix lint
e1d37b76e [marc hurabielle] increase queue size
42f3ca097 [marc hurabielle] use one queue and use sleep to improve ipython performance
---
.../main/resources/grpc/python/ipython_server.py | 132 ++++++++-------------
1 file changed, 49 insertions(+), 83 deletions(-)
diff --git a/python/src/main/resources/grpc/python/ipython_server.py b/python/src/main/resources/grpc/python/ipython_server.py
index 47f67b7..31d7527 100644
--- a/python/src/main/resources/grpc/python/ipython_server.py
+++ b/python/src/main/resources/grpc/python/ipython_server.py
@@ -53,38 +53,49 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
print("execute code:\n")
print(request.code.encode('utf-8'))
sys.stdout.flush()
- stderr_queue = queue.Queue(maxsize = 10)
- text_queue = queue.Queue(maxsize = 10)
- png_queue = queue.Queue(maxsize = 5)
- jpeg_queue = queue.Queue(maxsize = 5)
- html_queue = queue.Queue(maxsize = 10)
-
+ stream_reply_queue = queue.Queue(maxsize = 30)
+ payload_reply = []
def _output_hook(msg):
msg_type = msg['header']['msg_type']
content = msg['content']
print("******************* CONTENT ******************")
print(str(content)[:400])
+ outStatus, outType, output = ipython_pb2.SUCCESS, None, None
+ # prepare the reply
if msg_type == 'stream':
- text_queue.put(content['text'])
+ outType = ipython_pb2.TEXT
+ output = content['text']
elif msg_type in ('display_data', 'execute_result'):
- if 'text/html' in content['data']:
- html_queue.put(content['data']['text/html'])
+ if 'image/jpeg' in content['data']:
+ outType = ipython_pb2.JPEG
+ output = content['data']['image/jpeg']
elif 'image/png' in content['data']:
- png_queue.put(content['data']['image/png'])
- elif 'image/jpeg' in content['data']:
- jpeg_queue.put(content['data']['image/jpeg'])
+ outType = ipython_pb2.PNG
+ output = content['data']['image/png']
elif 'text/plain' in content['data']:
- text_queue.put(content['data']['text/plain'])
+ outType = ipython_pb2.TEXT
+ output = content['data']['text/plain']
+ elif 'text/html' in content['data']:
+ outType = ipython_pb2.HTML
+ output = content['data']['text/html']
elif 'application/javascript' in content['data']:
- print('add to html queue: ' + str(content)[:100])
- html_queue.put('<script> ' + content['data']['application/javascript'] + ' </script>\n')
+ outType = ipython_pb2.HTML
+ output = '<script> ' + content['data']['application/javascript'] + ' </script>\n'
+ print('add to html output: ' + str(content)[:100])
elif 'application/vnd.holoviews_load.v0+json' in content['data']:
- html_queue.put('<script> ' + content['data']['application/vnd.holoviews_load.v0+json'] + ' </script>\n')
-
+ outType = ipython_pb2.HTML
+ output = '<script> ' + content['data']['application/vnd.holoviews_load.v0+json'] + ' </script>\n'
elif msg_type == 'error':
- stderr_queue.put('\n'.join(content['traceback']))
-
- payload_reply = []
+ outStatus = ipython_pb2.ERROR
+ outType = ipython_pb2.TEXT
+ output = '\n'.join(content['traceback'])
+
+ # send reply if we supported the output type
+ if outType is not None:
+ stream_reply_queue.put(
+ ipython_pb2.ExecuteResponse(status=outStatus,
+ type=outType,
+ output=output))
def execute_worker():
reply = self._kc.execute_interactive(request.code,
output_hook=_output_hook,
@@ -94,69 +105,24 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
t = threading.Thread(name="ConsumerThread", target=execute_worker)
with self._lock:
t.start()
- # We want to ensure that the kernel is alive because in case of OOM or other errors
- # Execution might be stuck there:
- # https://github.com/jupyter/jupyter_client/blob/master/jupyter_client/blocking/client.py#L32
- while t.is_alive() and self.isKernelAlive():
- while not text_queue.empty():
- output = text_queue.get()
- yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
- type=ipython_pb2.TEXT,
- output=output)
- while not html_queue.empty():
- output = html_queue.get()
- yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
- type=ipython_pb2.HTML,
- output=output)
- while not stderr_queue.empty():
- output = stderr_queue.get()
- yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
- type=ipython_pb2.TEXT,
- output=output)
- while not png_queue.empty():
- output = png_queue.get()
- yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
- type=ipython_pb2.PNG,
- output=output)
- while not jpeg_queue.empty():
- output = jpeg_queue.get()
- yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
- type=ipython_pb2.JPEG,
- output=output)
-
- # if kernel is not alive (should be same as thread is still alive), means that we face
- # an unexpected issue.
- if not self.isKernelAlive() or t.is_alive():
- yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
- type=ipython_pb2.TEXT,
- output="Ipython kernel has been stopped. Please check logs. It might be because of an out of memory issue.")
- return
-
- while not text_queue.empty():
- output = text_queue.get()
- yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
- type=ipython_pb2.TEXT,
- output=output)
- while not html_queue.empty():
- output = html_queue.get()
- yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
- type=ipython_pb2.HTML,
- output=output)
- while not stderr_queue.empty():
- output = stderr_queue.get()
- yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
- type=ipython_pb2.TEXT,
- output=output)
- while not png_queue.empty():
- output = png_queue.get()
- yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
- type=ipython_pb2.PNG,
- output=output)
- while not jpeg_queue.empty():
- output = jpeg_queue.get()
- yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
- type=ipython_pb2.JPEG,
- output=output)
+ # We want to wait the end of the execution (and queue empty).
+ # In our case when the thread is not alive -> it means that the execution is complete
+ # However we also ensure that the kernel is alive because in case of OOM or other errors
+ # Execution might be stuck there: (might open issue on jupyter client)
+ # https://github.com/jupyter/jupyter_client/blob/master/jupyter_client/blocking/client.py#L323
+ while (t.is_alive() and self.isKernelAlive()) or not stream_reply_queue.empty():
+ # Sleeping time to time to reduce cpu usage.
+ # At worst it will bring a 0.05 delay for bunch of messages.
+ # Overall it will improve performance.
+ time.sleep(0.05)
+ while not stream_reply_queue.empty():
+ yield stream_reply_queue.get()
+
+ # if kernel is not alive or thread is still alive, it means that we face an issue.
+ if not self.isKernelAlive() or t.is_alive():
+ yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
+ type=ipython_pb2.TEXT,
+ output="Ipython kernel has been stopped. Please check logs. It might be because of an out of memory issue.")
if payload_reply:
result = []
for payload in payload_reply[0]['content']['payload']: