Posted to commits@zeppelin.apache.org by jo...@apache.org on 2017/03/14 06:32:47 UTC

zeppelin git commit: [ZEPPELIN-2075] Can't stop infinite `while` statement in pyspark Interpreter.

Repository: zeppelin
Updated Branches:
  refs/heads/master 5d2ce181d -> 9f22db91c


[ZEPPELIN-2075] Can't stop infinite `while` statement in pyspark Interpreter.

### What is this PR for?
If the following code runs in the PySpark interpreter, there is no way to cancel it short of restarting the Zeppelin server.
```
%spark.pyspark
import time

while True:
    time.sleep(1)
    print("running..")
```
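
The diff further down addresses this by having the Python worker report its PID to the JVM at startup (`onPythonScriptInitialized(os.getpid())`) and having `cancel()` send that PID a SIGINT, which Python raises inside the running paragraph as a `KeyboardInterrupt`. Below is a minimal, Zeppelin-independent Java sketch of that pattern (POSIX only, assumes a `python` executable on the PATH; class and variable names are illustrative, not Zeppelin's):

```
// Stand-in for the PySpark worker: a child process prints its PID and loops forever,
// then gets cancelled by delivering SIGINT to that PID from the JVM side.
import java.io.BufferedReader;
import java.io.InputStreamReader;

public class SigintCancelSketch {
  public static void main(String[] args) throws Exception {
    Process worker = new ProcessBuilder(
        "python", "-c",
        "import os, sys, time\n"
            + "print(os.getpid()); sys.stdout.flush()\n"
            + "while True: time.sleep(1)")
        .redirectErrorStream(true)
        .start();

    BufferedReader out = new BufferedReader(new InputStreamReader(worker.getInputStream()));
    long pid = Long.parseLong(out.readLine().trim()); // same role as onPythonScriptInitialized(pid)

    Thread.sleep(2000);
    // Same role as interrupt() in the diff: forward SIGINT to the worker process.
    Runtime.getRuntime().exec(new String[]{"kill", "-SIGINT", Long.toString(pid)}).waitFor();

    System.out.println("worker exited with code " + worker.waitFor());
    // The remaining output ends with a KeyboardInterrupt traceback -- the same string
    // the new test case looks for in the interpreter result.
    String line;
    while ((line = out.readLine()) != null) {
      System.out.println(line);
    }
  }
}
```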

### What type of PR is it?
Bug Fix | Improvement

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-2075

### How should this be tested?
Run the above code in the PySpark interpreter and try to cancel it.

### Screenshots (if appropriate)
- before
![pyspark before](https://cloud.githubusercontent.com/assets/3348133/22696141/615c1206-ed90-11e6-9bbb-339ecdec73fc.gif)

- after
![pyspark after](https://cloud.githubusercontent.com/assets/3348133/22696168/70899172-ed90-11e6-99e1-342eb4094b2c.gif)

### Questions:
* Do the license files need to be updated? no
* Are there breaking changes for older versions? no
* Does this need documentation? no

Author: astroshim <hs...@zepl.com>

Closes #1985 from astroshim/ZEPPELIN-2075 and squashes the following commits:

84bf09a [astroshim] fix testcase
bc12eaa [astroshim] pass pid to java
b60d89a [astroshim] Merge branch 'master' into ZEPPELIN-2075
f26eacf [astroshim] add test-case for canceling.
c0cac4e [astroshim] fix logging
678c183 [astroshim] remove signal handler
65d8cc6 [astroshim] init python pid variable
6731e56 [astroshim] add signal to cancel job


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/9f22db91
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/9f22db91
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/9f22db91

Branch: refs/heads/master
Commit: 9f22db91c279b7daf6a13b2d805a874074b070fd
Parents: 5d2ce18
Author: astroshim <hs...@zepl.com>
Authored: Sun Feb 19 00:36:45 2017 +0900
Committer: Jongyoul Lee <jo...@apache.org>
Committed: Tue Mar 14 15:32:40 2017 +0900

----------------------------------------------------------------------
 .../zeppelin/spark/PySparkInterpreter.java      | 20 ++++++++++++-
 .../main/resources/python/zeppelin_pyspark.py   |  2 +-
 .../zeppelin/spark/PySparkInterpreterTest.java  | 30 ++++++++++++++++++++
 3 files changed, 50 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9f22db91/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index b7dc67d..db52a53 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -73,10 +73,12 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
   private String scriptPath;
   boolean pythonscriptRunning = false;
   private static final int MAX_TIMEOUT_SEC = 10;
+  private long pythonPid;
 
   public PySparkInterpreter(Properties property) {
     super(property);
 
+    pythonPid = -1;
     try {
       File scriptFile = File.createTempFile("zeppelin_pyspark-", ".py");
       scriptPath = scriptFile.getAbsolutePath();
@@ -319,7 +321,8 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
   boolean pythonScriptInitialized = false;
   Integer pythonScriptInitializeNotifier = new Integer(0);
 
-  public void onPythonScriptInitialized() {
+  public void onPythonScriptInitialized(long pid) {
+    pythonPid = pid;
     synchronized (pythonScriptInitializeNotifier) {
       pythonScriptInitialized = true;
       pythonScriptInitializeNotifier.notifyAll();
@@ -420,10 +423,25 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
     }
   }
 
+  public void interrupt() throws IOException {
+    if (pythonPid > -1) {
+      logger.info("Sending SIGINT signal to PID : " + pythonPid);
+      Runtime.getRuntime().exec("kill -SIGINT " + pythonPid);
+    } else {
+      logger.warn("Non UNIX/Linux system, close the interpreter");
+      close();
+    }
+  }
+
   @Override
   public void cancel(InterpreterContext context) {
     SparkInterpreter sparkInterpreter = getSparkInterpreter();
     sparkInterpreter.cancel(context);
+    try {
+      interrupt();
+    } catch (IOException e) {
+      logger.error("Error", e);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9f22db91/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py
index c59d2f4..d9c68c2 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -252,7 +252,7 @@ java_import(gateway.jvm, "org.apache.spark.api.python.*")
 java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
 
 intp = gateway.entry_point
-intp.onPythonScriptInitialized()
+intp.onPythonScriptInitialized(os.getpid())
 
 jsc = intp.getJavaSparkContext()
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9f22db91/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
index 60e40d7..3697512 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
@@ -33,6 +33,8 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import static org.junit.Assert.*;
 
@@ -120,4 +122,32 @@ public class PySparkInterpreterTest {
       assertTrue(completions.size() > 0);
     }
   }
+
+  private class infinityPythonJob implements Runnable {
+    @Override
+    public void run() {
+      String code = "import time\nwhile True:\n  time.sleep(1)" ;
+      InterpreterResult ret = pySparkInterpreter.interpret(code, context);
+      assertNotNull(ret);
+      Pattern expectedMessage = Pattern.compile("KeyboardInterrupt");
+      Matcher m = expectedMessage.matcher(ret.message().toString());
+      assertTrue(m.find());
+    }
+  }
+
+  @Test
+  public void testCancelIntp() throws InterruptedException {
+    if (getSparkVersionNumber() > 11) {
+      assertEquals(InterpreterResult.Code.SUCCESS,
+        pySparkInterpreter.interpret("a = 1\n", context).code());
+
+      Thread t = new Thread(new infinityPythonJob());
+      t.start();
+      Thread.sleep(5000);
+      pySparkInterpreter.cancel(context);
+      assertTrue(t.isAlive());
+      t.join(2000);
+      assertFalse(t.isAlive());
+    }
+  }
 }
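
For readers skimming the three hunks above, the change condenses to the pattern below (a simplified sketch, not the actual Zeppelin class; only the method names `onPythonScriptInitialized`, `interrupt`, and `cancel` mirror the real code):

```
// Condensed view of the cancellation path added by this commit: remember the
// Python worker's PID, and on cancel deliver SIGINT so the script raises
// KeyboardInterrupt in whatever the user's paragraph is doing.
import java.io.IOException;

public class PySparkCancelSketch {
  private volatile long pythonPid = -1; // -1 until the Python script reports in

  // Called from zeppelin_pyspark.py over the Py4J gateway with os.getpid().
  public void onPythonScriptInitialized(long pid) {
    pythonPid = pid;
  }

  // POSIX only: invoke kill(1) to send SIGINT to the worker process.
  public void interrupt() throws IOException {
    if (pythonPid > -1) {
      Runtime.getRuntime().exec(new String[]{"kill", "-SIGINT", Long.toString(pythonPid)});
    }
  }

  public void cancel() {
    try {
      interrupt(); // the real interpreter also cancels the underlying Spark jobs first
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}
```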