You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2019/03/30 13:18:29 UTC
[zeppelin] 01/02: Revert "[ZEPPELIN-4089] handle ipython kernel crash"
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
commit 6c24d1f31c289e015795bd9f74087d63a4a5ae39
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Fri Mar 29 22:19:50 2019 +0800
Revert "[ZEPPELIN-4089] handle ipython kernel crash"
This reverts commit ef5e173d3aa1e708cc8994a3f82a25d357ca7005.
---
.../main/resources/grpc/python/ipython_server.py | 44 ++++++----------------
.../zeppelin/python/IPythonInterpreterTest.java | 44 ----------------------
2 files changed, 12 insertions(+), 76 deletions(-)
diff --git a/python/src/main/resources/grpc/python/ipython_server.py b/python/src/main/resources/grpc/python/ipython_server.py
index 36e0a13..4b68efd 100644
--- a/python/src/main/resources/grpc/python/ipython_server.py
+++ b/python/src/main/resources/grpc/python/ipython_server.py
@@ -16,7 +16,6 @@
from __future__ import print_function
import jupyter_client
-import os
import sys
import threading
import time
@@ -26,6 +25,8 @@ import grpc
import ipython_pb2
import ipython_pb2_grpc
+_ONE_DAY_IN_SECONDS = 60 * 60 * 24
+
is_py2 = sys.version[0] == '2'
if is_py2:
import Queue as queue
@@ -33,6 +34,8 @@ else:
import queue as queue
+TIMEOUT = 60*60*24*365*100 # 100 years
+
class IPython(ipython_pb2_grpc.IPythonServicer):
def __init__(self, server):
@@ -70,16 +73,13 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
def execute_worker():
reply = self._kc.execute_interactive(request.code,
output_hook=_output_hook,
- timeout=None)
+ timeout=TIMEOUT)
payload_reply.append(reply)
t = threading.Thread(name="ConsumerThread", target=execute_worker)
t.start()
- # We want to ensure that the kernel is alive because in case of OOM or other errors
- # Execution might be stuck there:
- # https://github.com/jupyter/jupyter_client/blob/master/jupyter_client/blocking/client.py#L32
- while t.is_alive() and self.isKernelAlive():
+ while t.is_alive():
while not stdout_queue.empty():
output = stdout_queue.get()
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
@@ -96,14 +96,6 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
type=ipython_pb2.IMAGE,
output=output)
- # if kernel is not alive (should be same as thread is still alive), means that we face
- # an unexpected issue.
- if not self.isKernelAlive() or t.is_alive():
- yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR,
- type=ipython_pb2.TEXT,
- output="Ipython kernel has been stopped. Please check logs. It might be because of an out of memory issue.")
- return
-
while not stdout_queue.empty():
output = stdout_queue.get()
yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS,
@@ -135,21 +127,15 @@ class IPython(ipython_pb2_grpc.IPythonServicer):
return ipython_pb2.CancelResponse()
def complete(self, request, context):
- reply = self._kc.complete(request.code, request.cursor, reply=True, timeout=None)
+ reply = self._kc.complete(request.code, request.cursor, reply=True, timeout=TIMEOUT)
return ipython_pb2.CompletionResponse(matches=reply['content']['matches'])
def status(self, request, context):
return ipython_pb2.StatusResponse(status = self._status)
- def isKernelAlive(self):
- return self._km.is_alive()
-
- def terminate(self):
- self._km.shutdown_kernel()
-
def stop(self, request, context):
- self.terminate()
- return ipython_pb2.StopResponse()
+ self._server.stop(0)
+ sys.exit(0)
def serve(port):
@@ -160,16 +146,10 @@ def serve(port):
server.start()
ipython.start()
try:
- while ipython.isKernelAlive():
- time.sleep(5)
+ while True:
+ time.sleep(_ONE_DAY_IN_SECONDS)
except KeyboardInterrupt:
- print("interrupted")
- finally:
- print("shutdown")
- # we let 2 sc for all request to be complete
- server.stop(2)
- ipython.terminate()
- os._exit(0)
+ server.stop(0)
if __name__ == '__main__':
serve(sys.argv[1])
diff --git a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
index ca54502..9eba8d8 100644
--- a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
@@ -24,7 +24,6 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.junit.Test;
@@ -70,49 +69,6 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
}
@Test
- public void testIpythonKernelCrash_shouldNotHangExecution()
- throws InterpreterException, IOException {
- // The goal of this test is to ensure that we handle case when the kernel die.
- // In order to do so, we will kill the kernel process from the python code.
- // A real example of that could be a out of memory by the code we execute.
- String codeDep = "!pip install psutil";
- String codeFindPID = "from os import getpid\n"
- + "import psutil\n"
- + "pids = psutil.pids()\n"
- + "my_pid = getpid()\n"
- + "pidToKill = []\n"
- + "for pid in pids:\n"
- + " try:\n"
- + " p = psutil.Process(pid)\n"
- + " cmd = p.cmdline()\n"
- + " for arg in cmd:\n"
- + " if arg.count('ipykernel'):\n"
- + " pidToKill.append(pid)\n"
- + " except:\n"
- + " continue\n"
- + "len(pidToKill)";
- String codeKillKernel = "from os import kill\n"
- + "import signal\n"
- + "for pid in pidToKill:\n"
- + " kill(pid, signal.SIGKILL)";
- InterpreterContext context = getInterpreterContext();
- InterpreterResult result = interpreter.interpret(codeDep, context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- context = getInterpreterContext();
- result = interpreter.interpret(codeFindPID, context);
- assertEquals(Code.SUCCESS, result.code());
- InterpreterResultMessage output = context.out.toInterpreterResultMessage().get(0);
- int numberOfPID = Integer.parseInt(output.getData());
- assertTrue(numberOfPID > 0);
- context = getInterpreterContext();
- result = interpreter.interpret(codeKillKernel, context);
- assertEquals(Code.ERROR, result.code());
- output = context.out.toInterpreterResultMessage().get(0);
- assertTrue(output.getData().equals("Ipython kernel has been stopped. Please check logs. "
- + "It might be because of an out of memory issue."));
- }
-
- @Test
public void testIPythonAdvancedFeatures()
throws InterpreterException, InterruptedException, IOException {
// ipython help