You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/06/17 03:03:50 UTC
zeppelin git commit: ZEPPELIN-3538. Fail to bootstrap PySpark in yarn
cluster mode
Repository: zeppelin
Updated Branches:
refs/heads/master 50fb42f18 -> 5c7d65b18
ZEPPELIN-3538. Fail to bootstrap PySpark in yarn cluster mode
### What is this PR for?
After ZEPPELIN-3375, PySparkInterpreter extends PythonInterpreter. PySparkInterpreter may fail to bootstrap PySpark because we do not add the hook to the PySpark bootstrap code.
### What type of PR is it?
[Bug Fix]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3538
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3016 from zjffdu/ZEPPELIN-3538 and squashes the following commits:
3a1d8a737 [Jeff Zhang] ZEPPELIN-3538. Fail to bootstrap PySpark in yarn cluster mode
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/5c7d65b1
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/5c7d65b1
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/5c7d65b1
Branch: refs/heads/master
Commit: 5c7d65b18cdef29a96e36075ee02b7e527df7702
Parents: 50fb42f
Author: Jeff Zhang <zj...@apache.org>
Authored: Wed Jun 13 10:14:14 2018 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Sun Jun 17 11:03:43 2018 +0800
----------------------------------------------------------------------
.../apache/zeppelin/python/PythonInterpreter.java | 16 ++++++++++++++--
python/src/main/resources/python/zeppelin_python.py | 11 ++++++++---
.../apache/zeppelin/spark/PySparkInterpreter.java | 6 +++---
.../apache/zeppelin/interpreter/MiniZeppelin.java | 1 +
.../src/test/resources/log4j.properties | 2 ++
5 files changed, 28 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5c7d65b1/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
----------------------------------------------------------------------
diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
index a5c6d5c..7e8ebc1 100644
--- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
@@ -66,7 +66,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class PythonInterpreter extends Interpreter implements ExecuteResultHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(PythonInterpreter.class);
- private static final int MAX_TIMEOUT_SEC = 10;
+ private static final int MAX_TIMEOUT_SEC = 30;
private GatewayServer gatewayServer;
private DefaultExecutor executor;
@@ -291,10 +291,16 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
public class PythonInterpretRequest {
public String statements;
public boolean isForCompletion;
+ public boolean isCallHooks;
public PythonInterpretRequest(String statements, boolean isForCompletion) {
+ this(statements, isForCompletion, true);
+ }
+
+ public PythonInterpretRequest(String statements, boolean isForCompletion, boolean isCallHooks) {
this.statements = statements;
this.isForCompletion = isForCompletion;
+ this.isCallHooks = isCallHooks;
}
public String statements() {
@@ -304,6 +310,10 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
public boolean isForCompletion() {
return isForCompletion;
}
+
+ public boolean isCallHooks() {
+ return isCallHooks;
+ }
}
// called by Python Process
@@ -599,7 +609,9 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
String bootstrapCode =
IOUtils.toString(getClass().getClassLoader().getResourceAsStream(resourceName));
try {
- InterpreterResult result = interpret(bootstrapCode, InterpreterContext.get());
+ // Add hook explicitly, otherwise python will fail to execute the statement
+ InterpreterResult result = interpret(bootstrapCode + "\n" + "__zeppelin__._displayhook()",
+ InterpreterContext.get());
if (result.code() != Code.SUCCESS) {
throw new IOException("Fail to run bootstrap script: " + resourceName);
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5c7d65b1/python/src/main/resources/python/zeppelin_python.py
----------------------------------------------------------------------
diff --git a/python/src/main/resources/python/zeppelin_python.py b/python/src/main/resources/python/zeppelin_python.py
index 19fa220..5ad16a4 100644
--- a/python/src/main/resources/python/zeppelin_python.py
+++ b/python/src/main/resources/python/zeppelin_python.py
@@ -111,12 +111,18 @@ while True :
# Get post-execute hooks
try:
- global_hook = intp.getHook('post_exec_dev')
+ if req.isCallHooks():
+ global_hook = intp.getHook('post_exec_dev')
+ else:
+ global_hook = None
except:
global_hook = None
try:
- user_hook = __zeppelin__.getHook('post_exec')
+ if req.isCallHooks():
+ user_hook = __zeppelin__.getHook('post_exec')
+ else:
+ user_hook = None
except:
user_hook = None
@@ -133,7 +139,6 @@ while True :
to_run_hooks = []
if (nhooks > 0):
to_run_hooks = code.body[-nhooks:]
-
to_run_exec, to_run_single = (code.body[:-(nhooks + 1)],
[code.body[-(nhooks + 1)]])
try:
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5c7d65b1/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 1df6e2e..f3fee21 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -130,11 +130,11 @@ public class PySparkInterpreter extends PythonInterpreter {
try {
URLClassLoader newCl = new URLClassLoader(urls, oldCl);
Thread.currentThread().setContextClassLoader(newCl);
- // create Python Process and JVM gateway
- super.open();
// must create spark interpreter after ClassLoader is set, otherwise the additional jars
// can not be loaded by spark repl.
this.sparkInterpreter = getSparkInterpreter();
+ // create Python Process and JVM gateway
+ super.open();
} finally {
Thread.currentThread().setContextClassLoader(oldCl);
}
@@ -175,7 +175,7 @@ public class PySparkInterpreter extends PythonInterpreter {
String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
callPython(new PythonInterpretRequest(
String.format("if 'sc' in locals():\n\tsc.setJobGroup('%s', '%s')", jobGroup, jobDesc),
- false));
+ false, false));
}
// Run python shell
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5c7d65b1/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniZeppelin.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniZeppelin.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniZeppelin.java
index 923ae5a..14d0166 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniZeppelin.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniZeppelin.java
@@ -38,6 +38,7 @@ public class MiniZeppelin {
FileUtils.copyFile(new File(zeppelinHome, "conf/log4j_yarn_cluster.properties"), new File(confDir, "log4j_yarn_cluster.properties"));
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), confDir.getAbsolutePath());
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath());
+ System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "60000");
conf = new ZeppelinConfiguration();
interpreterSettingManager = new InterpreterSettingManager(conf,
mock(AngularObjectRegistryListener.class), mock(RemoteInterpreterProcessListener.class), mock(ApplicationEventListener.class));
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5c7d65b1/zeppelin-zengine/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/resources/log4j.properties b/zeppelin-zengine/src/test/resources/log4j.properties
index e45936e..c8e4342 100644
--- a/zeppelin-zengine/src/test/resources/log4j.properties
+++ b/zeppelin-zengine/src/test/resources/log4j.properties
@@ -44,3 +44,5 @@ log4j.logger.org.hibernate.type=ALL
log4j.logger.org.apache.hadoop=WARN
log4j.logger.org.apache.zeppelin.plugin=DEBUG
+log4j.logger.org.apache.zeppelin.spark=DEBUG
+log4j.logger.org.apache.zeppelin.python=DEBUG
\ No newline at end of file