You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2016/01/21 00:36:55 UTC

[2/2] incubator-zeppelin git commit: [ZEPPELIN-554] Streaming interpreter output to front-end

[ZEPPELIN-554] Streaming interpreter output to front-end

### What is this PR for?
Output from interpreter is displayed after completion of paragraph execution.
It'll be useful if output can be streamed to front-end during execution.

Previous work #593 injects InterpreterOutput stream object to Interpreter.
This PR is based on #593 and stream the data from InterpreterOutput to front-end.

This implementation only streams output is %text. Other output type (%html, %angular, %table) is not streamed to the front end.

While this PR keeps backward compatibility, Interpreter who want to use this feature will need to modify code to write output into `InterpreterOutput` instead of return with `InterpreterResult`.

This PR includes modification of SparkInterpreter to use InterpreterOutput.

### What type of PR is it?
Feature

### Todos

### Is there a relevant Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-554

### How should this be tested?
Run such code using Spark interpreter
```
(1 to 10).foreach{ i=>
    Thread.sleep(1000)
    println("Hello " + i)
}
```

### Screenshots (if appropriate)
![stream_output](https://cloud.githubusercontent.com/assets/1540981/12188677/6df08900-b56c-11e5-95d9-e5f6fad91007.gif)

### Questions:
* Does the licenses files need update? no
* Is there breaking changes for older versions? no
* Does this needs documentation? no

Author: Lee moon soo <mo...@apache.org>

Closes #611 from Leemoonsoo/output_stream_frontend and squashes the following commits:

53e2bb4 [Lee moon soo] Not persist on every append
dedae0d [Lee moon soo] Remove debug lines
8251fb4 [Lee moon soo] Fix syntax and style
9c9c8fd [Lee moon soo] update test
18215a3 [Lee moon soo] fix style
f7e6a4d [Lee moon soo] Fix syntax error
d29cfbf [Lee moon soo] workaround jshint
07b3e1a [Lee moon soo] Handle clear output correctly
bc6262e [Lee moon soo] Make PysparkInterpreter stream output
6d9cc51 [Lee moon soo] Pass InterpreterOutput to SparkILoop
b68180e [Lee moon soo] Add InterpreterOutput on spark interpreter unitest
626ad48 [Lee moon soo] Add license header
846015b [Lee moon soo] Update scalding
37d6920 [Lee moon soo] Handle display system directive correctly
e278e84 [Lee moon soo] Clear output correctly
479b836 [Lee moon soo] Add test
c01df62 [Lee moon soo] Connect Spark interpreter Console.out to outputstream
8a1223f [Lee moon soo] Handle update output correctly
e7a9b37 [Lee moon soo] Delayed persist
2060c1e [Lee moon soo] Clear before render text
786c978 [Lee moon soo] update paragraph object after witing to outputstream
258ff38 [Lee moon soo] Barely working
6f607f7 [Lee moon soo] Add newline listener
89d9798 [Lee moon soo] Render text output line by line
a42e4ff [Lee moon soo] Update test
fb5e7b5 [Lee moon soo] Update test
0f60b54 [Lee moon soo] Update test
a07d7db [Lee moon soo] Implement InterpreterResult.toString
1f419b6 [Lee moon soo] Add InterpreterOutput
c91f498 [Lee moon soo] prepend interpreteroutputstream to interpreter result


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/5ec59a81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/5ec59a81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/5ec59a81

Branch: refs/heads/master
Commit: 5ec59a81b2fda2fb65d4075e0672930b769f41d2
Parents: dbdaf84
Author: Lee moon soo <mo...@apache.org>
Authored: Sat Jan 16 11:04:09 2016 -0800
Committer: Lee moon soo <mo...@apache.org>
Committed: Wed Jan 20 15:39:21 2016 -0800

----------------------------------------------------------------------
 .../zeppelin/flink/FlinkInterpreterTest.java    |   2 +-
 .../zeppelin/hive/HiveInterpreterTest.java      |  12 +-
 .../zeppelin/ignite/IgniteInterpreterTest.java  |   2 +-
 .../ignite/IgniteSqlInterpreterTest.java        |   2 +-
 .../scalding/ScaldingInterpreterTest.java       |   2 +-
 .../zeppelin/spark/PySparkInterpreter.java      |  36 ++-
 .../apache/zeppelin/spark/SparkInterpreter.java |  32 +--
 .../zeppelin/spark/SparkOutputStream.java       |  75 ++++++
 .../apache/zeppelin/spark/ZeppelinContext.java  |  18 +-
 .../main/resources/python/zeppelin_pyspark.py   |   7 +-
 .../zeppelin/spark/DepInterpreterTest.java      |   2 +-
 .../zeppelin/spark/SparkInterpreterTest.java    |  23 +-
 .../zeppelin/spark/SparkSqlInterpreterTest.java |  17 +-
 .../interpreter/InterpreterContext.java         |   5 +-
 .../zeppelin/interpreter/InterpreterOutput.java | 249 +++++++++++++++++++
 .../InterpreterOutputChangeListener.java        |  27 ++
 .../InterpreterOutputChangeWatcher.java         | 140 +++++++++++
 .../interpreter/InterpreterOutputListener.java  |  34 +++
 .../zeppelin/interpreter/InterpreterResult.java |   4 +
 .../interpreter/remote/RemoteInterpreter.java   |  27 +-
 .../remote/RemoteInterpreterEventPoller.java    |  26 +-
 .../remote/RemoteInterpreterProcess.java        |   9 +-
 .../RemoteInterpreterProcessListener.java       |  25 ++
 .../remote/RemoteInterpreterServer.java         |  68 ++++-
 .../thrift/RemoteInterpreterContext.java        |   2 +-
 .../thrift/RemoteInterpreterEvent.java          |   2 +-
 .../thrift/RemoteInterpreterEventType.java      |   8 +-
 .../thrift/RemoteInterpreterResult.java         |   2 +-
 .../thrift/RemoteInterpreterService.java        |   2 +-
 .../main/thrift/RemoteInterpreterService.thrift |   4 +-
 .../interpreter/InterpreterContextTest.java     |   2 +-
 .../InterpreterOutputChangeWatcherTest.java     | 109 ++++++++
 .../interpreter/InterpreterOutputTest.java      | 127 ++++++++++
 .../interpreter/InterpreterResultTest.java      |   5 +
 .../remote/RemoteAngularObjectTest.java         |  15 +-
 .../RemoteInterpreterOutputTestStream.java      | 146 +++++++++++
 .../remote/RemoteInterpreterProcessTest.java    |   2 +-
 .../remote/RemoteInterpreterTest.java           | 174 +++++--------
 .../mock/MockInterpreterOutputStream.java       |  97 ++++++++
 .../zeppelin/scheduler/RemoteSchedulerTest.java |  32 +--
 .../apache/zeppelin/server/ZeppelinServer.java  |   3 +-
 .../org/apache/zeppelin/socket/Message.java     |   2 +
 .../apache/zeppelin/socket/NotebookServer.java  |  82 +++++-
 .../notebook/paragraph/paragraph-results.html   |  12 +-
 .../notebook/paragraph/paragraph.controller.js  |  65 ++++-
 .../websocketEvents/websocketEvents.factory.js  |   4 +
 .../interpreter/InterpreterFactory.java         |  13 +-
 .../zeppelin/notebook/JobListenerFactory.java   |   4 +-
 .../java/org/apache/zeppelin/notebook/Note.java |  77 ++++--
 .../org/apache/zeppelin/notebook/Paragraph.java |  55 +++-
 .../zeppelin/notebook/ParagraphJobListener.java |  29 +++
 .../interpreter/InterpreterFactoryTest.java     |   6 +-
 .../notebook/NoteInterpreterLoaderTest.java     |   2 +-
 .../apache/zeppelin/notebook/NotebookTest.java  |  18 +-
 .../notebook/repo/NotebookRepoSyncTest.java     |  20 +-
 .../notebook/repo/VFSNotebookRepoTest.java      |   9 +-
 56 files changed, 1672 insertions(+), 302 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
index 3168f04..9a61be6 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -40,7 +40,7 @@ public class FlinkInterpreterTest {
     Properties p = new Properties();
     flink = new FlinkInterpreter(p);
     flink.open();
-    context = new InterpreterContext(null, null, null, null, null, null, null, null);
+    context = new InterpreterContext(null, null, null, null, null, null, null, null, null);
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java
----------------------------------------------------------------------
diff --git a/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java b/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java
index c22080d..c86fcf3 100644
--- a/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java
+++ b/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java
@@ -79,9 +79,9 @@ public class HiveInterpreterTest {
     HiveInterpreter t = new HiveInterpreter(properties);
     t.open();
 
-    assertTrue(t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null)).message().contains("SCHEMA_NAME"));
+    assertTrue(t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null,null)).message().contains("SCHEMA_NAME"));
     assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n",
-        t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null)).message());
+        t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)).message());
   }
 
   @Test
@@ -101,7 +101,7 @@ public class HiveInterpreterTest {
     t.open();
 
     assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n",
-        t.interpret("(h2)\n select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null)).message());
+        t.interpret("(h2)\n select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)).message());
   }
 
   @Test
@@ -117,13 +117,13 @@ public class HiveInterpreterTest {
     t.open();
 
     InterpreterResult interpreterResult =
-        t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null));
+        t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null));
     assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message());
 
     t.getConnection("default").close();
 
     interpreterResult =
-        t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null));
+        t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null));
     assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message());
   }
 
@@ -139,7 +139,7 @@ public class HiveInterpreterTest {
     HiveInterpreter t = new HiveInterpreter(properties);
     t.open();
 
-    InterpreterContext interpreterContext = new InterpreterContext(null, "a", null, null, null, null, null, null);
+    InterpreterContext interpreterContext = new InterpreterContext(null, "a", null, null, null, null, null, null, null);
 
     //simple select test
     InterpreterResult result = t.interpret("select * from test_table", interpreterContext);

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java
----------------------------------------------------------------------
diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java
index f46b049..cf98083 100644
--- a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java
+++ b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java
@@ -40,7 +40,7 @@ public class IgniteInterpreterTest {
   private static final String HOST = "127.0.0.1:47500..47509";
 
   private static final InterpreterContext INTP_CONTEXT =
-          new InterpreterContext(null, null, null, null, null, null, null, null);
+          new InterpreterContext(null, null, null, null, null, null, null, null, null);
 
   private IgniteInterpreter intp;
   private Ignite ignite;

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java
index fb93ad5..a6dcc66 100644
--- a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java
+++ b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java
@@ -44,7 +44,7 @@ public class IgniteSqlInterpreterTest {
   private static final String HOST = "127.0.0.1:47500..47509";
 
   private static final InterpreterContext INTP_CONTEXT =
-          new InterpreterContext(null, null, null, null, null, null, null, null);
+          new InterpreterContext(null, null, null, null, null, null, null, null, null);
 
   private Ignite ignite;
   private IgniteSqlInterpreter intp;

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
----------------------------------------------------------------------
diff --git a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
index 7a753fa..606d4d9 100644
--- a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
+++ b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
@@ -65,7 +65,7 @@ public class ScaldingInterpreterTest {
     context = new InterpreterContext("note", "id", "title", "text",
         new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(
             intpGroup.getId(), null),
-        new LinkedList<InterpreterContextRunner>());
+        new LinkedList<InterpreterContextRunner>(), null);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 8c4ba87..c5441ab 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -73,8 +73,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
   private GatewayServer gatewayServer;
   private DefaultExecutor executor;
   private int port;
-  private ByteArrayOutputStream outputStream;
-  private ByteArrayOutputStream errStream;
+  private SparkOutputStream outputStream;
   private BufferedWriter ins;
   private PipedInputStream in;
   private ByteArrayOutputStream input;
@@ -173,7 +172,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
     cmd.addArgument(Integer.toString(port), false);
     cmd.addArgument(Integer.toString(getSparkInterpreter().getSparkVersion().toNumber()), false);
     executor = new DefaultExecutor();
-    outputStream = new ByteArrayOutputStream();
+    outputStream = new SparkOutputStream();
     PipedOutputStream ps = new PipedOutputStream();
     in = null;
     try {
@@ -274,7 +273,6 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
       statementError = error;
       statementFinishedNotifier.notify();
     }
-
   }
 
   boolean pythonScriptInitialized = false;
@@ -287,6 +285,10 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
     }
   }
 
+  public void appendOutput(String message) throws IOException {
+    outputStream.getInterpreterOutput().write(message);
+  }
+
   @Override
   public InterpreterResult interpret(String st, InterpreterContext context) {
     SparkInterpreter sparkInterpreter = getSparkInterpreter();
@@ -300,7 +302,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
           + outputStream.toString());
     }
 
-    outputStream.reset();
+    outputStream.setInterpreterOutput(context.out);
 
     synchronized (pythonScriptInitializeNotifier) {
       long startTime = System.currentTimeMillis();
@@ -314,15 +316,24 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
       }
     }
 
+    String errorMessage = "";
+    try {
+      context.out.flush();
+      errorMessage = new String(context.out.toByteArray());
+    } catch (IOException e) {
+      throw new InterpreterException(e);
+    }
+
+
     if (pythonscriptRunning == false) {
       // python script failed to initialize and terminated
       return new InterpreterResult(Code.ERROR, "failed to start pyspark"
-          + outputStream.toString());
+          + errorMessage);
     }
     if (pythonScriptInitialized == false) {
       // timeout. didn't get initialized message
       return new InterpreterResult(Code.ERROR, "pyspark is not responding "
-          + outputStream.toString());
+          + errorMessage);
     }
 
     if (!sparkInterpreter.getSparkVersion().isPysparkSupported()) {
@@ -352,7 +363,14 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
     if (statementError) {
       return new InterpreterResult(Code.ERROR, statementOutput);
     } else {
-      return new InterpreterResult(Code.SUCCESS, statementOutput);
+
+      try {
+        context.out.flush();
+      } catch (IOException e) {
+        throw new InterpreterException(e);
+      }
+
+      return new InterpreterResult(Code.SUCCESS);
     }
   }
 
@@ -389,8 +407,6 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
       return new LinkedList<String>();
     }
 
-    outputStream.reset();
-
     pythonInterpretRequest = new PythonInterpretRequest(completionCommand, "");
     statementOutput = null;
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index d975791..7ee6d7c 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -17,9 +17,7 @@
 
 package org.apache.zeppelin.spark;
 
-import java.io.ByteArrayOutputStream;
 import java.io.File;
-import java.io.PrintStream;
 import java.io.PrintWriter;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
@@ -41,7 +39,6 @@ import org.apache.spark.repl.SparkJLineCompletion;
 import org.apache.spark.scheduler.ActiveJob;
 import org.apache.spark.scheduler.DAGScheduler;
 import org.apache.spark.scheduler.Pool;
-import org.apache.spark.scheduler.SparkListener;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.ui.jobs.JobProgressListener;
 import org.apache.zeppelin.interpreter.Interpreter;
@@ -115,7 +112,7 @@ public class SparkInterpreter extends Interpreter {
   private SparkILoop interpreter;
   private SparkIMain intp;
   private SparkContext sc;
-  private ByteArrayOutputStream out;
+  private SparkOutputStream out;
   private SQLContext sqlc;
   private SparkDependencyResolver dep;
   private SparkJLineCompletion completor;
@@ -129,7 +126,7 @@ public class SparkInterpreter extends Interpreter {
 
   public SparkInterpreter(Properties property) {
     super(property);
-    out = new ByteArrayOutputStream();
+    out = new SparkOutputStream();
   }
 
   public SparkInterpreter(Properties property, SparkContext sc) {
@@ -452,10 +449,9 @@ public class SparkInterpreter extends Interpreter {
     b.v_$eq(true);
     settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
 
-    PrintStream printStream = new PrintStream(out);
-
     /* spark interpreter */
     this.interpreter = new SparkILoop(null, new PrintWriter(out));
+
     interpreter.settings_$eq(settings);
 
     interpreter.createInterpreter();
@@ -481,7 +477,7 @@ public class SparkInterpreter extends Interpreter {
 
     dep = getDependencyResolver();
 
-    z = new ZeppelinContext(sc, sqlc, null, dep, printStream,
+    z = new ZeppelinContext(sc, sqlc, null, dep,
         Integer.parseInt(getProperty("zeppelin.spark.maxResult")));
 
     intp.interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
@@ -489,7 +485,6 @@ public class SparkInterpreter extends Interpreter {
     binder.put("sc", sc);
     binder.put("sqlc", sqlc);
     binder.put("z", z);
-    binder.put("out", printStream);
 
     intp.interpret("@transient val z = "
                  + "_binder.get(\"z\").asInstanceOf[org.apache.zeppelin.spark.ZeppelinContext]");
@@ -675,13 +670,13 @@ public class SparkInterpreter extends Interpreter {
     synchronized (this) {
       z.setGui(context.getGui());
       sc.setJobGroup(getJobGroup(context), "Zeppelin", false);
-      InterpreterResult r = interpretInput(lines);
+      InterpreterResult r = interpretInput(lines, context);
       sc.clearJobGroup();
       return r;
     }
   }
 
-  public InterpreterResult interpretInput(String[] lines) {
+  public InterpreterResult interpretInput(String[] lines, InterpreterContext context) {
     SparkEnv.set(env);
 
     // add print("") to make sure not finishing with comment
@@ -692,8 +687,9 @@ public class SparkInterpreter extends Interpreter {
     }
     linesToRun[lines.length] = "print(\"\")";
 
-    Console.setOut((java.io.PrintStream) binder.get("out"));
-    out.reset();
+    Console.setOut(context.out);
+    out.setInterpreterOutput(context.out);
+    context.out.clear();
     Code r = null;
     String incomplete = "";
 
@@ -713,6 +709,7 @@ public class SparkInterpreter extends Interpreter {
         res = intp.interpret(incomplete + s);
       } catch (Exception e) {
         sc.clearJobGroup();
+        out.setInterpreterOutput(null);
         logger.info("Interpreter exception", e);
         return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
       }
@@ -721,7 +718,8 @@ public class SparkInterpreter extends Interpreter {
 
       if (r == Code.ERROR) {
         sc.clearJobGroup();
-        return new InterpreterResult(r, out.toString());
+        out.setInterpreterOutput(null);
+        return new InterpreterResult(r, "");
       } else if (r == Code.INCOMPLETE) {
         incomplete += s + "\n";
       } else {
@@ -730,9 +728,13 @@ public class SparkInterpreter extends Interpreter {
     }
 
     if (r == Code.INCOMPLETE) {
+      sc.clearJobGroup();
+      out.setInterpreterOutput(null);
       return new InterpreterResult(r, "Incomplete expression");
     } else {
-      return new InterpreterResult(r, out.toString());
+      sc.clearJobGroup();
+      out.setInterpreterOutput(null);
+      return new InterpreterResult(Code.SUCCESS);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java
new file mode 100644
index 0000000..98a4090
--- /dev/null
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.spark;
+
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * InterpreterOutput can be attached / detached.
+ */
+public class SparkOutputStream extends OutputStream {
+  InterpreterOutput interpreterOutput;
+
+  public SparkOutputStream() {
+  }
+
+  public InterpreterOutput getInterpreterOutput() {
+    return interpreterOutput;
+  }
+
+  public void setInterpreterOutput(InterpreterOutput interpreterOutput) {
+    this.interpreterOutput = interpreterOutput;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    if (interpreterOutput != null) {
+      interpreterOutput.write(b);
+    }
+  }
+
+  @Override
+  public void write(byte [] b) throws IOException {
+    if (interpreterOutput != null) {
+      interpreterOutput.write(b);
+    }
+  }
+
+  @Override
+  public void write(byte [] b, int offset, int len) throws IOException {
+    if (interpreterOutput != null) {
+      interpreterOutput.write(b, offset, len);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (interpreterOutput != null) {
+      interpreterOutput.close();
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (interpreterOutput != null) {
+      interpreterOutput.flush();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
index a55ed73..6869161 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
@@ -21,6 +21,7 @@ import static scala.collection.JavaConversions.asJavaCollection;
 import static scala.collection.JavaConversions.asJavaIterable;
 import static scala.collection.JavaConversions.collectionAsScalaIterable;
 
+import java.io.IOException;
 import java.io.PrintStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -54,19 +55,17 @@ import scala.collection.Iterable;
  */
 public class ZeppelinContext extends HashMap<String, Object> {
   private SparkDependencyResolver dep;
-  private PrintStream out;
   private InterpreterContext interpreterContext;
   private int maxResult;
 
   public ZeppelinContext(SparkContext sc, SQLContext sql,
       InterpreterContext interpreterContext,
-      SparkDependencyResolver dep, PrintStream printStream,
+      SparkDependencyResolver dep,
       int maxResult) {
     this.sc = sc;
     this.sqlContext = sql;
     this.interpreterContext = interpreterContext;
     this.dep = dep;
-    this.out = printStream;
     this.maxResult = maxResult;
   }
 
@@ -273,10 +272,15 @@ public class ZeppelinContext extends HashMap<String, Object> {
       throw new InterpreterException("Can not road DataFrame/SchemaRDD class");
     }
 
-    if (cls.isInstance(o)) {
-      out.print(showDF(sc, interpreterContext, o, maxResult));
-    } else {
-      out.print(o.toString());
+
+    try {
+      if (cls.isInstance(o)) {
+        interpreterContext.out.write(showDF(sc, interpreterContext, o, maxResult));
+      } else {
+        interpreterContext.out.write(o.toString());
+      }
+    } catch (IOException e) {
+      throw new InterpreterException(e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py
index 62f0a82..7da0f4e 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -36,10 +36,7 @@ class Logger(object):
     self.out = ""
 
   def write(self, message):
-    self.out = self.out + message
-
-  def get(self):
-    return self.out
+    intp.appendOutput(message)
 
   def reset(self):
     self.out = ""
@@ -224,7 +221,7 @@ while True :
       sc.setJobGroup(jobGroup, "Zeppelin")
       eval(compiledCode)
 
-    intp.setStatementsFinished(output.get(), False)
+    intp.setStatementsFinished("", False)
   except Py4JJavaError:
     excInnerError = traceback.format_exc() # format_tb() does not return the inner exception
     innerErrorStart = excInnerError.find("Py4JJavaError:")

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
index efa8fae..2b5613a 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
@@ -60,7 +60,7 @@ public class DepInterpreterTest {
 
     context = new InterpreterContext("note", "id", "title", "text", new HashMap<String, Object>(), new GUI(),
         new AngularObjectRegistry(intpGroup.getId(), null),
-        new LinkedList<InterpreterContextRunner>());
+        new LinkedList<InterpreterContextRunner>(), null);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index b629978..778966f 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -28,10 +28,7 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.junit.After;
 import org.junit.Before;
@@ -79,9 +76,21 @@ public class SparkInterpreterTest {
 
     InterpreterGroup intpGroup = new InterpreterGroup();
     context = new InterpreterContext("note", "id", "title", "text",
-        new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(
-            intpGroup.getId(), null),
-        new LinkedList<InterpreterContextRunner>());
+            new HashMap<String, Object>(),
+            new GUI(),
+            new AngularObjectRegistry(intpGroup.getId(), null),
+            new LinkedList<InterpreterContextRunner>(),
+            new InterpreterOutput(new InterpreterOutputListener() {
+              @Override
+              public void onAppend(InterpreterOutput out, byte[] line) {
+
+              }
+
+              @Override
+              public void onUpdate(InterpreterOutput out, byte[] output) {
+
+              }
+            }));
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
index 4688cf8..731eab6 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
@@ -25,10 +25,7 @@ import java.util.Properties;
 
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.InterpreterResult.Type;
 import org.junit.After;
 import org.junit.Before;
@@ -69,7 +66,17 @@ public class SparkSqlInterpreterTest {
     }
     context = new InterpreterContext("note", "id", "title", "text", new HashMap<String, Object>(), new GUI(),
         new AngularObjectRegistry(intpGroup.getId(), null),
-        new LinkedList<InterpreterContextRunner>());
+        new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(new InterpreterOutputListener() {
+      @Override
+      public void onAppend(InterpreterOutput out, byte[] line) {
+
+      }
+
+      @Override
+      public void onUpdate(InterpreterOutput out, byte[] output) {
+
+      }
+    }));
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index 0417f91..e3f6b59 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -29,6 +29,7 @@ import org.apache.zeppelin.display.GUI;
 public class InterpreterContext {
   private static final ThreadLocal<InterpreterContext> threadIC =
       new ThreadLocal<InterpreterContext>();
+  public final InterpreterOutput out;
 
   public static InterpreterContext get() {
     return threadIC.get();
@@ -58,7 +59,8 @@ public class InterpreterContext {
                             Map<String, Object> config,
                             GUI gui,
                             AngularObjectRegistry angularObjectRegistry,
-                            List<InterpreterContextRunner> runners
+                            List<InterpreterContextRunner> runners,
+                            InterpreterOutput out
                             ) {
     this.noteId = noteId;
     this.paragraphId = paragraphId;
@@ -68,6 +70,7 @@ public class InterpreterContext {
     this.gui = gui;
     this.angularObjectRegistry = angularObjectRegistry;
     this.runners = runners;
+    this.out = out;
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
new file mode 100644
index 0000000..42ebe48
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.net.URL;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * InterpreterOutput is OutputStream that supposed to print content on notebook
+ * in addition to InterpreterResult which used to return from Interpreter.interpret().
+ */
+public class InterpreterOutput extends OutputStream {
+  Logger logger = LoggerFactory.getLogger(InterpreterOutput.class);
+  private final int NEW_LINE_CHAR = '\n';
+
+  ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+
+  private final List<Object> outList = new LinkedList<Object>();
+  private InterpreterOutputChangeWatcher watcher;
+  private final InterpreterOutputListener flushListener;
+  private InterpreterResult.Type type = InterpreterResult.Type.TEXT;
+  private boolean firstWrite = true;
+
+  public InterpreterOutput(InterpreterOutputListener flushListener) {
+    this.flushListener = flushListener;
+    clear();
+  }
+
+  public InterpreterOutput(InterpreterOutputListener flushListener,
+                           InterpreterOutputChangeListener listener) throws IOException {
+    this.flushListener = flushListener;
+    clear();
+    watcher = new InterpreterOutputChangeWatcher(listener);
+    watcher.start();
+  }
+
+  public InterpreterResult.Type getType() {
+    return type;
+  }
+
+  public void setType(InterpreterResult.Type type) {
+    if (this.type != type) {
+      clear();
+      flushListener.onUpdate(this, new byte[]{});
+      this.type = type;
+    }
+  }
+
+  public void clear() {
+    synchronized (outList) {
+      type = InterpreterResult.Type.TEXT;
+      buffer.reset();
+      outList.clear();
+      if (watcher != null) {
+        watcher.clear();
+      }
+    }
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    synchronized (outList) {
+      buffer.write(b);
+      if (b == NEW_LINE_CHAR) {
+        // first time use of this outputstream.
+        if (firstWrite) {
+          // clear the output on gui
+          flushListener.onUpdate(this, new byte[]{});
+          firstWrite = false;
+        }
+
+        flush();
+      }
+    }
+  }
+
+  private byte [] detectTypeFromLine(byte [] byteArray) {
+    // check output type directive
+    String line = new String(byteArray);
+    for (InterpreterResult.Type t : InterpreterResult.Type.values()) {
+      String typeString = '%' + t.name().toLowerCase();
+      if ((typeString + "\n").equals(line)) {
+        setType(t);
+        byteArray = null;
+        break;
+      } else if (line.startsWith(typeString + " ")) {
+        setType(t);
+        byteArray = line.substring(typeString.length() + 1).getBytes();
+        break;
+      }
+    }
+
+    return byteArray;
+  }
+
+  @Override
+  public void write(byte [] b) throws IOException {
+    write(b, 0, b.length);
+  }
+
+  @Override
+  public void write(byte [] b, int off, int len) throws IOException {
+    synchronized (outList) {
+      for (int i = off; i < len; i++) {
+        write(b[i]);
+      }
+    }
+  }
+
+  /**
+   * In dev mode, it monitors file and update ZeppelinServer
+   * @param file
+   * @throws IOException
+   */
+  public void write(File file) throws IOException {
+    outList.add(file);
+    if (watcher != null) {
+      watcher.watch(file);
+    }
+  }
+
+  public void write(String string) throws IOException {
+    write(string.getBytes());
+  }
+
+  /**
+   * write contents in the resource file in the classpath
+   * @param url
+   * @throws IOException
+   */
+  public void write(URL url) throws IOException {
+    if ("file".equals(url.getProtocol())) {
+      write(new File(url.getPath()));
+    } else {
+      outList.add(url);
+    }
+  }
+
+  public void writeResource(String resourceName) throws IOException {
+    // search file under resource dir first for dev mode
+    File mainResource = new File("./src/main/resources/" + resourceName);
+    File testResource = new File("./src/test/resources/" + resourceName);
+    if (mainResource.isFile()) {
+      write(mainResource);
+    } else if (testResource.isFile()) {
+      write(testResource);
+    } else {
+      // search from classpath
+      ClassLoader cl = Thread.currentThread().getContextClassLoader();
+      if (cl == null) {
+        cl = this.getClass().getClassLoader();
+      }
+      if (cl == null) {
+        cl = ClassLoader.getSystemClassLoader();
+      }
+
+      write(cl.getResource(resourceName));
+    }
+  }
+
+  public byte[] toByteArray() throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    List<Object> all = new LinkedList<Object>();
+
+    synchronized (outList) {
+      all.addAll(outList);
+    }
+
+    for (Object o : all) {
+      if (o instanceof File) {
+        File f = (File) o;
+        FileInputStream fin = new FileInputStream(f);
+        copyStream(fin, out);
+        fin.close();
+      } else if (o instanceof byte[]) {
+        out.write((byte[]) o);
+      } else if (o instanceof Integer) {
+        out.write((int) o);
+      } else if (o instanceof URL) {
+        InputStream fin = ((URL) o).openStream();
+        copyStream(fin, out);
+        fin.close();
+      } else {
+        // can not handle the object
+      }
+    }
+    out.close();
+    return out.toByteArray();
+  }
+
+  public void flush() throws IOException {
+    synchronized (outList) {
+      buffer.flush();
+      byte[] bytes = buffer.toByteArray();
+      bytes = detectTypeFromLine(bytes);
+      if (bytes != null) {
+        outList.add(bytes);
+        if (type == InterpreterResult.Type.TEXT) {
+          flushListener.onAppend(this, bytes);
+        }
+      }
+      buffer.reset();
+    }
+  }
+
+  private void copyStream(InputStream in, OutputStream out) throws IOException {
+    int bufferSize = 8192;
+    byte[] buffer = new byte[bufferSize];
+
+    while (true) {
+      int bytesRead = in.read(buffer);
+      if (bytesRead == -1) {
+        break;
+      } else {
+        out.write(buffer, 0, bytesRead);
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    flush();
+
+    if (watcher != null) {
+      watcher.clear();
+      watcher.shutdown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeListener.java
new file mode 100644
index 0000000..a639e0c
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeListener.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+import java.io.File;
+
+/**
+ * InterpreterOutputChangeListener
+ */
+public interface InterpreterOutputChangeListener {
+  public void fileChanged(File file);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcher.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcher.java
new file mode 100644
index 0000000..5fe8237
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcher.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
+import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
+import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
+import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.ClosedWatchServiceException;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Watch the change for the development mode support
+ */
+public class InterpreterOutputChangeWatcher extends Thread {
+  Logger logger = LoggerFactory.getLogger(InterpreterOutputChangeWatcher.class);
+
+  private WatchService watcher;
+  private final List<File> watchFiles = new LinkedList<File>();
+  private final Map<WatchKey, File> watchKeys = new HashMap<WatchKey, File>();
+  private InterpreterOutputChangeListener listener;
+  private boolean stop;
+
+  public InterpreterOutputChangeWatcher(InterpreterOutputChangeListener listener)
+      throws IOException {
+    watcher = FileSystems.getDefault().newWatchService();
+    this.listener = listener;
+  }
+
+  public void watch(File file) throws IOException {
+    String dirString;
+    if (file.isFile()) {
+      dirString = file.getParentFile().getAbsolutePath();
+    } else {
+      throw new IOException(file.getName() + " is not a file");
+    }
+
+    if (dirString == null) {
+      dirString = "/";
+    }
+
+    Path dir = FileSystems.getDefault().getPath(dirString);
+    logger.info("watch " + dir);
+    WatchKey key = dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
+    synchronized (watchKeys) {
+      watchKeys.put(key, new File(dirString));
+      watchFiles.add(file);
+    }
+  }
+
+  public void clear() {
+    synchronized (watchKeys) {
+      for (WatchKey key : watchKeys.keySet()) {
+        key.cancel();
+
+      }
+      watchKeys.clear();
+      watchFiles.clear();
+    }
+  }
+
+  public void shutdown() throws IOException {
+    stop = true;
+    clear();
+    watcher.close();
+  }
+
+  public void run() {
+    while (!stop) {
+      WatchKey key = null;
+      try {
+        key = watcher.poll(1, TimeUnit.SECONDS);
+      } catch (InterruptedException | ClosedWatchServiceException e) {
+        break;
+      }
+
+      if (key == null) {
+        continue;
+      }
+      for (WatchEvent<?> event : key.pollEvents()) {
+        WatchEvent.Kind<?> kind = event.kind();
+        if (kind == OVERFLOW) {
+          continue;
+        }
+        WatchEvent<Path> ev = (WatchEvent<Path>) event;
+        Path filename = ev.context();
+        // search for filename
+        synchronized (watchKeys) {
+          for (File f : watchFiles) {
+            if (f.getName().compareTo(filename.toString()) == 0) {
+              File changedFile;
+              if (filename.isAbsolute()) {
+                changedFile = new File(filename.toString());
+              } else {
+                changedFile = new File(watchKeys.get(key), filename.toString());
+              }
+              logger.info("File change detected " + changedFile.getAbsolutePath());
+              if (listener != null) {
+                listener.fileChanged(changedFile);
+              }
+            }
+          }
+        }
+      }
+
+      boolean valid = key.reset();
+      if (!valid) {
+        break;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java
new file mode 100644
index 0000000..bdb262a
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+/**
+ * Listen InterpreterOutput buffer flush
+ */
+public interface InterpreterOutputListener {
+  /**
+   * called when newline is detected
+   * @param line
+   */
+  public void onAppend(InterpreterOutput out, byte[] line);
+
+  /**
+   * when entire output is updated. eg) after detecting new display system
+   * @param output
+   */
+  public void onUpdate(InterpreterOutput out, byte[] output);
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
index 593cfc7..d213796 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
@@ -146,4 +146,8 @@ public class InterpreterResult implements Serializable {
     this.type = type;
     return this;
   }
+
+  public String toString() {
+    return "%" + type.name().toLowerCase() + " " + msg;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 455156c..d2a24e8 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -48,6 +48,7 @@ import com.google.gson.reflect.TypeToken;
  *
  */
 public class RemoteInterpreter extends Interpreter {
+  private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
   Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class);
   Gson gson = new Gson();
   private String interpreterRunner;
@@ -60,32 +61,35 @@ public class RemoteInterpreter extends Interpreter {
   private int connectTimeout;
 
   public RemoteInterpreter(Properties property,
-      String className,
-      String interpreterRunner,
-      String interpreterPath,
-      int connectTimeout) {
+                           String className,
+                           String interpreterRunner,
+                           String interpreterPath,
+                           int connectTimeout,
+                           RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
     super(property);
-
     this.className = className;
     initialized = false;
     this.interpreterRunner = interpreterRunner;
     this.interpreterPath = interpreterPath;
     env = new HashMap<String, String>();
     this.connectTimeout = connectTimeout;
+    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
   }
 
   public RemoteInterpreter(Properties property,
-      String className,
-      String interpreterRunner,
-      String interpreterPath,
-      Map<String, String> env,
-      int connectTimeout) {
+                           String className,
+                           String interpreterRunner,
+                           String interpreterPath,
+                           Map<String, String> env,
+                           int connectTimeout,
+                           RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
     super(property);
     this.className = className;
     this.interpreterRunner = interpreterRunner;
     this.interpreterPath = interpreterPath;
     this.env = env;
     this.connectTimeout = connectTimeout;
+    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
   }
 
   @Override
@@ -103,7 +107,8 @@ public class RemoteInterpreter extends Interpreter {
       if (intpGroup.getRemoteInterpreterProcess() == null) {
         // create new remote process
         RemoteInterpreterProcess remoteProcess = new RemoteInterpreterProcess(
-                interpreterRunner, interpreterPath, env, connectTimeout);
+                interpreterRunner, interpreterPath, env, connectTimeout,
+                remoteInterpreterProcessListener);
 
         intpGroup.setRemoteInterpreterProcess(remoteProcess);
       }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
index c39e0fe..6186205 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
@@ -18,29 +18,35 @@
 package org.apache.zeppelin.interpreter.remote;
 
 import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import org.apache.thrift.TException;
 import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.interpreter.InterpreterContextRunner;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterOutputListener;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+
 /**
  *
  */
 public class RemoteInterpreterEventPoller extends Thread {
   private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class);
+  private final RemoteInterpreterProcessListener listener;
 
   private volatile boolean shutdown;
 
   private RemoteInterpreterProcess interpreterProcess;
   private InterpreterGroup interpreterGroup;
 
-  public RemoteInterpreterEventPoller() {
+  public RemoteInterpreterEventPoller(RemoteInterpreterProcessListener listener) {
+    this.listener = listener;
     shutdown = false;
   }
 
@@ -110,6 +116,24 @@ public class RemoteInterpreterEventPoller extends Thread {
 
           interpreterProcess.getInterpreterContextRunnerPool().run(
               runnerFromRemote.getNoteId(), runnerFromRemote.getParagraphId());
+        } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_APPEND) {
+          // on output append
+          Map<String, String> outputAppend = gson.fromJson(
+                  event.getData(), new TypeToken<Map<String, String>>() {}.getType());
+          String noteId = outputAppend.get("noteId");
+          String paragraphId = outputAppend.get("paragraphId");
+          String outputToAppend = outputAppend.get("data");
+
+          listener.onOutputAppend(noteId, paragraphId, outputToAppend);
+        } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE) {
+          // on output update
+          Map<String, String> outputAppend = gson.fromJson(
+                  event.getData(), new TypeToken<Map<String, String>>() {}.getType());
+          String noteId = outputAppend.get("noteId");
+          String paragraphId = outputAppend.get("paragraphId");
+          String outputToUpdate = outputAppend.get("data");
+
+          listener.onOutputUpdated(noteId, paragraphId, outputToUpdate);
         }
         logger.debug("Event from remoteproceess {}", event.getType());
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index 2c195dc..56b5485 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -53,10 +53,11 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
   private int connectTimeout;
 
   public RemoteInterpreterProcess(String intpRunner,
-      String intpDir,
-      Map<String, String> env,
-      int connectTimeout) {
-    this(intpRunner, intpDir, env, new RemoteInterpreterEventPoller(), connectTimeout);
+                                  String intpDir,
+                                  Map<String, String> env,
+                                  int connectTimeout,
+                                  RemoteInterpreterProcessListener listener) {
+    this(intpRunner, intpDir, env, new RemoteInterpreterEventPoller(listener), connectTimeout);
   }
 
   RemoteInterpreterProcess(String intpRunner,

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
new file mode 100644
index 0000000..da6ac63
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote;
+
+/**
+ * Event from remoteInterpreterProcess
+ */
+public interface RemoteInterpreterProcessListener {
+  public void onOutputAppend(String noteId, String paragraphId, String output);
+  public void onOutputUpdated(String noteId, String paragraphId, String output);
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index a8da8c0..728d210 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -35,15 +35,8 @@ import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.AngularObjectRegistryListener;
 import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.ClassloaderInterpreter;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
@@ -300,7 +293,26 @@ public class RemoteInterpreterServer
       try {
         InterpreterContext.set(context);
         InterpreterResult result = interpreter.interpret(script, context);
-        return result;
+
+        // data from context.out is prepended to InterpreterResult if both defined
+        String message = "";
+
+        context.out.flush();
+        InterpreterResult.Type outputType = context.out.getType();
+        byte[] interpreterOutput = context.out.toByteArray();
+        context.out.clear();
+
+        if (interpreterOutput != null && interpreterOutput.length > 0) {
+          message = new String(interpreterOutput);
+        }
+
+        String interpreterResultMessage = result.message();
+        if (interpreterResultMessage != null && !interpreterResultMessage.isEmpty()) {
+          message += interpreterResultMessage;
+          return new InterpreterResult(result.code(), result.type(), message);
+        } else {
+          return new InterpreterResult(result.code(), outputType, message);
+        }
       } finally {
         InterpreterContext.remove();
       }
@@ -351,7 +363,8 @@ public class RemoteInterpreterServer
   private InterpreterContext convert(RemoteInterpreterContext ric) {
     List<InterpreterContextRunner> contextRunners = new LinkedList<InterpreterContextRunner>();
     List<InterpreterContextRunner> runners = gson.fromJson(ric.getRunners(),
-        new TypeToken<List<RemoteInterpreterContextRunner>>(){}.getType());
+            new TypeToken<List<RemoteInterpreterContextRunner>>() {
+        }.getType());
 
     for (InterpreterContextRunner r : runners) {
       contextRunners.add(new ParagraphRunner(this, r.getNoteId(), r.getParagraphId()));
@@ -366,7 +379,40 @@ public class RemoteInterpreterServer
             new TypeToken<Map<String, Object>>() {}.getType()),
         gson.fromJson(ric.getGui(), GUI.class),
         interpreterGroup.getAngularObjectRegistry(),
-        contextRunners);
+        contextRunners, createInterpreterOutput(ric.getNoteId(), ric.getParagraphId()));
+  }
+
+
+  private InterpreterOutput createInterpreterOutput(final String noteId, final String paragraphId) {
+    return new InterpreterOutput(new InterpreterOutputListener() {
+      @Override
+      public void onAppend(InterpreterOutput out, byte[] line) {
+        Map<String, String> appendOutput = new HashMap<String, String>();
+        appendOutput.put("noteId", noteId);
+        appendOutput.put("paragraphId", paragraphId);
+        appendOutput.put("data", new String(line));
+
+        Gson gson = new Gson();
+
+        sendEvent(new RemoteInterpreterEvent(
+                RemoteInterpreterEventType.OUTPUT_APPEND,
+                gson.toJson(appendOutput)));
+      }
+
+      @Override
+      public void onUpdate(InterpreterOutput out, byte[] output) {
+        Map<String, String> appendOutput = new HashMap<String, String>();
+        appendOutput.put("noteId", noteId);
+        appendOutput.put("paragraphId", paragraphId);
+        appendOutput.put("data", new String(output));
+
+        Gson gson = new Gson();
+
+        sendEvent(new RemoteInterpreterEvent(
+                RemoteInterpreterEventType.OUTPUT_UPDATE,
+                gson.toJson(appendOutput)));
+      }
+    });
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
index a55d5de..175f482 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
 public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
index 96a49b5..79203fb 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
 public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterEvent> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
index 9a7d142..d650318 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
@@ -33,7 +33,9 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
   ANGULAR_OBJECT_ADD(2),
   ANGULAR_OBJECT_UPDATE(3),
   ANGULAR_OBJECT_REMOVE(4),
-  RUN_INTERPRETER_CONTEXT_RUNNER(5);
+  RUN_INTERPRETER_CONTEXT_RUNNER(5),
+  OUTPUT_APPEND(6),
+  OUTPUT_UPDATE(7);
 
   private final int value;
 
@@ -64,6 +66,10 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
         return ANGULAR_OBJECT_REMOVE;
       case 5:
         return RUN_INTERPRETER_CONTEXT_RUNNER;
+      case 6:
+        return OUTPUT_APPEND;
+      case 7:
+        return OUTPUT_UPDATE;
       default:
         return null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
index 36c0f25..cc50f9c 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
 public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteInterpreterResult, RemoteInterpreterResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResult> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
index 6e6730e..738b453 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
 public class RemoteInterpreterService {
 
   public interface Iface {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
index 144784c..65fd0a7 100644
--- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
+++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
@@ -42,7 +42,9 @@ enum RemoteInterpreterEventType {
   ANGULAR_OBJECT_ADD = 2,
   ANGULAR_OBJECT_UPDATE = 3,
   ANGULAR_OBJECT_REMOVE = 4,
-  RUN_INTERPRETER_CONTEXT_RUNNER = 5
+  RUN_INTERPRETER_CONTEXT_RUNNER = 5,
+  OUTPUT_APPEND = 6,
+  OUTPUT_UPDATE = 7
 }
 
 struct RemoteInterpreterEvent {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
index 080bdaa..9c2732d 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
@@ -27,7 +27,7 @@ public class InterpreterContextTest {
   public void testThreadLocal() {
     assertNull(InterpreterContext.get());
 
-    InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null));
+    InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null, null));
     assertNotNull(InterpreterContext.get());
 
     InterpreterContext.remove();

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java
new file mode 100644
index 0000000..e376809
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class InterpreterOutputChangeWatcherTest implements InterpreterOutputChangeListener {
+  private File tmpDir;
+  private File fileChanged;
+  private int numChanged;
+  private InterpreterOutputChangeWatcher watcher;
+
+  @Before
+  public void setUp() throws Exception {
+    watcher = new InterpreterOutputChangeWatcher(this);
+    watcher.start();
+
+    tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis());
+    tmpDir.mkdirs();
+    fileChanged = null;
+    numChanged = 0;
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    watcher.shutdown();
+    delete(tmpDir);
+  }
+
+  private void delete(File file){
+    if(file.isFile()) file.delete();
+    else if(file.isDirectory()){
+      File [] files = file.listFiles();
+      if(files!=null && files.length>0){
+        for(File f : files){
+          delete(f);
+        }
+      }
+      file.delete();
+    }
+  }
+
+
+  @Test
+  public void test() throws IOException, InterruptedException {
+    assertNull(fileChanged);
+    assertEquals(0, numChanged);
+
+    Thread.sleep(1000);
+    // create new file
+    File file1 = new File(tmpDir, "test1");
+    file1.createNewFile();
+
+    File file2 = new File(tmpDir, "test2");
+    file2.createNewFile();
+
+    watcher.watch(file1);
+    Thread.sleep(1000);
+
+    FileOutputStream out1 = new FileOutputStream(file1);
+    out1.write(1);
+    out1.close();
+
+    FileOutputStream out2 = new FileOutputStream(file2);
+    out2.write(1);
+    out2.close();
+
+    synchronized (this) {
+      wait(30*1000);
+    }
+
+    assertNotNull(fileChanged);
+    assertEquals(1, numChanged);
+  }
+
+
+  @Override
+  public void fileChanged(File file) {
+    fileChanged = file;
+    numChanged++;
+
+    synchronized(this) {
+      notify();
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java
new file mode 100644
index 0000000..f8f4809
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class InterpreterOutputTest implements InterpreterOutputListener {
+  private InterpreterOutput out;
+  int numAppendEvent;
+  int numUpdateEvent;
+
+  @Before
+  public void setUp() {
+    out = new InterpreterOutput(this);
+    numAppendEvent = 0;
+    numUpdateEvent = 0;
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    out.close();
+  }
+
+  @Test
+  public void testDetectNewline() throws IOException {
+    out.write("hello\nworld");
+    assertEquals("hello\n", new String(out.toByteArray()));
+    assertEquals(1, numAppendEvent);
+    assertEquals(1, numUpdateEvent);
+
+    out.write("\n");
+    assertEquals("hello\nworld\n", new String(out.toByteArray()));
+    assertEquals(2, numAppendEvent);
+    assertEquals(1, numUpdateEvent);
+  }
+
+  @Test
+  public void testFlush() throws IOException {
+    out.write("hello\nworld");
+    assertEquals("hello\n", new String(out.toByteArray()));
+    assertEquals(1, numAppendEvent);
+    assertEquals(1, numUpdateEvent);
+
+    out.flush();
+    assertEquals("hello\nworld", new String(out.toByteArray()));
+    assertEquals(2, numAppendEvent);
+    assertEquals(1, numUpdateEvent);
+
+    out.clear();
+    out.write("%html div");
+    assertEquals("", new String(out.toByteArray()));
+    assertEquals(InterpreterResult.Type.TEXT, out.getType());
+
+    out.flush();
+    out.write("%html div");
+    assertEquals("div", new String(out.toByteArray()));
+    assertEquals(InterpreterResult.Type.HTML, out.getType());
+  }
+
+  @Test
+  public void testType() throws IOException {
+    // default output stream type is TEXT
+    out.write("Text\n");
+    assertEquals(InterpreterResult.Type.TEXT, out.getType());
+    assertEquals("Text\n", new String(out.toByteArray()));
+    assertEquals(1, numAppendEvent);
+    assertEquals(1, numUpdateEvent);
+
+    // change type
+    out.write("%html\n");
+    assertEquals(InterpreterResult.Type.HTML, out.getType());
+    assertEquals("", new String(out.toByteArray()));
+    assertEquals(1, numAppendEvent);
+    assertEquals(2, numUpdateEvent);
+
+    // none TEXT type output stream does not generate append event
+    out.write("<div>html</div>\n");
+    assertEquals(InterpreterResult.Type.HTML, out.getType());
+    assertEquals(1, numAppendEvent);
+    assertEquals(2, numUpdateEvent);
+    assertEquals("<div>html</div>\n", new String(out.toByteArray()));
+
+    // change type to text again
+    out.write("%text hello\n");
+    assertEquals(InterpreterResult.Type.TEXT, out.getType());
+    assertEquals(2, numAppendEvent);
+    assertEquals(3, numUpdateEvent);
+    assertEquals("hello\n", new String(out.toByteArray()));
+  }
+
+  @Test
+  public void testType2() throws IOException {
+    out.write("%html\nHello");
+    assertEquals(InterpreterResult.Type.HTML, out.getType());
+  }
+
+  @Override
+  public void onAppend(InterpreterOutput out, byte[] line) {
+    numAppendEvent++;
+  }
+
+  @Override
+  public void onUpdate(InterpreterOutput out, byte[] output) {
+    numUpdateEvent++;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java
index 007730a..d7ab9e8 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java
@@ -105,4 +105,9 @@ public class InterpreterResultTest {
 					"123\n", result.message());
 	  }
 
+	  @Test
+	  public void testToString() {
+			assertEquals("%html hello", new InterpreterResult(InterpreterResult.Code.SUCCESS, "%html hello").toString());
+		}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
index 29a1fb1..906878d 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
@@ -64,12 +64,13 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
     Properties p = new Properties();
 
     intp = new RemoteInterpreter(
-        p,
-        MockInterpreterAngular.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
+            p,
+            MockInterpreterAngular.class.getName(),
+            new File("../bin/interpreter.sh").getAbsolutePath(),
+            "fake",
+            env,
+            10 * 1000,
+            null
         );
 
     intpGroup.add(intp);
@@ -83,7 +84,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
         new HashMap<String, Object>(),
         new GUI(),
         new AngularObjectRegistry(intpGroup.getId(), null),
-        new LinkedList<InterpreterContextRunner>());
+        new LinkedList<InterpreterContextRunner>(), null);
 
     intp.open();
   }