You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2019/03/26 11:39:53 UTC
[zeppelin] branch branch-0.8 updated: [ZEPPELIN-4089] handle ipython kernel crash
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.8 by this push:
new b995ceb [ZEPPELIN-4089] handle ipython kernel crash
b995ceb is described below
commit b995cebb8eb2e547f13a6a1358096d1d5473d8c0
Author: marc hurabielle <ma...@gmail.com>
AuthorDate: Sun Mar 24 17:43:22 2019 +0900
[ZEPPELIN-4089] handle ipython kernel crash
This PR is the last fix for `ZEPPELIN-4089`. It aims to notify the user when the IPython kernel crashes. Right now, execution gets stuck forever when the IPython kernel crashes.
Bug Fix
* [x] - logic to check kernel status
* [ ] - decide what to do when we spot the failure
* [x] - test
https://issues.apache.org/jira/browse/ZEPPELIN-4089
* First time? Setup Travis CI as described on https://zeppelin.apache.org/contribution/contributions.html#continuous-integration
* Strongly recommended: add automated unit tests for any new or changed behavior
* Outline any manual steps to test the PR here.
* Do the license files need an update? no
* Are there breaking changes for older versions? no
* Does this need documentation? no
Author: marc hurabielle <ma...@gmail.com>
Closes #3339 from AyWa/fix/kernel-crash and squashes the following commits:
6f4910c67 [marc hurabielle] fix lint
73424d17d [marc hurabielle] Revert "example for kernel die and request stuck"
f37d5f95d [marc hurabielle] add test for kernel die
c9ec0335c [marc hurabielle] example for kernel die and request stuck
5fe84dfef [marc hurabielle] handle kernel crash
(cherry picked from commit ef5e173d3aa1e708cc8994a3f82a25d357ca7005)
Signed-off-by: Jeff Zhang <zj...@apache.org>
---
.../main/resources/grpc/python/ipython_server.py | 44 +++++++++++++++------
.../zeppelin/python/IPythonInterpreterTest.java | 45 +++++++++++++++++++++-
2 files changed, 76 insertions(+), 13 deletions(-)
diff --git a/python/src/main/resources/grpc/python/ipython_server.py b/python/src/main/resources/grpc/python/ipython_server.py
index 4b68efd..36e0a13 100644
--- a/python/src/main/resources/grpc/python/ipython_server.py
+++ b/python/src/main/resources/grpc/python/ipython_server.py
@@ -16,6 +16,7 @@
from __future__ import print_function
import jupyter_client
+import os
import sys
import threading
import time
@@ -25,8 +26,6 @@ import grpc
import ipython_pb2
import ipython_pb2_grpc
-_ONE_DAY_IN_SECONDS = 60 * 60 * 24
-
is_py2 = sys.version[0] == '2'
if is_py2:
import Queue as queue
@@ -34,8 +33,6 @@ else:
import queue as queue
-TIMEOUT = 60*60*24*365*100 # 100 years
-
class IPython(ipython_pb2_grpc.IPythonServicer):
def __init__(self, server):
@@ -73,13 +70,16 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
def execute_worker():
reply = self._kc.execute_interactive(request.code,
output_hook=_output_hook,
- timeout=TIMEOUT)
+ timeout=None)
payload_reply.append(reply)
t = threading.Thread(name="ConsumerThread", target=execute_worker)
t.start()
- while t.is_alive():
+ # We want to ensure that the kernel is alive because in case of OOM or other errors
+ # Execution might be stuck there:
+ # https://github.com/jupyter/jupyter_client/blob/master/jupyter_client/blocking/client.py#L32
+ while t.is_alive() and self.isKernelAlive():
while not stdout_queue.empty():
output = stdout_queue.get()
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
@@ -96,6 +96,14 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
type=ipython_pb2.IMAGE,
output=output)
+ # if kernel is not alive (should be same as thread is still alive), means that we face
+ # an unexpected issue.
+ if not self.isKernelAlive() or t.is_alive():
+ yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
+ type=ipython_pb2.TEXT,
+ output="Ipython kernel has been stopped. Please check logs. It might be because of an out of memory issue.")
+ return
+
while not stdout_queue.empty():
output = stdout_queue.get()
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
@@ -127,15 +135,21 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
return ipython_pb2.CancelResponse()
def complete(self, request, context):
- reply = self._kc.complete(request.code, request.cursor, reply=True, timeout=TIMEOUT)
+ reply = self._kc.complete(request.code, request.cursor, reply=True, timeout=None)
return ipython_pb2.CompletionResponse(matches=reply['content']['matches'])
def status(self, request, context):
return ipython_pb2.StatusResponse(status = self._status)
+ def isKernelAlive(self):
+ return self._km.is_alive()
+
+ def terminate(self):
+ self._km.shutdown_kernel()
+
def stop(self, request, context):
- self._server.stop(0)
- sys.exit(0)
+ self.terminate()
+ return ipython_pb2.StopResponse()
def serve(port):
@@ -146,10 +160,16 @@ def serve(port):
server.start()
ipython.start()
try:
- while True:
- time.sleep(_ONE_DAY_IN_SECONDS)
+ while ipython.isKernelAlive():
+ time.sleep(5)
except KeyboardInterrupt:
- server.stop(0)
+ print("interrupted")
+ finally:
+ print("shutdown")
+ # we let 2 sc for all request to be complete
+ server.stop(2)
+ ipython.terminate()
+ os._exit(0)
if __name__ == '__main__':
serve(sys.argv[1])
diff --git a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
index d7773d4..dea9766 100644
--- a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
@@ -28,6 +28,7 @@ import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
@@ -76,6 +77,7 @@ public class IPythonInterpreterTest {
}
@Test
+<<<<<<< HEAD
public void testGrpcFrameSize() throws InterpreterException, IOException {
Properties properties = new Properties();
properties.setProperty("zeppelin.ipython.grpc.message_size", "4");
@@ -115,7 +117,6 @@ public class IPythonInterpreterTest {
InterpreterResult result = interpreter.interpret("from __future__ import print_function", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-
InterpreterContext context = getInterpreterContext();
result = interpreter.interpret("import sys\nprint(sys.version[0])", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@@ -465,6 +466,48 @@ public class IPythonInterpreterTest {
result = interpreter.interpret("import time\nprint(\"Hello\")\ntime.sleep(0.5)\nz.getInterpreterContext().out().clear()\nprint(\"world\")\n", context);
assertEquals("%text world\n", context.out.getCurrentOutput().toString());
}
+
+ public void testIpythonKernelCrash_shouldNotHangExecution()
+ throws InterpreterException, IOException {
+ // The goal of this test is to ensure that we handle case when the kernel die.
+ // In order to do so, we will kill the kernel process from the python code.
+ // A real example of that could be a out of memory by the code we execute.
+ String codeDep = "!pip install psutil";
+ String codeFindPID = "from os import getpid\n"
+ + "import psutil\n"
+ + "pids = psutil.pids()\n"
+ + "my_pid = getpid()\n"
+ + "pidToKill = []\n"
+ + "for pid in pids:\n"
+ + " try:\n"
+ + " p = psutil.Process(pid)\n"
+ + " cmd = p.cmdline()\n"
+ + " for arg in cmd:\n"
+ + " if arg.count('ipykernel'):\n"
+ + " pidToKill.append(pid)\n"
+ + " except:\n"
+ + " continue\n"
+ + "len(pidToKill)";
+ String codeKillKernel = "from os import kill\n"
+ + "import signal\n"
+ + "for pid in pidToKill:\n"
+ + " kill(pid, signal.SIGKILL)";
+ InterpreterContext context = getInterpreterContext();
+ InterpreterResult result = interpreter.interpret(codeDep, context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ context = getInterpreterContext();
+ result = interpreter.interpret(codeFindPID, context);
+ assertEquals(Code.SUCCESS, result.code());
+ InterpreterResultMessage output = context.out.toInterpreterResultMessage().get(0);
+ int numberOfPID = Integer.parseInt(output.getData());
+ assertTrue(numberOfPID > 0);
+ context = getInterpreterContext();
+ result = interpreter.interpret(codeKillKernel, context);
+ assertEquals(Code.ERROR, result.code());
+ output = context.out.toInterpreterResultMessage().get(0);
+ assertTrue(output.getData().equals("Ipython kernel has been stopped. Please check logs. "
+ + "It might be because of an out of memory issue."));
+ }
private static InterpreterContext getInterpreterContext() {
return new InterpreterContext(