You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2017/01/23 09:32:25 UTC
zeppelin git commit: [ZEPPELIN-1976] Text-Output too large, causing crash
Repository: zeppelin
Updated Branches:
refs/heads/master 584b1d94e -> 14d13de06
[ZEPPELIN-1976] Text-Output too large, causing crash
### What is this PR for?
This PR implements a size limit on interpreter output messages.
The `ZEPPELIN_INTERPRETER_OUTPUT_LIMIT` environment variable or the `zeppelin.interpreter.output.limit` JVM property sets the maximum size of an interpreter output message, in bytes.
The limit applies only to TEXT and TABLE output, not to HTML or other output types.
### What type of PR is it?
Improvement
### Todos
* [x] - Task
### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-1976
### How should this be tested?
Try printing more output than the limit, for example:
```
%spark
(1 to 10000).foreach(i=>
println(s"Print line ${i} times")
)
```
### Screenshots (if appropriate)
![image](https://cloud.githubusercontent.com/assets/1540981/22035334/6c17ff9a-dca4-11e6-89b0-51b9340856b0.png)
### Questions:
* Do the license files need an update? no
* Are there breaking changes for older versions? no
* Does this need documentation? no
Author: Lee moon soo <mo...@apache.org>
Closes #1908 from Leemoonsoo/ZEPPELIN-1976 and squashes the following commits:
639868b [Lee moon soo] update description
a9b4139 [Lee moon soo] Truncate output
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/14d13de0
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/14d13de0
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/14d13de0
Branch: refs/heads/master
Commit: 14d13de06d312ba92bfa42a239af80354ee03866
Parents: 584b1d9
Author: Lee moon soo <mo...@apache.org>
Authored: Tue Jan 17 11:10:45 2017 -0800
Committer: Lee moon soo <mo...@apache.org>
Committed: Mon Jan 23 01:32:13 2017 -0800
----------------------------------------------------------------------
conf/zeppelin-site.xml.template | 5 +++
docs/install/configuration.md | 6 ++++
.../apache/zeppelin/interpreter/Constants.java | 2 ++
.../zeppelin/interpreter/InterpreterOutput.java | 34 ++++++++++++++++++--
.../interpreter/remote/RemoteInterpreter.java | 11 +++++--
.../remote/RemoteInterpreterServer.java | 5 +++
.../interpreter/InterpreterOutputTest.java | 27 ++++++++++++++++
.../apache/zeppelin/server/ZeppelinServer.java | 3 ++
.../zeppelin/conf/ZeppelinConfiguration.java | 1 +
.../interpreter/InterpreterFactory.java | 5 +--
10 files changed, 93 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/14d13de0/conf/zeppelin-site.xml.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index 7faacac..f4ce5cd 100755
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -216,6 +216,11 @@
<description>Interpreter process connect timeout in msec.</description>
</property>
+<property>
+ <name>zeppelin.interpreter.output.limit</name>
+ <value>102400</value>
+ <description>Output message from interpreter exceeding the limit will be truncated</description>
+</property>
<property>
<name>zeppelin.ssl</name>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/14d13de0/docs/install/configuration.md
----------------------------------------------------------------------
diff --git a/docs/install/configuration.md b/docs/install/configuration.md
index 7a87838..56f6404 100644
--- a/docs/install/configuration.md
+++ b/docs/install/configuration.md
@@ -249,6 +249,12 @@ If both are defined, then the **environment variables** will take priority.
<td>Interpreter directory</td>
</tr>
<tr>
+ <td>ZEPPELIN_INTERPRETER_OUTPUT_LIMIT</td>
+ <td>zeppelin.interpreter.output.limit</td>
+ <td>102400</td>
+ <td>Output message from interpreter exceeding the limit will be truncated</td>
+ </tr>
+ <tr>
<td>ZEPPELIN_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE</td>
<td>zeppelin.websocket.max.text.message.size</td>
<td>1024000</td>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/14d13de0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java
index d5679a3..9115a98 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java
@@ -30,4 +30,6 @@ public class Constants {
public static final int ZEPPELIN_INTERPRETER_DEFAUlT_PORT = 29914;
+ public static final int ZEPPELIN_INTERPRETER_OUTPUT_LIMIT = 1024 * 100;
+
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/14d13de0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
index abdde8c..bf0d4b6 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
@@ -45,6 +45,13 @@ public class InterpreterOutput extends OutputStream {
private final InterpreterOutputListener flushListener;
private final InterpreterOutputChangeListener changeListener;
+ private int size = 0;
+
+ // change static var to set interpreter output limit
+ // limit will be applied to all InterpreterOutput object.
+ // so we can expect the consistent behavior
+ public static int limit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
+
public InterpreterOutput(InterpreterOutputListener flushListener) {
this.flushListener = flushListener;
changeListener = null;
@@ -52,7 +59,8 @@ public class InterpreterOutput extends OutputStream {
}
public InterpreterOutput(InterpreterOutputListener flushListener,
- InterpreterOutputChangeListener listener) throws IOException {
+ InterpreterOutputChangeListener listener)
+ throws IOException {
this.flushListener = flushListener;
this.changeListener = listener;
clear();
@@ -74,6 +82,7 @@ public class InterpreterOutput extends OutputStream {
out.setResourceSearchPaths(resourceSearchPaths);
buffer.reset();
+ size = 0;
if (currentOut != null) {
currentOut.flush();
@@ -125,6 +134,8 @@ public class InterpreterOutput extends OutputStream {
}
public void clear() {
+ size = 0;
+ truncated = false;
buffer.reset();
synchronized (resultMessageOutputs) {
@@ -157,11 +168,31 @@ public class InterpreterOutput extends OutputStream {
boolean startOfTheNewLine = true;
boolean firstCharIsPercentSign = false;
+ boolean truncated = false;
+
@Override
public void write(int b) throws IOException {
InterpreterResultMessageOutput out;
+ if (truncated) {
+ return;
+ }
synchronized (resultMessageOutputs) {
+ currentOut = getCurrentOutput();
+
+ if (++size > limit) {
+ if (b == NEW_LINE_CHAR && currentOut != null) {
+ InterpreterResult.Type type = currentOut.getType();
+ if (type == InterpreterResult.Type.TEXT || type == InterpreterResult.Type.TABLE) {
+
+ setType(InterpreterResult.Type.TEXT);
+ getCurrentOutput().write("Output exceeds " + limit + ". Truncated.\n");
+ truncated = true;
+ return;
+ }
+ }
+ }
+
if (startOfTheNewLine) {
if (b == '%') {
startOfTheNewLine = false;
@@ -175,7 +206,6 @@ public class InterpreterOutput extends OutputStream {
}
if (b == NEW_LINE_CHAR) {
- currentOut = getCurrentOutput();
if (currentOut != null && currentOut.getType() == InterpreterResult.Type.TABLE) {
if (previousChar == NEW_LINE_CHAR) {
startOfTheNewLine = true;
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/14d13de0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 9162c88..edd97f4 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -63,6 +63,7 @@ public class RemoteInterpreter extends Interpreter {
private int port;
private String userName;
private Boolean isUserImpersonate;
+ private int outputLimit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
/**
* Remote interpreter and manage interpreter process
@@ -70,7 +71,8 @@ public class RemoteInterpreter extends Interpreter {
public RemoteInterpreter(Properties property, String sessionKey, String className,
String interpreterRunner, String interpreterPath, String localRepoPath, int connectTimeout,
int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener,
- ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) {
+ ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
+ int outputLimit) {
super(property);
this.sessionKey = sessionKey;
this.className = className;
@@ -85,6 +87,7 @@ public class RemoteInterpreter extends Interpreter {
this.applicationEventListener = appListener;
this.userName = userName;
this.isUserImpersonate = isUserImpersonate;
+ this.outputLimit = outputLimit;
}
@@ -94,7 +97,8 @@ public class RemoteInterpreter extends Interpreter {
public RemoteInterpreter(Properties property, String sessionKey, String className, String host,
int port, String localRepoPath, int connectTimeout, int maxPoolSize,
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
- ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) {
+ ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
+ int outputLimit) {
super(property);
this.sessionKey = sessionKey;
this.className = className;
@@ -108,6 +112,7 @@ public class RemoteInterpreter extends Interpreter {
this.applicationEventListener = appListener;
this.userName = userName;
this.isUserImpersonate = isUserImpersonate;
+ this.outputLimit = outputLimit;
}
@@ -217,6 +222,8 @@ public class RemoteInterpreter extends Interpreter {
if (localRepoPath != null) {
property.put("zeppelin.interpreter.localRepo", localRepoPath);
}
+
+ property.put("zeppelin.interpreter.output.limit", Integer.toString(outputLimit));
client.createInterpreter(groupId, sessionKey,
getClassName(), (Map) property, userName);
// Push angular object loaded from JSON file to remote interpreter
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/14d13de0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 4bd3603..879b4f5 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -162,6 +162,11 @@ public class RemoteInterpreterServer
interpreterGroup.setResourcePool(resourcePool);
String localRepoPath = properties.get("zeppelin.interpreter.localRepo");
+ if (properties.containsKey("zeppelin.interpreter.output.limit")) {
+ InterpreterOutput.limit = Integer.parseInt(
+ properties.get("zeppelin.interpreter.output.limit"));
+ }
+
depLoader = new DependencyResolver(localRepoPath);
appLoader = new ApplicationLoader(resourcePool, depLoader);
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/14d13de0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java
index edfb4db..021edce 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java
@@ -162,6 +162,33 @@ public class InterpreterOutputTest implements InterpreterOutputListener {
assertEquals("val1\tval2\n", new String(out.getOutputAt(1).toByteArray()));
}
+ @Test
+ public void testTruncate() throws IOException {
+ // output is truncated after the new line
+ InterpreterOutput.limit = 3;
+ out = new InterpreterOutput(this);
+
+ // truncate text
+ out.write("%text hello\nworld\n");
+ assertEquals("hello", new String(out.getOutputAt(0).toByteArray()));
+ assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("Truncated"));
+
+ // truncate table
+ out = new InterpreterOutput(this);
+ out.write("%table key\tvalue\nhello\t100\nworld\t200\n");
+ assertEquals("key\tvalue", new String(out.getOutputAt(0).toByteArray()));
+ assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("Truncated"));
+
+ // does not truncate html
+ out = new InterpreterOutput(this);
+ out.write("%html hello\nworld\n");
+ out.flush();
+ assertEquals("hello\nworld\n", new String(out.getOutputAt(0).toByteArray()));
+
+ // restore default
+ InterpreterOutput.limit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
+ }
+
@Override
public void onUpdateAll(InterpreterOutput out) {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/14d13de0/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index b173d04..2cc2fec 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -37,6 +37,7 @@ import org.apache.zeppelin.helium.Helium;
import org.apache.zeppelin.helium.HeliumApplicationFactory;
import org.apache.zeppelin.helium.HeliumVisualizationFactory;
import org.apache.zeppelin.interpreter.InterpreterFactory;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.NotebookAuthorization;
import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
@@ -98,6 +99,8 @@ public class ZeppelinServer extends Application {
this.depResolver = new DependencyResolver(
conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO));
+ InterpreterOutput.limit = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT);
+
HeliumApplicationFactory heliumApplicationFactory = new HeliumApplicationFactory();
HeliumVisualizationFactory heliumVisualizationFactory;
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/14d13de0/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 0c3ecac..388f432 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -572,6 +572,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_INTERPRETER_GROUP_ORDER("zeppelin.interpreter.group.order", "spark,md,angular,sh,"
+ "livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch,"
+ "scalding,jdbc,hbase,bigquery,beam,pig,scio"),
+ ZEPPELIN_INTERPRETER_OUTPUT_LIMIT("zeppelin.interpreter.output.limit", 1024 * 100),
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
// use specified notebook (id) as homescreen
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/14d13de0/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index e8b6868..e065742 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -1150,7 +1150,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
LazyOpenInterpreter intp = new LazyOpenInterpreter(
new RemoteInterpreter(property, interpreterSessionKey, className, host, port, localRepoPath,
connectTimeout, maxPoolSize, remoteInterpreterProcessListener, appEventListener,
- userName, isUserImpersonate));
+ userName, isUserImpersonate, conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT)));
return intp;
}
@@ -1175,7 +1175,8 @@ public class InterpreterFactory implements InterpreterGroupFactory {
RemoteInterpreter remoteInterpreter =
new RemoteInterpreter(property, interpreterSessionKey, className,
interpreterRunnerPath, interpreterPath, localRepoPath, connectTimeout, maxPoolSize,
- remoteInterpreterProcessListener, appEventListener, userName, isUserImpersonate);
+ remoteInterpreterProcessListener, appEventListener, userName, isUserImpersonate,
+ conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT));
remoteInterpreter.addEnv(env);
return new LazyOpenInterpreter(remoteInterpreter);