You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/06/17 03:03:50 UTC

zeppelin git commit: ZEPPELIN-3538. Fail to bootstrap PySpark in yarn cluster mode

Repository: zeppelin
Updated Branches:
  refs/heads/master 50fb42f18 -> 5c7d65b18


ZEPPELIN-3538. Fail to bootstrap PySpark in yarn cluster mode

### What is this PR for?
After ZEPPELIN-3375, PySparkInterpreter extends PythonInterpreter. PySparkInterpreter may fail to bootstrap PySpark because the display hook is not added to the PySpark bootstrap code.

### What type of PR is it?
[Bug Fix]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3538

### How should this be tested?
* CI pass

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zj...@apache.org>

Closes #3016 from zjffdu/ZEPPELIN-3538 and squashes the following commits:

3a1d8a737 [Jeff Zhang] ZEPPELIN-3538. Fail to bootstrap PySpark in yarn cluster mode


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/5c7d65b1
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/5c7d65b1
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/5c7d65b1

Branch: refs/heads/master
Commit: 5c7d65b18cdef29a96e36075ee02b7e527df7702
Parents: 50fb42f
Author: Jeff Zhang <zj...@apache.org>
Authored: Wed Jun 13 10:14:14 2018 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Sun Jun 17 11:03:43 2018 +0800

----------------------------------------------------------------------
 .../apache/zeppelin/python/PythonInterpreter.java   | 16 ++++++++++++++--
 python/src/main/resources/python/zeppelin_python.py | 11 ++++++++---
 .../apache/zeppelin/spark/PySparkInterpreter.java   |  6 +++---
 .../apache/zeppelin/interpreter/MiniZeppelin.java   |  1 +
 .../src/test/resources/log4j.properties             |  2 ++
 5 files changed, 28 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5c7d65b1/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
----------------------------------------------------------------------
diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
index a5c6d5c..7e8ebc1 100644
--- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
@@ -66,7 +66,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public class PythonInterpreter extends Interpreter implements ExecuteResultHandler {
   private static final Logger LOGGER = LoggerFactory.getLogger(PythonInterpreter.class);
-  private static final int MAX_TIMEOUT_SEC = 10;
+  private static final int MAX_TIMEOUT_SEC = 30;
 
   private GatewayServer gatewayServer;
   private DefaultExecutor executor;
@@ -291,10 +291,16 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
   public class PythonInterpretRequest {
     public String statements;
     public boolean isForCompletion;
+    public boolean isCallHooks;
 
     public PythonInterpretRequest(String statements, boolean isForCompletion) {
+      this(statements, isForCompletion, true);
+    }
+
+    public PythonInterpretRequest(String statements, boolean isForCompletion, boolean isCallHooks) {
       this.statements = statements;
       this.isForCompletion = isForCompletion;
+      this.isCallHooks = isCallHooks;
     }
 
     public String statements() {
@@ -304,6 +310,10 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
     public boolean isForCompletion() {
       return isForCompletion;
     }
+
+    public boolean isCallHooks() {
+      return isCallHooks;
+    }
   }
 
   // called by Python Process
@@ -599,7 +609,9 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
     String bootstrapCode =
         IOUtils.toString(getClass().getClassLoader().getResourceAsStream(resourceName));
     try {
-      InterpreterResult result = interpret(bootstrapCode, InterpreterContext.get());
+      // Add hook explicitly, otherwise python will fail to execute the statement
+      InterpreterResult result = interpret(bootstrapCode + "\n" + "__zeppelin__._displayhook()",
+          InterpreterContext.get());
       if (result.code() != Code.SUCCESS) {
         throw new IOException("Fail to run bootstrap script: " + resourceName);
       }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5c7d65b1/python/src/main/resources/python/zeppelin_python.py
----------------------------------------------------------------------
diff --git a/python/src/main/resources/python/zeppelin_python.py b/python/src/main/resources/python/zeppelin_python.py
index 19fa220..5ad16a4 100644
--- a/python/src/main/resources/python/zeppelin_python.py
+++ b/python/src/main/resources/python/zeppelin_python.py
@@ -111,12 +111,18 @@ while True :
 
     # Get post-execute hooks
     try:
-      global_hook = intp.getHook('post_exec_dev')
+      if req.isCallHooks():
+        global_hook = intp.getHook('post_exec_dev')
+      else:
+        global_hook = None
     except:
       global_hook = None
 
     try:
-      user_hook = __zeppelin__.getHook('post_exec')
+      if req.isCallHooks():
+        user_hook = __zeppelin__.getHook('post_exec')
+      else:
+        user_hook = None
     except:
       user_hook = None
 
@@ -133,7 +139,6 @@ while True :
       to_run_hooks = []
       if (nhooks > 0):
         to_run_hooks = code.body[-nhooks:]
-
       to_run_exec, to_run_single = (code.body[:-(nhooks + 1)],
                                     [code.body[-(nhooks + 1)]])
       try:

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5c7d65b1/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 1df6e2e..f3fee21 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -130,11 +130,11 @@ public class PySparkInterpreter extends PythonInterpreter {
     try {
       URLClassLoader newCl = new URLClassLoader(urls, oldCl);
       Thread.currentThread().setContextClassLoader(newCl);
-      // create Python Process and JVM gateway
-      super.open();
       // must create spark interpreter after ClassLoader is set, otherwise the additional jars
       // can not be loaded by spark repl.
       this.sparkInterpreter = getSparkInterpreter();
+      // create Python Process and JVM gateway
+      super.open();
     } finally {
       Thread.currentThread().setContextClassLoader(oldCl);
     }
@@ -175,7 +175,7 @@ public class PySparkInterpreter extends PythonInterpreter {
     String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
     callPython(new PythonInterpretRequest(
         String.format("if 'sc' in locals():\n\tsc.setJobGroup('%s', '%s')", jobGroup, jobDesc),
-        false));
+        false, false));
   }
 
   // Run python shell

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5c7d65b1/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniZeppelin.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniZeppelin.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniZeppelin.java
index 923ae5a..14d0166 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniZeppelin.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniZeppelin.java
@@ -38,6 +38,7 @@ public class MiniZeppelin {
     FileUtils.copyFile(new File(zeppelinHome, "conf/log4j_yarn_cluster.properties"), new File(confDir, "log4j_yarn_cluster.properties"));
     System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), confDir.getAbsolutePath());
     System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath());
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "60000");
     conf = new ZeppelinConfiguration();
     interpreterSettingManager = new InterpreterSettingManager(conf,
         mock(AngularObjectRegistryListener.class), mock(RemoteInterpreterProcessListener.class), mock(ApplicationEventListener.class));

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5c7d65b1/zeppelin-zengine/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/resources/log4j.properties b/zeppelin-zengine/src/test/resources/log4j.properties
index e45936e..c8e4342 100644
--- a/zeppelin-zengine/src/test/resources/log4j.properties
+++ b/zeppelin-zengine/src/test/resources/log4j.properties
@@ -44,3 +44,5 @@ log4j.logger.org.hibernate.type=ALL
 
 log4j.logger.org.apache.hadoop=WARN
 log4j.logger.org.apache.zeppelin.plugin=DEBUG
+log4j.logger.org.apache.zeppelin.spark=DEBUG
+log4j.logger.org.apache.zeppelin.python=DEBUG
\ No newline at end of file