You are viewing a plain-text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2019/03/30 13:18:29 UTC

[zeppelin] 01/02: Revert "[ZEPPELIN-4089] handle ipython kernel crash"

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git

commit 6c24d1f31c289e015795bd9f74087d63a4a5ae39
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Fri Mar 29 22:19:50 2019 +0800

    Revert "[ZEPPELIN-4089] handle ipython kernel crash"
    
    This reverts commit ef5e173d3aa1e708cc8994a3f82a25d357ca7005.
---
 .../main/resources/grpc/python/ipython_server.py   | 44 ++++++----------------
 .../zeppelin/python/IPythonInterpreterTest.java    | 44 ----------------------
 2 files changed, 12 insertions(+), 76 deletions(-)

diff --git a/python/src/main/resources/grpc/python/ipython_server.py b/python/src/main/resources/grpc/python/ipython_server.py
index 36e0a13..4b68efd 100644
--- a/python/src/main/resources/grpc/python/ipython_server.py
+++ b/python/src/main/resources/grpc/python/ipython_server.py
@@ -16,7 +16,6 @@
 from __future__ import print_function
 
 import jupyter_client
-import os
 import sys
 import threading
 import time
@@ -26,6 +25,8 @@ import grpc
 import ipython_pb2
 import ipython_pb2_grpc
 
+_ONE_DAY_IN_SECONDS = 60 * 60 * 24
+
 is_py2 = sys.version[0] == '2'
 if is_py2:
     import Queue as queue
@@ -33,6 +34,8 @@ else:
     import queue as queue
 
 
+TIMEOUT = 60*60*24*365*100  # 100 years
+
 class IPython(ipython_pb2_grpc.IPythonServicer):
 
     def __init__(self, server):
@@ -70,16 +73,13 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
         def execute_worker():
             reply = self._kc.execute_interactive(request.code,
                                             output_hook=_output_hook,
-                                            timeout=None)
+                                            timeout=TIMEOUT)
             payload_reply.append(reply)
 
         t = threading.Thread(name="ConsumerThread", target=execute_worker)
         t.start()
 
-        # We want to ensure that the kernel is alive because in case of OOM or other errors
-        # Execution might be stuck there:
-        # https://github.com/jupyter/jupyter_client/blob/master/jupyter_client/blocking/client.py#L32
-        while t.is_alive() and self.isKernelAlive():
+        while t.is_alive():
             while not stdout_queue.empty():
                 output = stdout_queue.get()
                 yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
@@ -96,14 +96,6 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
                                                   type=ipython_pb2.IMAGE,
                                                   output=output)
 
-        # if kernel is not alive (should be same as thread is still alive), means that we face
-        # an unexpected issue.
-        if not self.isKernelAlive() or t.is_alive():
-            yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
-                                                type=ipython_pb2.TEXT,
-                                                output="Ipython kernel has been stopped. Please check logs. It might be because of an out of memory issue.")
-            return
-
         while not stdout_queue.empty():
             output = stdout_queue.get()
             yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
@@ -135,21 +127,15 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
         return ipython_pb2.CancelResponse()
 
     def complete(self, request, context):
-        reply = self._kc.complete(request.code, request.cursor, reply=True, timeout=None)
+        reply = self._kc.complete(request.code, request.cursor, reply=True, timeout=TIMEOUT)
         return ipython_pb2.CompletionResponse(matches=reply['content']['matches'])
 
     def status(self, request, context):
         return ipython_pb2.StatusResponse(status = self._status)
 
-    def isKernelAlive(self):
-        return self._km.is_alive()
-
-    def terminate(self):
-        self._km.shutdown_kernel()
-
     def stop(self, request, context):
-        self.terminate()
-        return ipython_pb2.StopResponse()
+        self._server.stop(0)
+        sys.exit(0)
 
 
 def serve(port):
@@ -160,16 +146,10 @@ def serve(port):
     server.start()
     ipython.start()
     try:
-        while ipython.isKernelAlive():
-            time.sleep(5)
+        while True:
+            time.sleep(_ONE_DAY_IN_SECONDS)
     except KeyboardInterrupt:
-        print("interrupted")
-    finally:
-        print("shutdown")
-        # we let 2 sc for all request to be complete
-        server.stop(2)
-        ipython.terminate()
-        os._exit(0)
+        server.stop(0)
 
 if __name__ == '__main__':
     serve(sys.argv[1])
diff --git a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
index ca54502..9eba8d8 100644
--- a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
@@ -24,7 +24,6 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.InterpreterResultMessage;
 import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
 import org.junit.Test;
@@ -70,49 +69,6 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
   }
 
   @Test
-  public void testIpythonKernelCrash_shouldNotHangExecution()
-      throws InterpreterException, IOException {
-    // The goal of this test is to ensure that we handle case when the kernel die.
-    // In order to do so, we will kill the kernel process from the python code.
-    // A real example of that could be a out of memory by the code we execute.
-    String codeDep = "!pip install psutil";
-    String codeFindPID = "from os import getpid\n"
-        + "import psutil\n"
-        + "pids = psutil.pids()\n"
-        + "my_pid = getpid()\n"
-        + "pidToKill = []\n"
-        + "for pid in pids:\n"
-        + "    try:\n"
-        + "        p = psutil.Process(pid)\n"
-        + "        cmd = p.cmdline()\n"
-        + "        for arg in cmd:\n"
-        + "            if arg.count('ipykernel'):\n"
-        + "                pidToKill.append(pid)\n"
-        + "    except:\n"
-        + "        continue\n"
-        + "len(pidToKill)";
-    String codeKillKernel = "from os import kill\n"
-        + "import signal\n"
-        + "for pid in pidToKill:\n"
-        + "    kill(pid, signal.SIGKILL)";
-    InterpreterContext context = getInterpreterContext();
-    InterpreterResult result = interpreter.interpret(codeDep, context);
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-    context = getInterpreterContext();
-    result = interpreter.interpret(codeFindPID, context);
-    assertEquals(Code.SUCCESS, result.code());
-    InterpreterResultMessage output = context.out.toInterpreterResultMessage().get(0);
-    int numberOfPID = Integer.parseInt(output.getData());
-    assertTrue(numberOfPID > 0);
-    context = getInterpreterContext();
-    result = interpreter.interpret(codeKillKernel, context);
-    assertEquals(Code.ERROR, result.code());
-    output = context.out.toInterpreterResultMessage().get(0);
-    assertTrue(output.getData().equals("Ipython kernel has been stopped. Please check logs. "
-        + "It might be because of an out of memory issue."));
-  }
-
-  @Test
   public void testIPythonAdvancedFeatures()
       throws InterpreterException, InterruptedException, IOException {
     // ipython help