You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by jo...@apache.org on 2017/03/14 06:32:47 UTC
zeppelin git commit: [ZEPPELIN-2075] Cannot stop an infinite `while` loop in the PySpark interpreter.
Repository: zeppelin
Updated Branches:
refs/heads/master 5d2ce181d -> 9f22db91c
[ZEPPELIN-2075] Cannot stop an infinite `while` loop in the PySpark interpreter.
### What is this PR for?
If the following code runs in the PySpark interpreter, there is no way to cancel it short of restarting the Zeppelin server.
```
%spark.pyspark
import time
while True:
    time.sleep(1)
    print("running..")
```
### What type of PR is it?
Bug Fix | Improvement
### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-2075
### How should this be tested?
Run the code above in the PySpark interpreter and try to cancel the paragraph.
### Screenshots (if appropriate)
- before
![pyspark before](https://cloud.githubusercontent.com/assets/3348133/22696141/615c1206-ed90-11e6-9bbb-339ecdec73fc.gif)
- after
![pyspark after](https://cloud.githubusercontent.com/assets/3348133/22696168/70899172-ed90-11e6-99e1-342eb4094b2c.gif)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: astroshim <hs...@zepl.com>
Closes #1985 from astroshim/ZEPPELIN-2075 and squashes the following commits:
84bf09a [astroshim] fix testcase
bc12eaa [astroshim] pass pid to java
b60d89a [astroshim] Merge branch 'master' into ZEPPELIN-2075
f26eacf [astroshim] add test-case for canceling.
c0cac4e [astroshim] fix logging
678c183 [astroshim] remove signal handler
65d8cc6 [astroshim] init python pid variable
6731e56 [astroshim] add signal to cancel job
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/9f22db91
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/9f22db91
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/9f22db91
Branch: refs/heads/master
Commit: 9f22db91c279b7daf6a13b2d805a874074b070fd
Parents: 5d2ce18
Author: astroshim <hs...@zepl.com>
Authored: Sun Feb 19 00:36:45 2017 +0900
Committer: Jongyoul Lee <jo...@apache.org>
Committed: Tue Mar 14 15:32:40 2017 +0900
----------------------------------------------------------------------
.../zeppelin/spark/PySparkInterpreter.java | 20 ++++++++++++-
.../main/resources/python/zeppelin_pyspark.py | 2 +-
.../zeppelin/spark/PySparkInterpreterTest.java | 30 ++++++++++++++++++++
3 files changed, 50 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9f22db91/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index b7dc67d..db52a53 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -73,10 +73,12 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
private String scriptPath;
boolean pythonscriptRunning = false;
private static final int MAX_TIMEOUT_SEC = 10;
+ private long pythonPid;
public PySparkInterpreter(Properties property) {
super(property);
+ pythonPid = -1;
try {
File scriptFile = File.createTempFile("zeppelin_pyspark-", ".py");
scriptPath = scriptFile.getAbsolutePath();
@@ -319,7 +321,8 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
boolean pythonScriptInitialized = false;
Integer pythonScriptInitializeNotifier = new Integer(0);
- public void onPythonScriptInitialized() {
+ public void onPythonScriptInitialized(long pid) {
+ pythonPid = pid;
synchronized (pythonScriptInitializeNotifier) {
pythonScriptInitialized = true;
pythonScriptInitializeNotifier.notifyAll();
@@ -420,10 +423,25 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
}
+ public void interrupt() throws IOException {
+ if (pythonPid > -1) {
+ logger.info("Sending SIGINT signal to PID : " + pythonPid);
+ Runtime.getRuntime().exec("kill -SIGINT " + pythonPid);
+ } else {
+ logger.warn("Non UNIX/Linux system, close the interpreter");
+ close();
+ }
+ }
+
@Override
public void cancel(InterpreterContext context) {
SparkInterpreter sparkInterpreter = getSparkInterpreter();
sparkInterpreter.cancel(context);
+ try {
+ interrupt();
+ } catch (IOException e) {
+ logger.error("Error", e);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9f22db91/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py
index c59d2f4..d9c68c2 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -252,7 +252,7 @@ java_import(gateway.jvm, "org.apache.spark.api.python.*")
java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
intp = gateway.entry_point
-intp.onPythonScriptInitialized()
+intp.onPythonScriptInitialized(os.getpid())
jsc = intp.getJavaSparkContext()
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9f22db91/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
index 60e40d7..3697512 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
@@ -33,6 +33,8 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import static org.junit.Assert.*;
@@ -120,4 +122,32 @@ public class PySparkInterpreterTest {
assertTrue(completions.size() > 0);
}
}
+
+ private class infinityPythonJob implements Runnable {
+ @Override
+ public void run() {
+ String code = "import time\nwhile True:\n time.sleep(1)" ;
+ InterpreterResult ret = pySparkInterpreter.interpret(code, context);
+ assertNotNull(ret);
+ Pattern expectedMessage = Pattern.compile("KeyboardInterrupt");
+ Matcher m = expectedMessage.matcher(ret.message().toString());
+ assertTrue(m.find());
+ }
+ }
+
+ @Test
+ public void testCancelIntp() throws InterruptedException {
+ if (getSparkVersionNumber() > 11) {
+ assertEquals(InterpreterResult.Code.SUCCESS,
+ pySparkInterpreter.interpret("a = 1\n", context).code());
+
+ Thread t = new Thread(new infinityPythonJob());
+ t.start();
+ Thread.sleep(5000);
+ pySparkInterpreter.cancel(context);
+ assertTrue(t.isAlive());
+ t.join(2000);
+ assertFalse(t.isAlive());
+ }
+ }
}