You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2019/03/30 13:18:30 UTC

[zeppelin] 02/02: Revert "ZEPPELIN-4081. when the python process is killed, the task state is still running"

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git

commit 06f09cf3ebba677f2115ba4cca554c8860cff0a9
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Fri Mar 29 22:26:50 2019 +0800

    Revert "ZEPPELIN-4081. when the python process is killed, the task state is still running"
    
    This reverts commit c9514e26a00b5c2b76c358425bae79fee183c9e1.
---
 LICENSE                                            |  1 -
 python/pom.xml                                     |  8 ----
 .../org/apache/zeppelin/python/IPythonClient.java  |  7 +---
 .../apache/zeppelin/python/IPythonInterpreter.java | 49 ++++++----------------
 .../apache/zeppelin/python/PythonInterpreter.java  | 23 ++--------
 .../zeppelin/python/BasePythonInterpreterTest.java |  3 +-
 .../zeppelin/python/IPythonInterpreterTest.java    | 29 -------------
 .../zeppelin/python/PythonInterpreterTest.java     | 33 +--------------
 spark/interpreter/pom.xml                          |  6 ---
 .../apache/zeppelin/spark/IPySparkInterpreter.java |  3 +-
 10 files changed, 21 insertions(+), 141 deletions(-)

diff --git a/LICENSE b/LICENSE
index a456a41..3b34053 100644
--- a/LICENSE
+++ b/LICENSE
@@ -260,7 +260,6 @@ The text of each license is also included at licenses/LICENSE-[project]-[version
     (Apache 2.0) Nimbus JOSE+JWT (https://bitbucket.org/connect2id/nimbus-jose-jwt/wiki/Home)
     (Apache 2.0) jarchivelib (https://github.com/thrau/jarchivelib)
     (Apache 2.0) Google Cloud Client Library for Java (https://github.com/GoogleCloudPlatform/google-cloud-java)
-    (Apache 2.0) concurrentunit (https://github.com/jhalterman/concurrentunit)
 
 ========================================================================
 BSD 3-Clause licenses
diff --git a/python/pom.xml b/python/pom.xml
index 2700cba..d9371ed 100644
--- a/python/pom.xml
+++ b/python/pom.xml
@@ -87,14 +87,6 @@
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>
     </dependency>
-
-    <dependency>
-      <groupId>net.jodah</groupId>
-      <artifactId>concurrentunit</artifactId>
-      <version>0.4.4</version>
-      <scope>test</scope>
-    </dependency>
-
   </dependencies>
 
   <build>
diff --git a/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java b/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java
index c729898..b9c897b 100644
--- a/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java
+++ b/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java
@@ -53,7 +53,6 @@ public class IPythonClient {
   private final ManagedChannel channel;
   private final IPythonGrpc.IPythonBlockingStub blockingStub;
   private final IPythonGrpc.IPythonStub asyncStub;
-  private volatile boolean maybeIPythonFailed = false;
 
   private SecureRandom random = new SecureRandom();
 
@@ -84,7 +83,6 @@ public class IPythonClient {
     final ExecuteResponse.Builder finalResponseBuilder = ExecuteResponse.newBuilder()
         .setStatus(ExecuteStatus.SUCCESS);
     final AtomicBoolean completedFlag = new AtomicBoolean(false);
-    maybeIPythonFailed = false;
     LOGGER.debug("stream_execute code:\n" + request.getCode());
     asyncStub.execute(request, new StreamObserver<ExecuteResponse>() {
       int index = 0;
@@ -139,7 +137,7 @@ public class IPythonClient {
         }
         LOGGER.error("Fail to call IPython grpc", throwable);
         finalResponseBuilder.setStatus(ExecuteStatus.ERROR);
-        maybeIPythonFailed = true;
+
         completedFlag.set(true);
         synchronized (completedFlag) {
           completedFlag.notify();
@@ -206,9 +204,6 @@ public class IPythonClient {
     asyncStub.stop(request, null);
   }
 
-  public boolean isMaybeIPythonFailed() {
-    return maybeIPythonFailed;
-  }
 
   public static void main(String[] args) {
     IPythonClient client = new IPythonClient("localhost", 50053);
diff --git a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
index f4c753d..9e23d04 100644
--- a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
@@ -82,7 +82,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
   private boolean useBuiltinPy4j = true;
   private boolean usePy4JAuth = true;
   private String secret;
-  private volatile boolean pythonProcessRunning = false;
+  private volatile boolean pythonProcessFailed = false;
 
   private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER);
 
@@ -294,7 +294,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
 
     // wait until IPython kernel is started or timeout
     long startTime = System.currentTimeMillis();
-    while (!pythonProcessRunning) {
+    while (!pythonProcessFailed) {
       try {
         Thread.sleep(100);
       } catch (InterruptedException e) {
@@ -305,7 +305,6 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
         StatusResponse response = ipythonClient.status(StatusRequest.newBuilder().build());
         if (response.getStatus() == IPythonStatus.RUNNING) {
           LOGGER.info("IPython Kernel is Running");
-          pythonProcessRunning = true;
           break;
         } else {
           LOGGER.info("Wait for IPython Kernel to be started");
@@ -320,7 +319,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
             + " seconds");
       }
     }
-    if (!pythonProcessRunning) {
+    if (pythonProcessFailed) {
       throw new IOException("Fail to launch IPython Kernel as the python process is failed");
     }
   }
@@ -356,44 +355,23 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
     }
   }
 
-  public ExecuteWatchdog getWatchDog() {
-    return watchDog;
-  }
-
   @Override
-  public InterpreterResult interpret(String st,
-                                     InterpreterContext context) throws InterpreterException {
+  public InterpreterResult interpret(String st, InterpreterContext context) {
     zeppelinContext.setGui(context.getGui());
     zeppelinContext.setNoteGui(context.getNoteGui());
     zeppelinContext.setInterpreterContext(context);
     interpreterOutput.setInterpreterOutput(context.out);
+    ExecuteResponse response =
+        ipythonClient.stream_execute(ExecuteRequest.newBuilder().setCode(st).build(),
+            interpreterOutput);
     try {
-      ExecuteResponse response =
-              ipythonClient.stream_execute(ExecuteRequest.newBuilder().setCode(st).build(),
-                      interpreterOutput);
       interpreterOutput.getInterpreterOutput().flush();
-      // It is not known which method is called first (ipythonClient.stream_execute
-      // or onProcessFailed) when ipython kernel process is exited. Because they are in
-      // 2 different threads. So here we would check ipythonClient's status and sleep 1 second
-      // if ipython kernel is maybe terminated.
-      if (pythonProcessRunning && !ipythonClient.isMaybeIPythonFailed()) {
-        return new InterpreterResult(
-                InterpreterResult.Code.valueOf(response.getStatus().name()));
-      } else {
-        if (ipythonClient.isMaybeIPythonFailed()) {
-          Thread.sleep(1000);
-        }
-        if (pythonProcessRunning) {
-          return new InterpreterResult(
-                  InterpreterResult.Code.valueOf(response.getStatus().name()));
-        } else {
-          return new InterpreterResult(InterpreterResult.Code.ERROR,
-                  "IPython kernel is abnormally exited, please check your code and log.");
-        }
-      }
-    } catch (Exception e) {
-      throw new InterpreterException("Fail to interpret python code", e);
+    } catch (IOException e) {
+      throw new RuntimeException("Fail to write output", e);
     }
+    InterpreterResult result = new InterpreterResult(
+        InterpreterResult.Code.valueOf(response.getStatus().name()));
+    return result;
   }
 
   @Override
@@ -438,13 +416,12 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
   @Override
   public void onProcessComplete(int exitValue) {
     LOGGER.warn("Python Process is completed with exitValue: " + exitValue);
-    pythonProcessRunning = false;
   }
 
   @Override
   public void onProcessFailed(ExecuteException e) {
     LOGGER.warn("Exception happens in Python Process", e);
-    pythonProcessRunning = false;
+    pythonProcessFailed = true;
   }
 
   static class ProcessLogOutputStream extends LogOutputStream {
diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
index c6770e5..fb4ba9c 100644
--- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
@@ -17,7 +17,6 @@
 
 package org.apache.zeppelin.python;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.Files;
 import com.google.gson.Gson;
 import org.apache.commons.exec.CommandLine;
@@ -161,10 +160,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
     pythonScriptRunning.set(true);
   }
 
-  @VisibleForTesting
-  public DefaultExecutor getPythonExecutor() {
-    return this.executor;
-  }
+
 
   private void createPythonScript() throws IOException {
     // set java.io.tmpdir to /tmp on MacOS, because docker can not share the /var folder which will
@@ -352,7 +348,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
     }
 
     synchronized (statementFinishedNotifier) {
-      while (statementOutput == null && pythonScriptRunning.get()) {
+      while (statementOutput == null) {
         try {
           statementFinishedNotifier.wait(1000);
         } catch (InterruptedException e) {
@@ -378,7 +374,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
 
     synchronized (pythonScriptInitialized) {
       long startTime = System.currentTimeMillis();
-      while (!pythonScriptInitialized.get() && pythonScriptRunning.get()
+      while (!pythonScriptInitialized.get()
           && System.currentTimeMillis() - startTime < MAX_TIMEOUT_SEC * 1000) {
         try {
           LOGGER.info("Wait for PythonScript initialized");
@@ -421,12 +417,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
       } catch (IOException e) {
         throw new InterpreterException(e);
       }
-      if (pythonScriptRunning.get()) {
-        return new InterpreterResult(Code.SUCCESS);
-      } else {
-        return new InterpreterResult(Code.ERROR,
-                "Python process is abnormally exited, please check your code and log.");
-      }
+      return new InterpreterResult(Code.SUCCESS);
     }
   }
 
@@ -599,9 +590,6 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
     LOGGER.info("python process terminated. exit code " + exitValue);
     pythonScriptRunning.set(false);
     pythonScriptInitialized.set(false);
-    synchronized (statementFinishedNotifier) {
-      statementFinishedNotifier.notify();
-    }
   }
 
   @Override
@@ -609,9 +597,6 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
     LOGGER.error("python process failed", e);
     pythonScriptRunning.set(false);
     pythonScriptInitialized.set(false);
-    synchronized (statementFinishedNotifier) {
-      statementFinishedNotifier.notify();
-    }
   }
 
   // Called by Python Process, used for debugging purpose
diff --git a/python/src/test/java/org/apache/zeppelin/python/BasePythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/BasePythonInterpreterTest.java
index 6e8bbc9..a51c053 100644
--- a/python/src/test/java/org/apache/zeppelin/python/BasePythonInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/BasePythonInterpreterTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.zeppelin.python;
 
-import net.jodah.concurrentunit.ConcurrentTestCase;
 import org.apache.zeppelin.display.ui.CheckBox;
 import org.apache.zeppelin.display.ui.Password;
 import org.apache.zeppelin.display.ui.Select;
@@ -42,7 +41,7 @@ import static junit.framework.TestCase.assertTrue;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 
-public abstract class BasePythonInterpreterTest extends ConcurrentTestCase {
+public abstract class BasePythonInterpreterTest {
 
   protected InterpreterGroup intpGroup;
   protected Interpreter interpreter;
diff --git a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
index 9eba8d8..28e6270 100644
--- a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
@@ -17,8 +17,6 @@
 
 package org.apache.zeppelin.python;
 
-import net.jodah.concurrentunit.Waiter;
-import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
@@ -32,7 +30,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.TimeoutException;
 
 import static junit.framework.TestCase.assertTrue;
 import static org.junit.Assert.assertEquals;
@@ -238,30 +235,4 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
   }
 
-  @Test
-  public void testIPythonProcessKilled() throws InterruptedException, TimeoutException {
-    final Waiter waiter = new Waiter();
-    Thread thread = new Thread() {
-      @Override
-      public void run() {
-        try {
-          InterpreterResult result = interpreter.interpret("import time\ntime.sleep(1000)",
-                  getInterpreterContext());
-          waiter.assertEquals(InterpreterResult.Code.ERROR, result.code());
-          waiter.assertEquals(
-                  "IPython kernel is abnormally exited, please check your code and log.",
-                  result.message().get(0).getData());
-        } catch (InterpreterException e) {
-          waiter.fail("Should not throw exception\n" + ExceptionUtils.getStackTrace(e));
-        }
-        waiter.resume();
-      }
-    };
-    thread.start();
-    Thread.sleep(3000);
-    IPythonInterpreter iPythonInterpreter = (IPythonInterpreter)
-            ((LazyOpenInterpreter) interpreter).getInnerInterpreter();
-    iPythonInterpreter.getWatchDog().destroyProcess();
-    waiter.await(3000);
-  }
 }
diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java
index 19d2334..8748c00 100644
--- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java
@@ -17,8 +17,6 @@
 
 package org.apache.zeppelin.python;
 
-import net.jodah.concurrentunit.Waiter;
-import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
@@ -30,7 +28,6 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.Properties;
-import java.util.concurrent.TimeoutException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -41,7 +38,7 @@ import static org.junit.Assert.assertTrue;
 
 
 public class PythonInterpreterTest extends BasePythonInterpreterTest {
-  
+
   @Override
   public void setUp() throws InterpreterException {
 
@@ -53,7 +50,6 @@ public class PythonInterpreterTest extends BasePythonInterpreterTest {
     properties.setProperty("zeppelin.python.gatewayserver_address", "127.0.0.1");
 
     interpreter = new LazyOpenInterpreter(new PythonInterpreter(properties));
-
     intpGroup.put("note", new LinkedList<Interpreter>());
     intpGroup.get("note").add(interpreter);
     interpreter.setInterpreterGroup(intpGroup);
@@ -109,31 +105,4 @@ public class PythonInterpreterTest extends BasePythonInterpreterTest {
     t.join(2000);
     assertFalse(t.isAlive());
   }
-
-  @Test
-  public void testPythonProcessKilled() throws InterruptedException, TimeoutException {
-    final Waiter waiter = new Waiter();
-    Thread thread = new Thread() {
-      @Override
-      public void run() {
-        try {
-          InterpreterResult result = interpreter.interpret("import time\ntime.sleep(1000)",
-                  getInterpreterContext());
-          waiter.assertEquals(InterpreterResult.Code.ERROR, result.code());
-          waiter.assertEquals(
-                  "Python process is abnormally exited, please check your code and log.",
-                  result.message().get(0).getData());
-        } catch (InterpreterException e) {
-          waiter.fail("Should not throw exception\n" + ExceptionUtils.getStackTrace(e));
-        }
-        waiter.resume();
-      }
-    };
-    thread.start();
-    Thread.sleep(3000);
-    PythonInterpreter pythonInterpreter = (PythonInterpreter)
-            ((LazyOpenInterpreter) interpreter).getInnerInterpreter();
-    pythonInterpreter.getPythonExecutor().getWatchdog().destroyProcess();
-    waiter.await(3000);
-  }
 }
diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml
index 9a35057..d7d8418 100644
--- a/spark/interpreter/pom.xml
+++ b/spark/interpreter/pom.xml
@@ -379,12 +379,6 @@
       <scope>test</scope>
     </dependency>
 
-    <dependency>
-      <groupId>net.jodah</groupId>
-      <artifactId>concurrentunit</artifactId>
-      <version>0.4.4</version>
-      <scope>test</scope>
-    </dependency>
   </dependencies>
 
   <build>
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
index 594c171..7589895 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
@@ -83,8 +83,7 @@ public class IPySparkInterpreter extends IPythonInterpreter {
   }
 
   @Override
-  public InterpreterResult interpret(String st,
-                                     InterpreterContext context) throws InterpreterException {
+  public InterpreterResult interpret(String st, InterpreterContext context) {
     InterpreterContext.set(context);
     String jobGroupId = Utils.buildJobGroupId(context);
     String jobDesc = Utils.buildJobDesc(context);