You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2017/01/23 09:32:25 UTC

zeppelin git commit: [ZEPPELIN-1976] Text-Output too large, causing crash

Repository: zeppelin
Updated Branches:
  refs/heads/master 584b1d94e -> 14d13de06


[ZEPPELIN-1976] Text-Output too large, causing crash

### What is this PR for?
This PR implements a size limit on interpreter output messages.

The `ZEPPELIN_INTERPRETER_OUTPUT_LIMIT` environment variable or the `zeppelin.interpreter.output.limit` JVM property sets the limit on the interpreter output message size, in bytes.

The limit is applied only to TEXT and TABLE type output, not to HTML or other types.

### What type of PR is it?
Improvement

### Todos
* [x] - Task

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-1976

### How should this be tested?

Try to print more output than the limit allows:
```
%spark
(1 to 10000).foreach(i=>
    println(s"Print line ${i} times")
)
```

### Screenshots (if appropriate)
![image](https://cloud.githubusercontent.com/assets/1540981/22035334/6c17ff9a-dca4-11e6-89b0-51b9340856b0.png)

### Questions:
* Do the license files need an update? no
* Are there breaking changes for older versions? no
* Does this need documentation? no

Author: Lee moon soo <mo...@apache.org>

Closes #1908 from Leemoonsoo/ZEPPELIN-1976 and squashes the following commits:

639868b [Lee moon soo] update description
a9b4139 [Lee moon soo] Truncate output


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/14d13de0
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/14d13de0
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/14d13de0

Branch: refs/heads/master
Commit: 14d13de06d312ba92bfa42a239af80354ee03866
Parents: 584b1d9
Author: Lee moon soo <mo...@apache.org>
Authored: Tue Jan 17 11:10:45 2017 -0800
Committer: Lee moon soo <mo...@apache.org>
Committed: Mon Jan 23 01:32:13 2017 -0800

----------------------------------------------------------------------
 conf/zeppelin-site.xml.template                 |  5 +++
 docs/install/configuration.md                   |  6 ++++
 .../apache/zeppelin/interpreter/Constants.java  |  2 ++
 .../zeppelin/interpreter/InterpreterOutput.java | 34 ++++++++++++++++++--
 .../interpreter/remote/RemoteInterpreter.java   | 11 +++++--
 .../remote/RemoteInterpreterServer.java         |  5 +++
 .../interpreter/InterpreterOutputTest.java      | 27 ++++++++++++++++
 .../apache/zeppelin/server/ZeppelinServer.java  |  3 ++
 .../zeppelin/conf/ZeppelinConfiguration.java    |  1 +
 .../interpreter/InterpreterFactory.java         |  5 +--
 10 files changed, 93 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/14d13de0/conf/zeppelin-site.xml.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index 7faacac..f4ce5cd 100755
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -216,6 +216,11 @@
   <description>Interpreter process connect timeout in msec.</description>
 </property>
 
+<property>
+  <name>zeppelin.interpreter.output.limit</name>
+  <value>102400</value>
+  <description>Output message from interpreter exceeding the limit will be truncated</description>
+</property>
 
 <property>
   <name>zeppelin.ssl</name>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/14d13de0/docs/install/configuration.md
----------------------------------------------------------------------
diff --git a/docs/install/configuration.md b/docs/install/configuration.md
index 7a87838..56f6404 100644
--- a/docs/install/configuration.md
+++ b/docs/install/configuration.md
@@ -249,6 +249,12 @@ If both are defined, then the **environment variables** will take priority.
     <td>Interpreter directory</td>
   </tr>
   <tr>
+    <td>ZEPPELIN_INTERPRETER_OUTPUT_LIMIT</td>
+    <td>zeppelin.interpreter.output.limit</td>
+    <td>102400</td>
+    <td>Output message from interpreter exceeding the limit will be truncated</td>
+  </tr>
+  <tr>
     <td>ZEPPELIN_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE</td>
     <td>zeppelin.websocket.max.text.message.size</td>
     <td>1024000</td>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/14d13de0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java
index d5679a3..9115a98 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java
@@ -30,4 +30,6 @@ public class Constants {
 
   public static final int ZEPPELIN_INTERPRETER_DEFAUlT_PORT = 29914;
 
+  public static final int ZEPPELIN_INTERPRETER_OUTPUT_LIMIT = 1024 * 100;
+
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/14d13de0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
index abdde8c..bf0d4b6 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
@@ -45,6 +45,13 @@ public class InterpreterOutput extends OutputStream {
   private final InterpreterOutputListener flushListener;
   private final InterpreterOutputChangeListener changeListener;
 
+  private int size = 0;
+
+  // change static var to set interpreter output limit
+  // limit will be applied to all InterpreterOutput object.
+  // so we can expect the consistent behavior
+  public static int limit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
+
   public InterpreterOutput(InterpreterOutputListener flushListener) {
     this.flushListener = flushListener;
     changeListener = null;
@@ -52,7 +59,8 @@ public class InterpreterOutput extends OutputStream {
   }
 
   public InterpreterOutput(InterpreterOutputListener flushListener,
-                           InterpreterOutputChangeListener listener) throws IOException {
+                           InterpreterOutputChangeListener listener)
+      throws IOException {
     this.flushListener = flushListener;
     this.changeListener = listener;
     clear();
@@ -74,6 +82,7 @@ public class InterpreterOutput extends OutputStream {
       out.setResourceSearchPaths(resourceSearchPaths);
 
       buffer.reset();
+      size = 0;
 
       if (currentOut != null) {
         currentOut.flush();
@@ -125,6 +134,8 @@ public class InterpreterOutput extends OutputStream {
   }
 
   public void clear() {
+    size = 0;
+    truncated = false;
     buffer.reset();
 
     synchronized (resultMessageOutputs) {
@@ -157,11 +168,31 @@ public class InterpreterOutput extends OutputStream {
   boolean startOfTheNewLine = true;
   boolean firstCharIsPercentSign = false;
 
+  boolean truncated = false;
+
   @Override
   public void write(int b) throws IOException {
     InterpreterResultMessageOutput out;
+    if (truncated) {
+      return;
+    }
 
     synchronized (resultMessageOutputs) {
+      currentOut = getCurrentOutput();
+
+      if (++size > limit) {
+        if (b == NEW_LINE_CHAR && currentOut != null) {
+          InterpreterResult.Type type = currentOut.getType();
+          if (type == InterpreterResult.Type.TEXT || type == InterpreterResult.Type.TABLE) {
+
+            setType(InterpreterResult.Type.TEXT);
+            getCurrentOutput().write("Output exceeds " + limit + ". Truncated.\n");
+            truncated = true;
+            return;
+          }
+        }
+      }
+
       if (startOfTheNewLine) {
         if (b == '%') {
           startOfTheNewLine = false;
@@ -175,7 +206,6 @@ public class InterpreterOutput extends OutputStream {
       }
 
       if (b == NEW_LINE_CHAR) {
-        currentOut = getCurrentOutput();
         if (currentOut != null && currentOut.getType() == InterpreterResult.Type.TABLE) {
           if (previousChar == NEW_LINE_CHAR) {
             startOfTheNewLine = true;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/14d13de0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 9162c88..edd97f4 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -63,6 +63,7 @@ public class RemoteInterpreter extends Interpreter {
   private int port;
   private String userName;
   private Boolean isUserImpersonate;
+  private int outputLimit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
 
   /**
    * Remote interpreter and manage interpreter process
@@ -70,7 +71,8 @@ public class RemoteInterpreter extends Interpreter {
   public RemoteInterpreter(Properties property, String sessionKey, String className,
       String interpreterRunner, String interpreterPath, String localRepoPath, int connectTimeout,
       int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener,
-      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) {
+      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
+      int outputLimit) {
     super(property);
     this.sessionKey = sessionKey;
     this.className = className;
@@ -85,6 +87,7 @@ public class RemoteInterpreter extends Interpreter {
     this.applicationEventListener = appListener;
     this.userName = userName;
     this.isUserImpersonate = isUserImpersonate;
+    this.outputLimit = outputLimit;
   }
 
 
@@ -94,7 +97,8 @@ public class RemoteInterpreter extends Interpreter {
   public RemoteInterpreter(Properties property, String sessionKey, String className, String host,
       int port, String localRepoPath, int connectTimeout, int maxPoolSize,
       RemoteInterpreterProcessListener remoteInterpreterProcessListener,
-      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) {
+      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
+      int outputLimit) {
     super(property);
     this.sessionKey = sessionKey;
     this.className = className;
@@ -108,6 +112,7 @@ public class RemoteInterpreter extends Interpreter {
     this.applicationEventListener = appListener;
     this.userName = userName;
     this.isUserImpersonate = isUserImpersonate;
+    this.outputLimit = outputLimit;
   }
 
 
@@ -217,6 +222,8 @@ public class RemoteInterpreter extends Interpreter {
         if (localRepoPath != null) {
           property.put("zeppelin.interpreter.localRepo", localRepoPath);
         }
+
+        property.put("zeppelin.interpreter.output.limit", Integer.toString(outputLimit));
         client.createInterpreter(groupId, sessionKey,
             getClassName(), (Map) property, userName);
         // Push angular object loaded from JSON file to remote interpreter

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/14d13de0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 4bd3603..879b4f5 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -162,6 +162,11 @@ public class RemoteInterpreterServer
       interpreterGroup.setResourcePool(resourcePool);
 
       String localRepoPath = properties.get("zeppelin.interpreter.localRepo");
+      if (properties.containsKey("zeppelin.interpreter.output.limit")) {
+        InterpreterOutput.limit = Integer.parseInt(
+            properties.get("zeppelin.interpreter.output.limit"));
+      }
+
       depLoader = new DependencyResolver(localRepoPath);
       appLoader = new ApplicationLoader(resourcePool, depLoader);
     }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/14d13de0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java
index edfb4db..021edce 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java
@@ -162,6 +162,33 @@ public class InterpreterOutputTest implements InterpreterOutputListener {
     assertEquals("val1\tval2\n", new String(out.getOutputAt(1).toByteArray()));
   }
 
+  @Test
+  public void testTruncate() throws IOException {
+    // output is truncated after the new line
+    InterpreterOutput.limit = 3;
+    out = new InterpreterOutput(this);
+
+    // truncate text
+    out.write("%text hello\nworld\n");
+    assertEquals("hello", new String(out.getOutputAt(0).toByteArray()));
+    assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("Truncated"));
+
+    // truncate table
+    out = new InterpreterOutput(this);
+    out.write("%table key\tvalue\nhello\t100\nworld\t200\n");
+    assertEquals("key\tvalue", new String(out.getOutputAt(0).toByteArray()));
+    assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("Truncated"));
+
+    // does not truncate html
+    out = new InterpreterOutput(this);
+    out.write("%html hello\nworld\n");
+    out.flush();
+    assertEquals("hello\nworld\n", new String(out.getOutputAt(0).toByteArray()));
+
+    // restore default
+    InterpreterOutput.limit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
+  }
+
 
   @Override
   public void onUpdateAll(InterpreterOutput out) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/14d13de0/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index b173d04..2cc2fec 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -37,6 +37,7 @@ import org.apache.zeppelin.helium.Helium;
 import org.apache.zeppelin.helium.HeliumApplicationFactory;
 import org.apache.zeppelin.helium.HeliumVisualizationFactory;
 import org.apache.zeppelin.interpreter.InterpreterFactory;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.notebook.Notebook;
 import org.apache.zeppelin.notebook.NotebookAuthorization;
 import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
@@ -98,6 +99,8 @@ public class ZeppelinServer extends Application {
     this.depResolver = new DependencyResolver(
         conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO));
 
+    InterpreterOutput.limit = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT);
+
     HeliumApplicationFactory heliumApplicationFactory = new HeliumApplicationFactory();
     HeliumVisualizationFactory heliumVisualizationFactory;
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/14d13de0/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 0c3ecac..388f432 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -572,6 +572,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
     ZEPPELIN_INTERPRETER_GROUP_ORDER("zeppelin.interpreter.group.order", "spark,md,angular,sh,"
         + "livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch,"
         + "scalding,jdbc,hbase,bigquery,beam,pig,scio"),
+    ZEPPELIN_INTERPRETER_OUTPUT_LIMIT("zeppelin.interpreter.output.limit", 1024 * 100),
     ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
     ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
     // use specified notebook (id) as homescreen

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/14d13de0/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index e8b6868..e065742 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -1150,7 +1150,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
     LazyOpenInterpreter intp = new LazyOpenInterpreter(
         new RemoteInterpreter(property, interpreterSessionKey, className, host, port, localRepoPath,
             connectTimeout, maxPoolSize, remoteInterpreterProcessListener, appEventListener,
-            userName, isUserImpersonate));
+            userName, isUserImpersonate, conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT)));
     return intp;
   }
 
@@ -1175,7 +1175,8 @@ public class InterpreterFactory implements InterpreterGroupFactory {
     RemoteInterpreter remoteInterpreter =
         new RemoteInterpreter(property, interpreterSessionKey, className,
             interpreterRunnerPath, interpreterPath, localRepoPath, connectTimeout, maxPoolSize,
-            remoteInterpreterProcessListener, appEventListener, userName, isUserImpersonate);
+            remoteInterpreterProcessListener, appEventListener, userName, isUserImpersonate,
+            conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT));
     remoteInterpreter.addEnv(env);
 
     return new LazyOpenInterpreter(remoteInterpreter);