You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2019/03/26 11:39:53 UTC

[zeppelin] branch branch-0.8 updated: [ZEPPELIN-4089] handle ipython kernel crash

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.8 by this push:
     new b995ceb  [ZEPPELIN-4089] handle ipython kernel crash
b995ceb is described below

commit b995cebb8eb2e547f13a6a1358096d1d5473d8c0
Author: marc hurabielle <ma...@gmail.com>
AuthorDate: Sun Mar 24 17:43:22 2019 +0900

    [ZEPPELIN-4089] handle ipython kernel crash
    
    This PR is the final fix for `ZEPPELIN-4089`. It notifies the user when the IPython kernel crashes; currently, execution hangs forever when the IPython kernel crashes.
    
    Bug Fix
    
    * [x] - logic to check kernel status
    * [ ] - decide what to do when we spot the failure
    * [x] - test
    
    https://issues.apache.org/jira/browse/ZEPPELIN-4089
    
    * First time? Setup Travis CI as described on https://zeppelin.apache.org/contribution/contributions.html#continuous-integration
    * Strongly recommended: add automated unit tests for any new or changed behavior
    * Outline any manual steps to test the PR here.
    
    * Do the license files need an update? no
    * Are there breaking changes for older versions? no
    * Does this need documentation? no
    
    Author: marc hurabielle <ma...@gmail.com>
    
    Closes #3339 from AyWa/fix/kernel-crash and squashes the following commits:
    
    6f4910c67 [marc hurabielle] fix lint
    73424d17d [marc hurabielle] Revert "example for kernel die and request stuck"
    f37d5f95d [marc hurabielle] add test for kernel die
    c9ec0335c [marc hurabielle] example for kernel die and request stuck
    5fe84dfef [marc hurabielle] handle kernel crash
    
    (cherry picked from commit ef5e173d3aa1e708cc8994a3f82a25d357ca7005)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .../main/resources/grpc/python/ipython_server.py   | 44 +++++++++++++++------
 .../zeppelin/python/IPythonInterpreterTest.java    | 45 +++++++++++++++++++++-
 2 files changed, 76 insertions(+), 13 deletions(-)

diff --git a/python/src/main/resources/grpc/python/ipython_server.py b/python/src/main/resources/grpc/python/ipython_server.py
index 4b68efd..36e0a13 100644
--- a/python/src/main/resources/grpc/python/ipython_server.py
+++ b/python/src/main/resources/grpc/python/ipython_server.py
@@ -16,6 +16,7 @@
 from __future__ import print_function
 
 import jupyter_client
+import os
 import sys
 import threading
 import time
@@ -25,8 +26,6 @@ import grpc
 import ipython_pb2
 import ipython_pb2_grpc
 
-_ONE_DAY_IN_SECONDS = 60 * 60 * 24
-
 is_py2 = sys.version[0] == '2'
 if is_py2:
     import Queue as queue
@@ -34,8 +33,6 @@ else:
     import queue as queue
 
 
-TIMEOUT = 60*60*24*365*100  # 100 years
-
 class IPython(ipython_pb2_grpc.IPythonServicer):
 
     def __init__(self, server):
@@ -73,13 +70,16 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
         def execute_worker():
             reply = self._kc.execute_interactive(request.code,
                                             output_hook=_output_hook,
-                                            timeout=TIMEOUT)
+                                            timeout=None)
             payload_reply.append(reply)
 
         t = threading.Thread(name="ConsumerThread", target=execute_worker)
         t.start()
 
-        while t.is_alive():
+        # We want to ensure that the kernel is alive because in case of OOM or other errors
+        # Execution might be stuck there:
+        # https://github.com/jupyter/jupyter_client/blob/master/jupyter_client/blocking/client.py#L32
+        while t.is_alive() and self.isKernelAlive():
             while not stdout_queue.empty():
                 output = stdout_queue.get()
                 yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
@@ -96,6 +96,14 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
                                                   type=ipython_pb2.IMAGE,
                                                   output=output)
 
+        # if kernel is not alive (should be same as thread is still alive), means that we face
+        # an unexpected issue.
+        if not self.isKernelAlive() or t.is_alive():
+            yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
+                                                type=ipython_pb2.TEXT,
+                                                output="Ipython kernel has been stopped. Please check logs. It might be because of an out of memory issue.")
+            return
+
         while not stdout_queue.empty():
             output = stdout_queue.get()
             yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
@@ -127,15 +135,21 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
         return ipython_pb2.CancelResponse()
 
     def complete(self, request, context):
-        reply = self._kc.complete(request.code, request.cursor, reply=True, timeout=TIMEOUT)
+        reply = self._kc.complete(request.code, request.cursor, reply=True, timeout=None)
         return ipython_pb2.CompletionResponse(matches=reply['content']['matches'])
 
     def status(self, request, context):
         return ipython_pb2.StatusResponse(status = self._status)
 
+    def isKernelAlive(self):
+        return self._km.is_alive()
+
+    def terminate(self):
+        self._km.shutdown_kernel()
+
     def stop(self, request, context):
-        self._server.stop(0)
-        sys.exit(0)
+        self.terminate()
+        return ipython_pb2.StopResponse()
 
 
 def serve(port):
@@ -146,10 +160,16 @@ def serve(port):
     server.start()
     ipython.start()
     try:
-        while True:
-            time.sleep(_ONE_DAY_IN_SECONDS)
+        while ipython.isKernelAlive():
+            time.sleep(5)
     except KeyboardInterrupt:
-        server.stop(0)
+        print("interrupted")
+    finally:
+        print("shutdown")
+        # we let 2 sc for all request to be complete
+        server.stop(2)
+        ipython.terminate()
+        os._exit(0)
 
 if __name__ == '__main__':
     serve(sys.argv[1])
diff --git a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
index d7773d4..dea9766 100644
--- a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
@@ -28,6 +28,7 @@ import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.InterpreterOutputListener;
 import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.InterpreterResultMessage;
 import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
@@ -76,6 +77,7 @@ public class IPythonInterpreterTest {
   }
 
   @Test
+<<<<<<< HEAD
   public void testGrpcFrameSize() throws InterpreterException, IOException {
     Properties properties = new Properties();
     properties.setProperty("zeppelin.ipython.grpc.message_size", "4");
@@ -115,7 +117,6 @@ public class IPythonInterpreterTest {
     InterpreterResult result = interpreter.interpret("from __future__ import print_function", getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
-
     InterpreterContext context = getInterpreterContext();
     result = interpreter.interpret("import sys\nprint(sys.version[0])", context);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@@ -465,6 +466,48 @@ public class IPythonInterpreterTest {
     result = interpreter.interpret("import time\nprint(\"Hello\")\ntime.sleep(0.5)\nz.getInterpreterContext().out().clear()\nprint(\"world\")\n", context);
     assertEquals("%text world\n", context.out.getCurrentOutput().toString());
   }
+  
+  public void testIpythonKernelCrash_shouldNotHangExecution()
+          throws InterpreterException, IOException {
+    // The goal of this test is to ensure that we handle case when the kernel die.
+    // In order to do so, we will kill the kernel process from the python code.
+    // A real example of that could be a out of memory by the code we execute.
+    String codeDep = "!pip install psutil";
+    String codeFindPID = "from os import getpid\n"
+            + "import psutil\n"
+            + "pids = psutil.pids()\n"
+            + "my_pid = getpid()\n"
+            + "pidToKill = []\n"
+            + "for pid in pids:\n"
+            + "    try:\n"
+            + "        p = psutil.Process(pid)\n"
+            + "        cmd = p.cmdline()\n"
+            + "        for arg in cmd:\n"
+            + "            if arg.count('ipykernel'):\n"
+            + "                pidToKill.append(pid)\n"
+            + "    except:\n"
+            + "        continue\n"
+            + "len(pidToKill)";
+    String codeKillKernel = "from os import kill\n"
+            + "import signal\n"
+            + "for pid in pidToKill:\n"
+            + "    kill(pid, signal.SIGKILL)";
+    InterpreterContext context = getInterpreterContext();
+    InterpreterResult result = interpreter.interpret(codeDep, context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    context = getInterpreterContext();
+    result = interpreter.interpret(codeFindPID, context);
+    assertEquals(Code.SUCCESS, result.code());
+    InterpreterResultMessage output = context.out.toInterpreterResultMessage().get(0);
+    int numberOfPID = Integer.parseInt(output.getData());
+    assertTrue(numberOfPID > 0);
+    context = getInterpreterContext();
+    result = interpreter.interpret(codeKillKernel, context);
+    assertEquals(Code.ERROR, result.code());
+    output = context.out.toInterpreterResultMessage().get(0);
+    assertTrue(output.getData().equals("Ipython kernel has been stopped. Please check logs. "
+            + "It might be because of an out of memory issue."));
+  }
 
   private static InterpreterContext getInterpreterContext() {
     return new InterpreterContext(