You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2019/03/28 01:59:09 UTC
[zeppelin] branch master updated: ZEPPELIN-4081. when the python
process is killed, the task state is still running
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new c9514e2 ZEPPELIN-4081. when the python process is killed,the task state is still running
c9514e2 is described below
commit c9514e26a00b5c2b76c358425bae79fee183c9e1
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Thu Mar 21 11:18:02 2019 +0800
ZEPPELIN-4081. when the python process is killed,the task state is still running
### What is this PR for?
This PR aborts Python code execution if the Python process has exited, so the task no longer stays in the running state. Besides that, it also improves the error message for the IPython interpreter, although that interpreter does not have this issue.
### What type of PR is it?
[Bug Fix]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-4081
### How should this be tested?
* Unit test is added
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3338 from zjffdu/ZEPPELIN-4081 and squashes the following commits:
907faacf6 [Jeff Zhang] ZEPPELIN-4081. when the python process is killed,the task state is still running
---
LICENSE | 1 +
python/pom.xml | 8 ++++
.../org/apache/zeppelin/python/IPythonClient.java | 7 +++-
.../apache/zeppelin/python/IPythonInterpreter.java | 49 ++++++++++++++++------
.../apache/zeppelin/python/PythonInterpreter.java | 23 ++++++++--
.../zeppelin/python/BasePythonInterpreterTest.java | 3 +-
.../zeppelin/python/IPythonInterpreterTest.java | 29 +++++++++++++
.../zeppelin/python/PythonInterpreterTest.java | 33 ++++++++++++++-
spark/interpreter/pom.xml | 6 +++
.../apache/zeppelin/spark/IPySparkInterpreter.java | 3 +-
10 files changed, 141 insertions(+), 21 deletions(-)
diff --git a/LICENSE b/LICENSE
index 3b34053..a456a41 100644
--- a/LICENSE
+++ b/LICENSE
@@ -260,6 +260,7 @@ The text of each license is also included at licenses/LICENSE-[project]-[version
(Apache 2.0) Nimbus JOSE+JWT (https://bitbucket.org/connect2id/nimbus-jose-jwt/wiki/Home)
(Apache 2.0) jarchivelib (https://github.com/thrau/jarchivelib)
(Apache 2.0) Google Cloud Client Library for Java (https://github.com/GoogleCloudPlatform/google-cloud-java)
+ (Apache 2.0) concurrentunit (https://github.com/jhalterman/concurrentunit)
========================================================================
BSD 3-Clause licenses
diff --git a/python/pom.xml b/python/pom.xml
index d9371ed..2700cba 100644
--- a/python/pom.xml
+++ b/python/pom.xml
@@ -87,6 +87,14 @@
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>net.jodah</groupId>
+ <artifactId>concurrentunit</artifactId>
+ <version>0.4.4</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git a/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java b/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java
index b9c897b..c729898 100644
--- a/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java
+++ b/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java
@@ -53,6 +53,7 @@ public class IPythonClient {
private final ManagedChannel channel;
private final IPythonGrpc.IPythonBlockingStub blockingStub;
private final IPythonGrpc.IPythonStub asyncStub;
+ private volatile boolean maybeIPythonFailed = false;
private SecureRandom random = new SecureRandom();
@@ -83,6 +84,7 @@ public class IPythonClient {
final ExecuteResponse.Builder finalResponseBuilder = ExecuteResponse.newBuilder()
.setStatus(ExecuteStatus.SUCCESS);
final AtomicBoolean completedFlag = new AtomicBoolean(false);
+ maybeIPythonFailed = false;
LOGGER.debug("stream_execute code:\n" + request.getCode());
asyncStub.execute(request, new StreamObserver<ExecuteResponse>() {
int index = 0;
@@ -137,7 +139,7 @@ public class IPythonClient {
}
LOGGER.error("Fail to call IPython grpc", throwable);
finalResponseBuilder.setStatus(ExecuteStatus.ERROR);
-
+ maybeIPythonFailed = true;
completedFlag.set(true);
synchronized (completedFlag) {
completedFlag.notify();
@@ -204,6 +206,9 @@ public class IPythonClient {
asyncStub.stop(request, null);
}
+ public boolean isMaybeIPythonFailed() {
+ return maybeIPythonFailed;
+ }
public static void main(String[] args) {
IPythonClient client = new IPythonClient("localhost", 50053);
diff --git a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
index 9e23d04..f4c753d 100644
--- a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
@@ -82,7 +82,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
private boolean useBuiltinPy4j = true;
private boolean usePy4JAuth = true;
private String secret;
- private volatile boolean pythonProcessFailed = false;
+ private volatile boolean pythonProcessRunning = false;
private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER);
@@ -294,7 +294,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
// wait until IPython kernel is started or timeout
long startTime = System.currentTimeMillis();
- while (!pythonProcessFailed) {
+ while (!pythonProcessRunning) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
@@ -305,6 +305,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
StatusResponse response = ipythonClient.status(StatusRequest.newBuilder().build());
if (response.getStatus() == IPythonStatus.RUNNING) {
LOGGER.info("IPython Kernel is Running");
+ pythonProcessRunning = true;
break;
} else {
LOGGER.info("Wait for IPython Kernel to be started");
@@ -319,7 +320,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
+ " seconds");
}
}
- if (pythonProcessFailed) {
+ if (!pythonProcessRunning) {
throw new IOException("Fail to launch IPython Kernel as the python process is failed");
}
}
@@ -355,23 +356,44 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
}
}
+ public ExecuteWatchdog getWatchDog() {
+ return watchDog;
+ }
+
@Override
- public InterpreterResult interpret(String st, InterpreterContext context) {
+ public InterpreterResult interpret(String st,
+ InterpreterContext context) throws InterpreterException {
zeppelinContext.setGui(context.getGui());
zeppelinContext.setNoteGui(context.getNoteGui());
zeppelinContext.setInterpreterContext(context);
interpreterOutput.setInterpreterOutput(context.out);
- ExecuteResponse response =
- ipythonClient.stream_execute(ExecuteRequest.newBuilder().setCode(st).build(),
- interpreterOutput);
try {
+ ExecuteResponse response =
+ ipythonClient.stream_execute(ExecuteRequest.newBuilder().setCode(st).build(),
+ interpreterOutput);
interpreterOutput.getInterpreterOutput().flush();
- } catch (IOException e) {
- throw new RuntimeException("Fail to write output", e);
+ // It is not known which method is called first (ipythonClient.stream_execute
+ // or onProcessFailed) when ipython kernel process is exited. Because they are in
+ // 2 different threads. So here we would check ipythonClient's status and sleep 1 second
+ // if ipython kernel is maybe terminated.
+ if (pythonProcessRunning && !ipythonClient.isMaybeIPythonFailed()) {
+ return new InterpreterResult(
+ InterpreterResult.Code.valueOf(response.getStatus().name()));
+ } else {
+ if (ipythonClient.isMaybeIPythonFailed()) {
+ Thread.sleep(1000);
+ }
+ if (pythonProcessRunning) {
+ return new InterpreterResult(
+ InterpreterResult.Code.valueOf(response.getStatus().name()));
+ } else {
+ return new InterpreterResult(InterpreterResult.Code.ERROR,
+ "IPython kernel is abnormally exited, please check your code and log.");
+ }
+ }
+ } catch (Exception e) {
+ throw new InterpreterException("Fail to interpret python code", e);
}
- InterpreterResult result = new InterpreterResult(
- InterpreterResult.Code.valueOf(response.getStatus().name()));
- return result;
}
@Override
@@ -416,12 +438,13 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
@Override
public void onProcessComplete(int exitValue) {
LOGGER.warn("Python Process is completed with exitValue: " + exitValue);
+ pythonProcessRunning = false;
}
@Override
public void onProcessFailed(ExecuteException e) {
LOGGER.warn("Exception happens in Python Process", e);
- pythonProcessFailed = true;
+ pythonProcessRunning = false;
}
static class ProcessLogOutputStream extends LogOutputStream {
diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
index fb4ba9c..c6770e5 100644
--- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
@@ -17,6 +17,7 @@
package org.apache.zeppelin.python;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Files;
import com.google.gson.Gson;
import org.apache.commons.exec.CommandLine;
@@ -160,7 +161,10 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
pythonScriptRunning.set(true);
}
-
+ @VisibleForTesting
+ public DefaultExecutor getPythonExecutor() {
+ return this.executor;
+ }
private void createPythonScript() throws IOException {
// set java.io.tmpdir to /tmp on MacOS, because docker can not share the /var folder which will
@@ -348,7 +352,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
}
synchronized (statementFinishedNotifier) {
- while (statementOutput == null) {
+ while (statementOutput == null && pythonScriptRunning.get()) {
try {
statementFinishedNotifier.wait(1000);
} catch (InterruptedException e) {
@@ -374,7 +378,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
synchronized (pythonScriptInitialized) {
long startTime = System.currentTimeMillis();
- while (!pythonScriptInitialized.get()
+ while (!pythonScriptInitialized.get() && pythonScriptRunning.get()
&& System.currentTimeMillis() - startTime < MAX_TIMEOUT_SEC * 1000) {
try {
LOGGER.info("Wait for PythonScript initialized");
@@ -417,7 +421,12 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
} catch (IOException e) {
throw new InterpreterException(e);
}
- return new InterpreterResult(Code.SUCCESS);
+ if (pythonScriptRunning.get()) {
+ return new InterpreterResult(Code.SUCCESS);
+ } else {
+ return new InterpreterResult(Code.ERROR,
+ "Python process is abnormally exited, please check your code and log.");
+ }
}
}
@@ -590,6 +599,9 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
LOGGER.info("python process terminated. exit code " + exitValue);
pythonScriptRunning.set(false);
pythonScriptInitialized.set(false);
+ synchronized (statementFinishedNotifier) {
+ statementFinishedNotifier.notify();
+ }
}
@Override
@@ -597,6 +609,9 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
LOGGER.error("python process failed", e);
pythonScriptRunning.set(false);
pythonScriptInitialized.set(false);
+ synchronized (statementFinishedNotifier) {
+ statementFinishedNotifier.notify();
+ }
}
// Called by Python Process, used for debugging purpose
diff --git a/python/src/test/java/org/apache/zeppelin/python/BasePythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/BasePythonInterpreterTest.java
index a51c053..6e8bbc9 100644
--- a/python/src/test/java/org/apache/zeppelin/python/BasePythonInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/BasePythonInterpreterTest.java
@@ -17,6 +17,7 @@
package org.apache.zeppelin.python;
+import net.jodah.concurrentunit.ConcurrentTestCase;
import org.apache.zeppelin.display.ui.CheckBox;
import org.apache.zeppelin.display.ui.Password;
import org.apache.zeppelin.display.ui.Select;
@@ -41,7 +42,7 @@ import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
-public abstract class BasePythonInterpreterTest {
+public abstract class BasePythonInterpreterTest extends ConcurrentTestCase {
protected InterpreterGroup intpGroup;
protected Interpreter interpreter;
diff --git a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
index 1e4a709..ca54502 100644
--- a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
@@ -17,6 +17,8 @@
package org.apache.zeppelin.python;
+import net.jodah.concurrentunit.Waiter;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
@@ -31,6 +33,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.TimeoutException;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
@@ -279,4 +282,30 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
}
+ @Test
+ public void testIPythonProcessKilled() throws InterruptedException, TimeoutException {
+ final Waiter waiter = new Waiter();
+ Thread thread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ InterpreterResult result = interpreter.interpret("import time\ntime.sleep(1000)",
+ getInterpreterContext());
+ waiter.assertEquals(InterpreterResult.Code.ERROR, result.code());
+ waiter.assertEquals(
+ "IPython kernel is abnormally exited, please check your code and log.",
+ result.message().get(0).getData());
+ } catch (InterpreterException e) {
+ waiter.fail("Should not throw exception\n" + ExceptionUtils.getStackTrace(e));
+ }
+ waiter.resume();
+ }
+ };
+ thread.start();
+ Thread.sleep(3000);
+ IPythonInterpreter iPythonInterpreter = (IPythonInterpreter)
+ ((LazyOpenInterpreter) interpreter).getInnerInterpreter();
+ iPythonInterpreter.getWatchDog().destroyProcess();
+ waiter.await(3000);
+ }
}
diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java
index 8748c00..19d2334 100644
--- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java
@@ -17,6 +17,8 @@
package org.apache.zeppelin.python;
+import net.jodah.concurrentunit.Waiter;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
@@ -28,6 +30,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Properties;
+import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -38,7 +41,7 @@ import static org.junit.Assert.assertTrue;
public class PythonInterpreterTest extends BasePythonInterpreterTest {
-
+
@Override
public void setUp() throws InterpreterException {
@@ -50,6 +53,7 @@ public class PythonInterpreterTest extends BasePythonInterpreterTest {
properties.setProperty("zeppelin.python.gatewayserver_address", "127.0.0.1");
interpreter = new LazyOpenInterpreter(new PythonInterpreter(properties));
+
intpGroup.put("note", new LinkedList<Interpreter>());
intpGroup.get("note").add(interpreter);
interpreter.setInterpreterGroup(intpGroup);
@@ -105,4 +109,31 @@ public class PythonInterpreterTest extends BasePythonInterpreterTest {
t.join(2000);
assertFalse(t.isAlive());
}
+
+ @Test
+ public void testPythonProcessKilled() throws InterruptedException, TimeoutException {
+ final Waiter waiter = new Waiter();
+ Thread thread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ InterpreterResult result = interpreter.interpret("import time\ntime.sleep(1000)",
+ getInterpreterContext());
+ waiter.assertEquals(InterpreterResult.Code.ERROR, result.code());
+ waiter.assertEquals(
+ "Python process is abnormally exited, please check your code and log.",
+ result.message().get(0).getData());
+ } catch (InterpreterException e) {
+ waiter.fail("Should not throw exception\n" + ExceptionUtils.getStackTrace(e));
+ }
+ waiter.resume();
+ }
+ };
+ thread.start();
+ Thread.sleep(3000);
+ PythonInterpreter pythonInterpreter = (PythonInterpreter)
+ ((LazyOpenInterpreter) interpreter).getInnerInterpreter();
+ pythonInterpreter.getPythonExecutor().getWatchdog().destroyProcess();
+ waiter.await(3000);
+ }
}
diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml
index d7d8418..9a35057 100644
--- a/spark/interpreter/pom.xml
+++ b/spark/interpreter/pom.xml
@@ -379,6 +379,12 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>net.jodah</groupId>
+ <artifactId>concurrentunit</artifactId>
+ <version>0.4.4</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
index 7589895..594c171 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
@@ -83,7 +83,8 @@ public class IPySparkInterpreter extends IPythonInterpreter {
}
@Override
- public InterpreterResult interpret(String st, InterpreterContext context) {
+ public InterpreterResult interpret(String st,
+ InterpreterContext context) throws InterpreterException {
InterpreterContext.set(context);
String jobGroupId = Utils.buildJobGroupId(context);
String jobDesc = Utils.buildJobDesc(context);