You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/04/10 07:55:53 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN4734]. Sometimes it is unable to restart interpreter

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new b584a60  [ZEPPELIN4734]. Sometimes it is unable to restart interpreter
b584a60 is described below

commit b584a6019bf77a8bea90446e2e14ddbc9bb508af
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Wed Apr 8 11:47:51 2020 +0800

    [ZEPPELIN4734]. Sometimes it is unable to restart interpreter
    
    ### What is this PR for?
    The root cause is that when restarting an interpreter, Zeppelin first cancels all jobs, while the thread that cancels a paragraph in the interpreter process may invoke a Thrift call on the Zeppelin server side. The stack trace in the Jira description shows one such example from the Flink interpreter.
    
    This PR fixes this issue by cancelling the paragraph in another thread. It is fine to cancel a paragraph asynchronously.
    
    ### What type of PR is it?
    [Bug Fix ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4734
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3723 from zjffdu/ZEPPELIN-4734 and squashes the following commits:
    
    c0e8c80c8 [Jeff Zhang] [ZEPPELIN-4734]. Sometimes it is unable to restart interpreter
    
    (cherry picked from commit 8eaf864d43a46f10fc75f82464ac0f49d6914b75)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .../java/org/apache/zeppelin/flink/PyFlinkInterpreter.java  |  2 +-
 .../interpreter/remote/RemoteInterpreterServer.java         | 13 ++++++++-----
 .../interpreter/remote/RemoteInterpreterServerTest.java     |  2 ++
 3 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
index 91ec0fe..eaebd64 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
@@ -119,7 +119,7 @@ public class PyFlinkInterpreter extends PythonInterpreter {
       flinkInterpreter.createPlannerAgain();
       return super.interpret(st, context);
     } finally {
-      if (getPythonProcessLauncher().isRunning()) {
+      if (useIPython() || (!useIPython() && getPythonProcessLauncher().isRunning())) {
         InterpreterResult result = super.interpret("intp.resetClassLoaderInPythonThread()", context);
         if (result.code() != InterpreterResult.Code.SUCCESS) {
           LOGGER.warn("Fail to resetClassLoaderInPythonThread: " + result.toString());
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index d6bd116..9b9e063 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -751,11 +751,14 @@ public class RemoteInterpreterServer extends Thread
     if (job != null && job.getStatus() == Status.PENDING) {
       job.setStatus(Status.ABORT);
     } else {
-      try {
-        intp.cancel(convert(interpreterContext, null));
-      } catch (InterpreterException e) {
-        throw new TException("Fail to cancel", e);
-      }
+      Thread thread = new Thread( ()-> {
+        try {
+          intp.cancel(convert(interpreterContext, null));
+        } catch (InterpreterException e) {
+          logger.error("Fail to cancel paragraph: " + interpreterContext.getParagraphId());
+        }
+      });
+      thread.start();
     }
   }
 
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
index 7beeee8..ebbeea1 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
@@ -174,6 +174,8 @@ public class RemoteInterpreterServerTest {
     Thread.sleep(1000);
     assertFalse(interpreter1.cancelled.get());
     server.cancel("session_1", Test1Interpreter.class.getName(), intpContext);
+    // Sleep 1 second, because cancel is async.
+    Thread.sleep(1000);
     assertTrue(interpreter1.cancelled.get());
 
     // getProgress