You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2019/03/28 01:59:09 UTC

[zeppelin] branch master updated: ZEPPELIN-4081. when the python process is killed, the task state is still running

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new c9514e2  ZEPPELIN-4081. when the python process is killed,the task state is still running
c9514e2 is described below

commit c9514e26a00b5c2b76c358425bae79fee183c9e1
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Thu Mar 21 11:18:02 2019 +0800

    ZEPPELIN-4081. when the python process is killed,the task state is still running
    
    ### What is this PR for?
    This PR breaks out of Python code execution when the Python process has exited, so the task no longer stays in the running state. Besides that, it also improves the error message for the IPython interpreter, although that interpreter does not have this issue.
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4081
    
    ### How should this be tested?
    * Unit test is added
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3338 from zjffdu/ZEPPELIN-4081 and squashes the following commits:
    
    907faacf6 [Jeff Zhang] ZEPPELIN-4081. when the python process is killed,the task state is still running
---
 LICENSE                                            |  1 +
 python/pom.xml                                     |  8 ++++
 .../org/apache/zeppelin/python/IPythonClient.java  |  7 +++-
 .../apache/zeppelin/python/IPythonInterpreter.java | 49 ++++++++++++++++------
 .../apache/zeppelin/python/PythonInterpreter.java  | 23 ++++++++--
 .../zeppelin/python/BasePythonInterpreterTest.java |  3 +-
 .../zeppelin/python/IPythonInterpreterTest.java    | 29 +++++++++++++
 .../zeppelin/python/PythonInterpreterTest.java     | 33 ++++++++++++++-
 spark/interpreter/pom.xml                          |  6 +++
 .../apache/zeppelin/spark/IPySparkInterpreter.java |  3 +-
 10 files changed, 141 insertions(+), 21 deletions(-)

diff --git a/LICENSE b/LICENSE
index 3b34053..a456a41 100644
--- a/LICENSE
+++ b/LICENSE
@@ -260,6 +260,7 @@ The text of each license is also included at licenses/LICENSE-[project]-[version
     (Apache 2.0) Nimbus JOSE+JWT (https://bitbucket.org/connect2id/nimbus-jose-jwt/wiki/Home)
     (Apache 2.0) jarchivelib (https://github.com/thrau/jarchivelib)
     (Apache 2.0) Google Cloud Client Library for Java (https://github.com/GoogleCloudPlatform/google-cloud-java)
+    (Apache 2.0) concurrentunit (https://github.com/jhalterman/concurrentunit)
 
 ========================================================================
 BSD 3-Clause licenses
diff --git a/python/pom.xml b/python/pom.xml
index d9371ed..2700cba 100644
--- a/python/pom.xml
+++ b/python/pom.xml
@@ -87,6 +87,14 @@
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>net.jodah</groupId>
+      <artifactId>concurrentunit</artifactId>
+      <version>0.4.4</version>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git a/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java b/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java
index b9c897b..c729898 100644
--- a/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java
+++ b/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java
@@ -53,6 +53,7 @@ public class IPythonClient {
   private final ManagedChannel channel;
   private final IPythonGrpc.IPythonBlockingStub blockingStub;
   private final IPythonGrpc.IPythonStub asyncStub;
+  private volatile boolean maybeIPythonFailed = false;
 
   private SecureRandom random = new SecureRandom();
 
@@ -83,6 +84,7 @@ public class IPythonClient {
     final ExecuteResponse.Builder finalResponseBuilder = ExecuteResponse.newBuilder()
         .setStatus(ExecuteStatus.SUCCESS);
     final AtomicBoolean completedFlag = new AtomicBoolean(false);
+    maybeIPythonFailed = false;
     LOGGER.debug("stream_execute code:\n" + request.getCode());
     asyncStub.execute(request, new StreamObserver<ExecuteResponse>() {
       int index = 0;
@@ -137,7 +139,7 @@ public class IPythonClient {
         }
         LOGGER.error("Fail to call IPython grpc", throwable);
         finalResponseBuilder.setStatus(ExecuteStatus.ERROR);
-
+        maybeIPythonFailed = true;
         completedFlag.set(true);
         synchronized (completedFlag) {
           completedFlag.notify();
@@ -204,6 +206,9 @@ public class IPythonClient {
     asyncStub.stop(request, null);
   }
 
+  public boolean isMaybeIPythonFailed() {
+    return maybeIPythonFailed;
+  }
 
   public static void main(String[] args) {
     IPythonClient client = new IPythonClient("localhost", 50053);
diff --git a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
index 9e23d04..f4c753d 100644
--- a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
@@ -82,7 +82,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
   private boolean useBuiltinPy4j = true;
   private boolean usePy4JAuth = true;
   private String secret;
-  private volatile boolean pythonProcessFailed = false;
+  private volatile boolean pythonProcessRunning = false;
 
   private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER);
 
@@ -294,7 +294,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
 
     // wait until IPython kernel is started or timeout
     long startTime = System.currentTimeMillis();
-    while (!pythonProcessFailed) {
+    while (!pythonProcessRunning) {
       try {
         Thread.sleep(100);
       } catch (InterruptedException e) {
@@ -305,6 +305,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
         StatusResponse response = ipythonClient.status(StatusRequest.newBuilder().build());
         if (response.getStatus() == IPythonStatus.RUNNING) {
           LOGGER.info("IPython Kernel is Running");
+          pythonProcessRunning = true;
           break;
         } else {
           LOGGER.info("Wait for IPython Kernel to be started");
@@ -319,7 +320,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
             + " seconds");
       }
     }
-    if (pythonProcessFailed) {
+    if (!pythonProcessRunning) {
       throw new IOException("Fail to launch IPython Kernel as the python process is failed");
     }
   }
@@ -355,23 +356,44 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
     }
   }
 
+  public ExecuteWatchdog getWatchDog() {
+    return watchDog;
+  }
+
   @Override
-  public InterpreterResult interpret(String st, InterpreterContext context) {
+  public InterpreterResult interpret(String st,
+                                     InterpreterContext context) throws InterpreterException {
     zeppelinContext.setGui(context.getGui());
     zeppelinContext.setNoteGui(context.getNoteGui());
     zeppelinContext.setInterpreterContext(context);
     interpreterOutput.setInterpreterOutput(context.out);
-    ExecuteResponse response =
-        ipythonClient.stream_execute(ExecuteRequest.newBuilder().setCode(st).build(),
-            interpreterOutput);
     try {
+      ExecuteResponse response =
+              ipythonClient.stream_execute(ExecuteRequest.newBuilder().setCode(st).build(),
+                      interpreterOutput);
       interpreterOutput.getInterpreterOutput().flush();
-    } catch (IOException e) {
-      throw new RuntimeException("Fail to write output", e);
+      // It is not known which method is called first (ipythonClient.stream_execute
+      // or onProcessFailed) when ipython kernel process is exited. Because they are in
+      // 2 different threads. So here we would check ipythonClient's status and sleep 1 second
+      // if ipython kernel is maybe terminated.
+      if (pythonProcessRunning && !ipythonClient.isMaybeIPythonFailed()) {
+        return new InterpreterResult(
+                InterpreterResult.Code.valueOf(response.getStatus().name()));
+      } else {
+        if (ipythonClient.isMaybeIPythonFailed()) {
+          Thread.sleep(1000);
+        }
+        if (pythonProcessRunning) {
+          return new InterpreterResult(
+                  InterpreterResult.Code.valueOf(response.getStatus().name()));
+        } else {
+          return new InterpreterResult(InterpreterResult.Code.ERROR,
+                  "IPython kernel is abnormally exited, please check your code and log.");
+        }
+      }
+    } catch (Exception e) {
+      throw new InterpreterException("Fail to interpret python code", e);
     }
-    InterpreterResult result = new InterpreterResult(
-        InterpreterResult.Code.valueOf(response.getStatus().name()));
-    return result;
   }
 
   @Override
@@ -416,12 +438,13 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
   @Override
   public void onProcessComplete(int exitValue) {
     LOGGER.warn("Python Process is completed with exitValue: " + exitValue);
+    pythonProcessRunning = false;
   }
 
   @Override
   public void onProcessFailed(ExecuteException e) {
     LOGGER.warn("Exception happens in Python Process", e);
-    pythonProcessFailed = true;
+    pythonProcessRunning = false;
   }
 
   static class ProcessLogOutputStream extends LogOutputStream {
diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
index fb4ba9c..c6770e5 100644
--- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
@@ -17,6 +17,7 @@
 
 package org.apache.zeppelin.python;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.Files;
 import com.google.gson.Gson;
 import org.apache.commons.exec.CommandLine;
@@ -160,7 +161,10 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
     pythonScriptRunning.set(true);
   }
 
-
+  @VisibleForTesting
+  public DefaultExecutor getPythonExecutor() {
+    return this.executor;
+  }
 
   private void createPythonScript() throws IOException {
     // set java.io.tmpdir to /tmp on MacOS, because docker can not share the /var folder which will
@@ -348,7 +352,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
     }
 
     synchronized (statementFinishedNotifier) {
-      while (statementOutput == null) {
+      while (statementOutput == null && pythonScriptRunning.get()) {
         try {
           statementFinishedNotifier.wait(1000);
         } catch (InterruptedException e) {
@@ -374,7 +378,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
 
     synchronized (pythonScriptInitialized) {
       long startTime = System.currentTimeMillis();
-      while (!pythonScriptInitialized.get()
+      while (!pythonScriptInitialized.get() && pythonScriptRunning.get()
           && System.currentTimeMillis() - startTime < MAX_TIMEOUT_SEC * 1000) {
         try {
           LOGGER.info("Wait for PythonScript initialized");
@@ -417,7 +421,12 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
       } catch (IOException e) {
         throw new InterpreterException(e);
       }
-      return new InterpreterResult(Code.SUCCESS);
+      if (pythonScriptRunning.get()) {
+        return new InterpreterResult(Code.SUCCESS);
+      } else {
+        return new InterpreterResult(Code.ERROR,
+                "Python process is abnormally exited, please check your code and log.");
+      }
     }
   }
 
@@ -590,6 +599,9 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
     LOGGER.info("python process terminated. exit code " + exitValue);
     pythonScriptRunning.set(false);
     pythonScriptInitialized.set(false);
+    synchronized (statementFinishedNotifier) {
+      statementFinishedNotifier.notify();
+    }
   }
 
   @Override
@@ -597,6 +609,9 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
     LOGGER.error("python process failed", e);
     pythonScriptRunning.set(false);
     pythonScriptInitialized.set(false);
+    synchronized (statementFinishedNotifier) {
+      statementFinishedNotifier.notify();
+    }
   }
 
   // Called by Python Process, used for debugging purpose
diff --git a/python/src/test/java/org/apache/zeppelin/python/BasePythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/BasePythonInterpreterTest.java
index a51c053..6e8bbc9 100644
--- a/python/src/test/java/org/apache/zeppelin/python/BasePythonInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/BasePythonInterpreterTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.zeppelin.python;
 
+import net.jodah.concurrentunit.ConcurrentTestCase;
 import org.apache.zeppelin.display.ui.CheckBox;
 import org.apache.zeppelin.display.ui.Password;
 import org.apache.zeppelin.display.ui.Select;
@@ -41,7 +42,7 @@ import static junit.framework.TestCase.assertTrue;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 
-public abstract class BasePythonInterpreterTest {
+public abstract class BasePythonInterpreterTest extends ConcurrentTestCase {
 
   protected InterpreterGroup intpGroup;
   protected Interpreter interpreter;
diff --git a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
index 1e4a709..ca54502 100644
--- a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.zeppelin.python;
 
+import net.jodah.concurrentunit.Waiter;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
@@ -31,6 +33,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.TimeoutException;
 
 import static junit.framework.TestCase.assertTrue;
 import static org.junit.Assert.assertEquals;
@@ -279,4 +282,30 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
   }
 
+  @Test
+  public void testIPythonProcessKilled() throws InterruptedException, TimeoutException {
+    final Waiter waiter = new Waiter();
+    Thread thread = new Thread() {
+      @Override
+      public void run() {
+        try {
+          InterpreterResult result = interpreter.interpret("import time\ntime.sleep(1000)",
+                  getInterpreterContext());
+          waiter.assertEquals(InterpreterResult.Code.ERROR, result.code());
+          waiter.assertEquals(
+                  "IPython kernel is abnormally exited, please check your code and log.",
+                  result.message().get(0).getData());
+        } catch (InterpreterException e) {
+          waiter.fail("Should not throw exception\n" + ExceptionUtils.getStackTrace(e));
+        }
+        waiter.resume();
+      }
+    };
+    thread.start();
+    Thread.sleep(3000);
+    IPythonInterpreter iPythonInterpreter = (IPythonInterpreter)
+            ((LazyOpenInterpreter) interpreter).getInnerInterpreter();
+    iPythonInterpreter.getWatchDog().destroyProcess();
+    waiter.await(3000);
+  }
 }
diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java
index 8748c00..19d2334 100644
--- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.zeppelin.python;
 
+import net.jodah.concurrentunit.Waiter;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
@@ -28,6 +30,7 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.Properties;
+import java.util.concurrent.TimeoutException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -38,7 +41,7 @@ import static org.junit.Assert.assertTrue;
 
 
 public class PythonInterpreterTest extends BasePythonInterpreterTest {
-
+  
   @Override
   public void setUp() throws InterpreterException {
 
@@ -50,6 +53,7 @@ public class PythonInterpreterTest extends BasePythonInterpreterTest {
     properties.setProperty("zeppelin.python.gatewayserver_address", "127.0.0.1");
 
     interpreter = new LazyOpenInterpreter(new PythonInterpreter(properties));
+
     intpGroup.put("note", new LinkedList<Interpreter>());
     intpGroup.get("note").add(interpreter);
     interpreter.setInterpreterGroup(intpGroup);
@@ -105,4 +109,31 @@ public class PythonInterpreterTest extends BasePythonInterpreterTest {
     t.join(2000);
     assertFalse(t.isAlive());
   }
+
+  @Test
+  public void testPythonProcessKilled() throws InterruptedException, TimeoutException {
+    final Waiter waiter = new Waiter();
+    Thread thread = new Thread() {
+      @Override
+      public void run() {
+        try {
+          InterpreterResult result = interpreter.interpret("import time\ntime.sleep(1000)",
+                  getInterpreterContext());
+          waiter.assertEquals(InterpreterResult.Code.ERROR, result.code());
+          waiter.assertEquals(
+                  "Python process is abnormally exited, please check your code and log.",
+                  result.message().get(0).getData());
+        } catch (InterpreterException e) {
+          waiter.fail("Should not throw exception\n" + ExceptionUtils.getStackTrace(e));
+        }
+        waiter.resume();
+      }
+    };
+    thread.start();
+    Thread.sleep(3000);
+    PythonInterpreter pythonInterpreter = (PythonInterpreter)
+            ((LazyOpenInterpreter) interpreter).getInnerInterpreter();
+    pythonInterpreter.getPythonExecutor().getWatchdog().destroyProcess();
+    waiter.await(3000);
+  }
 }
diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml
index d7d8418..9a35057 100644
--- a/spark/interpreter/pom.xml
+++ b/spark/interpreter/pom.xml
@@ -379,6 +379,12 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>net.jodah</groupId>
+      <artifactId>concurrentunit</artifactId>
+      <version>0.4.4</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
index 7589895..594c171 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
@@ -83,7 +83,8 @@ public class IPySparkInterpreter extends IPythonInterpreter {
   }
 
   @Override
-  public InterpreterResult interpret(String st, InterpreterContext context) {
+  public InterpreterResult interpret(String st,
+                                     InterpreterContext context) throws InterpreterException {
     InterpreterContext.set(context);
     String jobGroupId = Utils.buildJobGroupId(context);
     String jobDesc = Utils.buildJobDesc(context);