You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2016/01/21 00:36:54 UTC

[1/2] incubator-zeppelin git commit: [ZEPPELIN-554] Streaming interpreter output to front-end

Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master dbdaf84e4 -> 5ec59a81b


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
new file mode 100644
index 0000000..623a037
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterOutputStream;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Checks the {@link InterpreterResult} seen through a {@link RemoteInterpreter} when the
+ * mock interpreter writes to its output stream, returns a static result, or does both.
+ * NOTE(review): the class name ends in "Stream", not "Test" - confirm the build's test
+ * include patterns pick this class up.
+ */
+public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProcessListener {
+  private InterpreterGroup intpGroup;
+  private HashMap<String, String> env;
+
+  @Before
+  public void setUp() throws Exception {
+    env = new HashMap<String, String>();
+    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
+    intpGroup = new InterpreterGroup();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    intpGroup.close();
+    intpGroup.destroy();
+  }
+
+  /** Creates a remote MockInterpreterOutputStream and registers it with intpGroup. */
+  private RemoteInterpreter createMockInterpreter() {
+    String runner = new File("../bin/interpreter.sh").getAbsolutePath();
+    String mockClass = MockInterpreterOutputStream.class.getName();
+    RemoteInterpreter remote = new RemoteInterpreter(
+            new Properties(), mockClass, runner, "fake", env, 10 * 1000, this);
+    intpGroup.add(remote);
+    remote.setInterpreterGroup(intpGroup);
+    return remote;
+  }
+
+  /** Builds a new context from fixed string fields, empty collections and a null last arg. */
+  private InterpreterContext createInterpreterContext() {
+    AngularObjectRegistry registry = new AngularObjectRegistry(intpGroup.getId(), null);
+    return new InterpreterContext(
+            "noteId",
+            "id",
+            "title",
+            "text",
+            new HashMap<String, Object>(),
+            new GUI(),
+            registry,
+            new LinkedList<InterpreterContextRunner>(), null);
+  }
+
+  @Test
+  public void testInterpreterResultOnly() {
+    RemoteInterpreter remote = createMockInterpreter();
+    InterpreterResult res = remote.interpret("SUCCESS::staticresult", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, res.code());
+    assertEquals("staticresult", res.message());
+
+    res = remote.interpret("SUCCESS::staticresult2", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, res.code());
+    assertEquals("staticresult2", res.message());
+
+    res = remote.interpret("ERROR::staticresult3", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.ERROR, res.code());
+    assertEquals("staticresult3", res.message());
+  }
+
+  @Test
+  public void testInterpreterOutputStreamOnly() {
+    RemoteInterpreter remote = createMockInterpreter();
+    InterpreterResult res = remote.interpret("SUCCESS:streamresult:", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, res.code());
+    assertEquals("streamresult", res.message());
+
+    res = remote.interpret("ERROR:streamresult2:", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.ERROR, res.code());
+    assertEquals("streamresult2", res.message());
+  }
+
+  @Test
+  public void testInterpreterResultOutputStreamMixed() {
+    RemoteInterpreter remote = createMockInterpreter();
+    InterpreterResult res = remote.interpret("SUCCESS:stream:static", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, res.code());
+    assertEquals("streamstatic", res.message());
+  }
+
+  @Test
+  public void testOutputType() {
+    RemoteInterpreter remote = createMockInterpreter();
+    // A %type prefix is expected to set the result type and be stripped from the message.
+    InterpreterResult res = remote.interpret("SUCCESS:%html hello:", createInterpreterContext());
+    assertEquals(InterpreterResult.Type.HTML, res.type());
+    assertEquals("hello", res.message());
+
+    res = remote.interpret("SUCCESS:%html\nhello:", createInterpreterContext());
+    assertEquals(InterpreterResult.Type.HTML, res.type());
+    assertEquals("hello", res.message());
+
+    res = remote.interpret("SUCCESS:%html hello:%angular world", createInterpreterContext());
+    assertEquals(InterpreterResult.Type.ANGULAR, res.type());
+    assertEquals("helloworld", res.message());
+  }
+
+  @Override
+  public void onOutputAppend(String noteId, String paragraphId, String output) {
+    // intentionally empty: these tests only inspect the returned InterpreterResult
+  }
+
+  @Override
+  public void onOutputUpdated(String noteId, String paragraphId, String output) {
+    // intentionally empty: these tests only inspect the returned InterpreterResult
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
index ea5397e..abee5b8 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
@@ -34,7 +34,7 @@ public class RemoteInterpreterProcessTest {
     InterpreterGroup intpGroup = new InterpreterGroup();
     RemoteInterpreterProcess rip = new RemoteInterpreterProcess(
         "../bin/interpreter.sh", "nonexists", new HashMap<String, String>(),
-        10 * 1000);
+        10 * 1000, null);
     assertFalse(rip.isRunning());
     assertEquals(0, rip.referenceCount());
     assertEquals(1, rip.reference(intpGroup));

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
index c938ff3..034a676 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
@@ -63,30 +63,38 @@ public class RemoteInterpreterTest {
     intpGroup.destroy();
   }
 
+  /**
+   * Creates a remote MockInterpreterA with the given properties and a null last
+   * constructor argument; callers add it to an interpreter group themselves.
+   */
+  private RemoteInterpreter createMockInterpreterA(Properties p) {
+    String className = MockInterpreterA.class.getName();
+    String runner = new File("../bin/interpreter.sh").getAbsolutePath();
+    return new RemoteInterpreter(
+            p, className, runner, "fake", env, 10 * 1000, null);
+  }
+
+  /**
+   * Creates a remote MockInterpreterB with the given properties and a null last
+   * constructor argument; callers add it to an interpreter group themselves.
+   */
+  private RemoteInterpreter createMockInterpreterB(Properties p) {
+    String className = MockInterpreterB.class.getName();
+    String runner = new File("../bin/interpreter.sh").getAbsolutePath();
+    return new RemoteInterpreter(
+            p, className, runner, "fake", env, 10 * 1000, null);
+  }
+
   @Test
   public void testRemoteInterperterCall() throws TTransportException, IOException {
     Properties p = new Properties();
 
-    RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
-        );
+    RemoteInterpreter intpA = createMockInterpreterA(p);
 
     intpGroup.add(intpA);
     intpA.setInterpreterGroup(intpGroup);
 
-    RemoteInterpreter intpB = new RemoteInterpreter(
-        p,
-        MockInterpreterB.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
-        );
+    RemoteInterpreter intpB = createMockInterpreterB(p);
 
     intpGroup.add(intpB);
     intpB.setInterpreterGroup(intpGroup);
@@ -113,7 +121,7 @@ public class RemoteInterpreterTest {
             new HashMap<String, Object>(),
             new GUI(),
             new AngularObjectRegistry(intpGroup.getId(), null),
-            new LinkedList<InterpreterContextRunner>()));
+            new LinkedList<InterpreterContextRunner>(), null));
 
     intpB.open();
     assertEquals(2, process.referenceCount());
@@ -131,14 +139,7 @@ public class RemoteInterpreterTest {
   public void testRemoteInterperterErrorStatus() throws TTransportException, IOException {
     Properties p = new Properties();
 
-    RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
-        );
+    RemoteInterpreter intpA = createMockInterpreterA(p);
 
     intpGroup.add(intpA);
     intpA.setInterpreterGroup(intpGroup);
@@ -153,7 +154,7 @@ public class RemoteInterpreterTest {
             new HashMap<String, Object>(),
             new GUI(),
             new AngularObjectRegistry(intpGroup.getId(), null),
-            new LinkedList<InterpreterContextRunner>()));
+            new LinkedList<InterpreterContextRunner>(), null));
 
     assertEquals(Code.ERROR, ret.code());
   }
@@ -163,24 +164,26 @@ public class RemoteInterpreterTest {
     Properties p = new Properties();
 
     RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
+            p,
+            MockInterpreterA.class.getName(),
+            new File("../bin/interpreter.sh").getAbsolutePath(),
+            "fake",
+            env,
+            10 * 1000,
+            null
         );
 
     intpGroup.add(intpA);
     intpA.setInterpreterGroup(intpGroup);
 
     RemoteInterpreter intpB = new RemoteInterpreter(
-        p,
-        MockInterpreterB.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
+            p,
+            MockInterpreterB.class.getName(),
+            new File("../bin/interpreter.sh").getAbsolutePath(),
+            "fake",
+            env,
+            10 * 1000,
+            null
         );
 
     intpGroup.add(intpB);
@@ -199,7 +202,7 @@ public class RemoteInterpreterTest {
             new HashMap<String, Object>(),
             new GUI(),
             new AngularObjectRegistry(intpGroup.getId(), null),
-            new LinkedList<InterpreterContextRunner>()));
+            new LinkedList<InterpreterContextRunner>(), null));
     assertEquals("500", ret.message());
 
     ret = intpB.interpret("500",
@@ -211,7 +214,7 @@ public class RemoteInterpreterTest {
             new HashMap<String, Object>(),
             new GUI(),
             new AngularObjectRegistry(intpGroup.getId(), null),
-            new LinkedList<InterpreterContextRunner>()));
+            new LinkedList<InterpreterContextRunner>(), null));
     assertEquals("1000", ret.message());
     long end = System.currentTimeMillis();
     assertTrue(end - start >= 1000);
@@ -225,26 +228,12 @@ public class RemoteInterpreterTest {
   public void testRemoteSchedulerSharingSubmit() throws TTransportException, IOException, InterruptedException {
     Properties p = new Properties();
 
-    final RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
-        );
+    final RemoteInterpreter intpA = createMockInterpreterA(p);
 
     intpGroup.add(intpA);
     intpA.setInterpreterGroup(intpGroup);
 
-    final RemoteInterpreter intpB = new RemoteInterpreter(
-        p,
-        MockInterpreterB.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
-        );
+    final RemoteInterpreter intpB = createMockInterpreterB(p);
 
     intpGroup.add(intpB);
     intpB.setInterpreterGroup(intpGroup);
@@ -276,7 +265,7 @@ public class RemoteInterpreterTest {
                 new HashMap<String, Object>(),
                 new GUI(),
                 new AngularObjectRegistry(intpGroup.getId(), null),
-                new LinkedList<InterpreterContextRunner>()));
+                new LinkedList<InterpreterContextRunner>(), null));
       }
 
       @Override
@@ -310,7 +299,7 @@ public class RemoteInterpreterTest {
                 new HashMap<String, Object>(),
                 new GUI(),
                 new AngularObjectRegistry(intpGroup.getId(), null),
-                new LinkedList<InterpreterContextRunner>()));
+                new LinkedList<InterpreterContextRunner>(), null));
       }
 
       @Override
@@ -340,14 +329,7 @@ public class RemoteInterpreterTest {
   public void testRunOrderPreserved() throws InterruptedException {
     Properties p = new Properties();
 
-    final RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
-        );
+    final RemoteInterpreter intpA = createMockInterpreterA(p);
 
     intpGroup.add(intpA);
     intpA.setInterpreterGroup(intpGroup);
@@ -382,7 +364,7 @@ public class RemoteInterpreterTest {
               new HashMap<String, Object>(),
               new GUI(),
               new AngularObjectRegistry(intpGroup.getId(), null),
-              new LinkedList<InterpreterContextRunner>()));
+              new LinkedList<InterpreterContextRunner>(), null));
 
           synchronized (results) {
             results.add(ret.message());
@@ -421,14 +403,7 @@ public class RemoteInterpreterTest {
     Properties p = new Properties();
     p.put("parallel", "true");
 
-    final RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
-        );
+    final RemoteInterpreter intpA = createMockInterpreterA(p);
 
     intpGroup.add(intpA);
     intpA.setInterpreterGroup(intpGroup);
@@ -466,7 +441,7 @@ public class RemoteInterpreterTest {
               new HashMap<String, Object>(),
               new GUI(),
               new AngularObjectRegistry(intpGroup.getId(), null),
-              new LinkedList<InterpreterContextRunner>()));
+              new LinkedList<InterpreterContextRunner>(), null));
 
           synchronized (results) {
             results.add(ret.message());
@@ -501,14 +476,7 @@ public class RemoteInterpreterTest {
   public void testInterpreterGroupResetBeforeProcessStarts() {
     Properties p = new Properties();
 
-    RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
-        );
+    RemoteInterpreter intpA = createMockInterpreterA(p);
 
     intpA.setInterpreterGroup(intpGroup);
     RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
@@ -523,14 +491,7 @@ public class RemoteInterpreterTest {
   public void testInterpreterGroupResetAfterProcessFinished() {
     Properties p = new Properties();
 
-    RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
-        );
+    RemoteInterpreter intpA = createMockInterpreterA(p);
 
     intpA.setInterpreterGroup(intpGroup);
     RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
@@ -548,14 +509,7 @@ public class RemoteInterpreterTest {
   public void testInterpreterGroupResetDuringProcessRunning() throws InterruptedException {
     Properties p = new Properties();
 
-    final RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
-        );
+    final RemoteInterpreter intpA = createMockInterpreterA(p);
 
     intpGroup.add(intpA);
     intpA.setInterpreterGroup(intpGroup);
@@ -585,7 +539,7 @@ public class RemoteInterpreterTest {
                 new HashMap<String, Object>(),
                 new GUI(),
                 new AngularObjectRegistry(intpGroup.getId(), null),
-                new LinkedList<InterpreterContextRunner>()));
+                new LinkedList<InterpreterContextRunner>(), null));
       }
 
       @Override
@@ -616,26 +570,12 @@ public class RemoteInterpreterTest {
   public void testRemoteInterpreterSharesTheSameSchedulerInstanceInTheSameGroup() {
     Properties p = new Properties();
 
-    RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
-        );
+    RemoteInterpreter intpA = createMockInterpreterA(p);
 
     intpGroup.add(intpA);
     intpA.setInterpreterGroup(intpGroup);
 
-    RemoteInterpreter intpB = new RemoteInterpreter(
-        p,
-        MockInterpreterB.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
-        );
+    RemoteInterpreter intpB = createMockInterpreterB(p);
 
     intpGroup.add(intpB);
     intpB.setInterpreterGroup(intpGroup);

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
new file mode 100644
index 0000000..bc1859f
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Mock interpreter for output stream tests. A statement is "CODE[:streamed[:static]]":
+ * "streamed" is written to context.out, "static" (or "") becomes the result message.
+ */
+public class MockInterpreterOutputStream extends Interpreter {
+  static {
+    Interpreter.register("interpreterOutputStream", "group1",
+            MockInterpreterOutputStream.class.getName(),
+            new InterpreterPropertyBuilder().build());
+  }
+
+  private String lastSt;
+
+  public MockInterpreterOutputStream(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+    // nothing to initialize
+  }
+
+  @Override
+  public void close() {
+  }
+
+  /** Returns the statement most recently passed to interpret, or null before any call. */
+  public String getLastStatement() {
+    return lastSt;
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    lastSt = st;
+    String[] ret = st.split(":");  // fields may be missing, so check length, not null
+    try {
+      if (ret.length > 1) {
+        context.out.write(ret[1]);
+      }
+    } catch (IOException e) {
+      throw new InterpreterException(e);
+    }
+    return new InterpreterResult(InterpreterResult.Code.valueOf(ret[0]), (ret.length > 2) ?
+            ret[2] : "");
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+    // nothing to cancel
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public List<String> completion(String buf, int cursor) {
+    return null;
+  }
+
+  @Override
+  public Scheduler getScheduler() {
+    return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
index d17df4f..05bc676 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
@@ -64,12 +64,13 @@ public class RemoteSchedulerTest {
     env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
 
     final RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
+            p,
+            MockInterpreterA.class.getName(),
+            new File("../bin/interpreter.sh").getAbsolutePath(),
+            "fake",
+            env,
+            10 * 1000,
+            null
         );
 
     intpGroup.add(intpA);
@@ -103,7 +104,7 @@ public class RemoteSchedulerTest {
             new HashMap<String, Object>(),
             new GUI(),
             new AngularObjectRegistry(intpGroup.getId(), null),
-            new LinkedList<InterpreterContextRunner>()));
+            new LinkedList<InterpreterContextRunner>(), null));
         return "1000";
       }
 
@@ -147,12 +148,13 @@ public class RemoteSchedulerTest {
     env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
 
     final RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
+            p,
+            MockInterpreterA.class.getName(),
+            new File("../bin/interpreter.sh").getAbsolutePath(),
+            "fake",
+            env,
+            10 * 1000,
+            null
         );
 
     intpGroup.add(intpA);
@@ -173,7 +175,7 @@ public class RemoteSchedulerTest {
           new HashMap<String, Object>(),
           new GUI(),
           new AngularObjectRegistry(intpGroup.getId(), null),
-          new LinkedList<InterpreterContextRunner>());
+          new LinkedList<InterpreterContextRunner>(), null);
 
       @Override
       public int progress() {
@@ -209,7 +211,7 @@ public class RemoteSchedulerTest {
           new HashMap<String, Object>(),
           new GUI(),
           new AngularObjectRegistry(intpGroup.getId(), null),
-          new LinkedList<InterpreterContextRunner>());
+          new LinkedList<InterpreterContextRunner>(), null);
 
       @Override
       public int progress() {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index 9e7a97c..dff75c7 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -81,7 +81,8 @@ public class ZeppelinServer extends Application {
 
     this.depResolver = new DependencyResolver(conf.getString(ConfVars.ZEPPELIN_DEP_LOCALREPO));
     this.schedulerFactory = new SchedulerFactory();
-    this.replFactory = new InterpreterFactory(conf, notebookWsServer, depResolver);
+    this.replFactory = new InterpreterFactory(conf, notebookWsServer,
+            notebookWsServer, depResolver);
     this.notebookRepo = new NotebookRepoSync(conf);
     this.notebookIndex = new LuceneSearch();
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
index 0142df2..4296e93 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
@@ -93,6 +93,8 @@ public class Message {
 
     PARAGRAPH_REMOVE,
     PARAGRAPH_CLEAR_OUTPUT,
+    PARAGRAPH_APPEND_OUTPUT,  // [s-c] append output
+    PARAGRAPH_UPDATE_OUTPUT,  // [s-c] update (replace) output
     PING,
 
     ANGULAR_OBJECT_UPDATE,  // [s-c] add/update angular object

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 3dfdca3..64698fc 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -30,12 +30,11 @@ import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.AngularObjectRegistryListener;
 import org.apache.zeppelin.display.Input;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
-import org.apache.zeppelin.notebook.JobListenerFactory;
-import org.apache.zeppelin.notebook.Note;
-import org.apache.zeppelin.notebook.Notebook;
-import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
+import org.apache.zeppelin.notebook.*;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.scheduler.JobListener;
@@ -57,7 +56,8 @@ import com.google.gson.Gson;
  *
  */
 public class NotebookServer extends WebSocketServlet implements
-        NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener {
+        NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener,
+        RemoteInterpreterProcessListener {
   private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class);
   Gson gson = new Gson();
   final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>();
@@ -749,14 +749,46 @@ public class NotebookServer extends WebSocketServlet implements
   }
 
   /**
+   * This callback is for the paragraph that runs on ZeppelinServer
+   * @param noteId
+   * @param paragraphId
+   * @param output output to append
+   */
+  @Override
+  public void onOutputAppend(String noteId, String paragraphId, String output) {
+    // Only the ids are forwarded; no note/paragraph lookup is needed to broadcast.
+    Message msg = new Message(OP.PARAGRAPH_APPEND_OUTPUT)
+            .put("noteId", noteId)
+            .put("paragraphId", paragraphId)
+            .put("data", output);
+    broadcast(noteId, msg);
+  }
+
+  /**
+   * This callback is for the paragraph that runs on ZeppelinServer
+   * @param noteId
+   * @param paragraphId
+   * @param output output to update (replace)
+   */
+  @Override
+  public void onOutputUpdated(String noteId, String paragraphId, String output) {
+    // Only the ids are forwarded; no note/paragraph lookup is needed to broadcast.
+    Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT)
+            .put("noteId", noteId)
+            .put("paragraphId", paragraphId)
+            .put("data", output);
+    broadcast(noteId, msg);
+  }
+
+  /**
    * Need description here.
    *
    */
-  public static class ParagraphJobListener implements JobListener {
+  public static class ParagraphListenerImpl implements ParagraphJobListener {
     private NotebookServer notebookServer;
     private Note note;
 
-    public ParagraphJobListener(NotebookServer notebookServer, Note note) {
+    public ParagraphListenerImpl(NotebookServer notebookServer, Note note) {
       this.notebookServer = notebookServer;
       this.note = note;
     }
@@ -791,11 +823,43 @@ public class NotebookServer extends WebSocketServlet implements
       }
       notebookServer.broadcastNote(note);
     }
+
+    /**
+     * This callback is for paragraph that runs on RemoteInterpreterProcess
+     * @param paragraph
+     * @param out
+     * @param output
+     */
+    @Override
+    public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output) {
+      Message msg = new Message(OP.PARAGRAPH_APPEND_OUTPUT)
+              .put("noteId", paragraph.getNote().getId())
+              .put("paragraphId", paragraph.getId())
+              .put("data", output);
+
+      notebookServer.broadcast(paragraph.getNote().getId(), msg);
+    }
+
+    /**
+     * This callback is for paragraph that runs on RemoteInterpreterProcess
+     * @param paragraph
+     * @param out
+     * @param output
+     */
+    @Override
+    public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output) {
+      Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT)
+              .put("noteId", paragraph.getNote().getId())
+              .put("paragraphId", paragraph.getId())
+              .put("data", output);
+
+      notebookServer.broadcast(paragraph.getNote().getId(), msg);
+    }
   }
 
   @Override
-  public JobListener getParagraphJobListener(Note note) {
-    return new ParagraphJobListener(this, note);
+  public ParagraphJobListener getParagraphJobListener(Note note) {
+    return new ParagraphListenerImpl(this, note);
   }
 
   private void sendAllAngularObjects(Note note, NotebookSocket conn) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-web/src/app/notebook/paragraph/paragraph-results.html
----------------------------------------------------------------------
diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph-results.html b/zeppelin-web/src/app/notebook/paragraph/paragraph-results.html
index 80af4ef..7fb40ac 100644
--- a/zeppelin-web/src/app/notebook/paragraph/paragraph-results.html
+++ b/zeppelin-web/src/app/notebook/paragraph/paragraph-results.html
@@ -23,24 +23,22 @@ limitations under the License.
        ng-bind-html="paragraph.result.comment">
   </div>
 
-  <div id="{{paragraph.id}}_text"
+  <div id="p{{paragraph.id}}_text"
        class="text"
-       ng-if="paragraph.result.type == 'TEXT'"
-       ng-bind="paragraph.result.msg">
-  </div>
+       ng-if="getResultType() == 'TEXT'"></div>
 
   <div id="p{{paragraph.id}}_html"
        class="resultContained"
-       ng-if="paragraph.result.type == 'HTML'">
+       ng-if="getResultType() == 'HTML'">
   </div>
 
   <div id="p{{paragraph.id}}_angular"
        class="resultContained"
-       ng-if="paragraph.result.type == 'ANGULAR'">
+       ng-if="getResultType() == 'ANGULAR'">
   </div>
 
   <img id="{{paragraph.id}}_img"
-       ng-if="paragraph.result.type == 'IMG'"
+       ng-if="getResultType() == 'IMG'"
        ng-src="{{getBase64ImageSrc(paragraph.result.msg)}}">
   </img>
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
----------------------------------------------------------------------
diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
index be88498..30c7ea4 100644
--- a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
+++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
@@ -54,11 +54,13 @@ angular.module('zeppelinWebApp')
       $scope.renderHtml();
     } else if ($scope.getResultType() === 'ANGULAR') {
       $scope.renderAngular();
+    } else if ($scope.getResultType() === 'TEXT') {
+      $scope.renderText();
     }
   };
 
-  $scope.renderHtml = function() {
-    var retryRenderer = function() {
+    $scope.renderHtml = function() {
+      var retryRenderer = function() {
       if (angular.element('#p' + $scope.paragraph.id + '_html').length) {
         try {
           angular.element('#p' + $scope.paragraph.id + '_html').html($scope.paragraph.result.msg);
@@ -93,6 +95,42 @@ angular.module('zeppelinWebApp')
     $timeout(retryRenderer);
   };
 
+  $scope.renderText = function() {
+    var retryRenderer = function() {
+
+      var textEl = angular.element('#p' + $scope.paragraph.id + '_text');
+      if (textEl.length) {
+        // clear all lines before render
+        $scope.clearTextOutput();
+
+        if ($scope.paragraph.result && $scope.paragraph.result.msg) {
+          $scope.appendTextOutput($scope.paragraph.result.msg);
+        }
+      } else {
+        $timeout(retryRenderer, 10);
+      }
+    };
+    $timeout(retryRenderer);
+  };
+
+  $scope.clearTextOutput = function() {
+    var textEl = angular.element('#p' + $scope.paragraph.id + '_text');
+    if (textEl.length) {
+      textEl.children().remove();
+    }
+  };
+
+  $scope.appendTextOutput = function(msg) {
+    var textEl = angular.element('#p' + $scope.paragraph.id + '_text');
+    if (textEl.length) {
+      var lines = msg.split('\n');
+      for (var i=0; i < lines.length; i++) {
+        textEl.append(angular.element('<div></div>').text(lines[i]));
+      }
+    }
+  };
+
+
 
   var initializeDefault = function() {
     var config = $scope.paragraph.config;
@@ -156,6 +194,10 @@ angular.module('zeppelinWebApp')
     }
   });
 
+  var isEmpty = function (object) {
+    return !object;
+  };
+
   // TODO: this may have impact on performance when there are many paragraphs in a note.
   $scope.$on('updateParagraph', function(event, data) {
     if (data.paragraph.id === $scope.paragraph.id &&
@@ -166,6 +208,7 @@ angular.module('zeppelinWebApp')
          data.paragraph.status !== $scope.paragraph.status ||
          data.paragraph.jobName !== $scope.paragraph.jobName ||
          data.paragraph.title !== $scope.paragraph.title ||
+         isEmpty(data.paragraph.result) !== isEmpty($scope.paragraph.result) ||
          data.paragraph.errorMessage !== $scope.paragraph.errorMessage ||
          !angular.equals(data.paragraph.settings, $scope.paragraph.settings) ||
          !angular.equals(data.paragraph.config, $scope.paragraph.config))
@@ -175,7 +218,8 @@ angular.module('zeppelinWebApp')
       var newType = $scope.getResultType(data.paragraph);
       var oldGraphMode = $scope.getGraphMode();
       var newGraphMode = $scope.getGraphMode(data.paragraph);
-      var resultRefreshed = (data.paragraph.dateFinished !== $scope.paragraph.dateFinished);
+      var resultRefreshed = (data.paragraph.dateFinished !== $scope.paragraph.dateFinished) || isEmpty(data.paragraph.result) !== isEmpty($scope.paragraph.result);
+
       var statusChanged = (data.paragraph.status !== $scope.paragraph.status);
 
       //console.log("updateParagraph oldData %o, newData %o. type %o -> %o, mode %o -> %o", $scope.paragraph, data, oldType, newType, oldGraphMode, newGraphMode);
@@ -234,6 +278,8 @@ angular.module('zeppelinWebApp')
         $scope.renderHtml();
       } else if (newType === 'ANGULAR' && resultRefreshed) {
         $scope.renderAngular();
+      } else if (newType === 'TEXT' && resultRefreshed) {
+        $scope.renderText();
       }
 
       if (statusChanged || resultRefreshed) {
@@ -252,6 +298,19 @@ angular.module('zeppelinWebApp')
 
   });
 
+  $scope.$on('appendParagraphOutput', function(event, data) {
+    if ($scope.paragraph.id === data.paragraphId) {
+      $scope.appendTextOutput(data.data);
+    }
+  });
+
+  $scope.$on('updateParagraphOutput', function(event, data) {
+    if ($scope.paragraph.id === data.paragraphId) {
+      $scope.clearTextOutput(data.data);
+      $scope.appendTextOutput(data.data);
+    }
+  });
+
   $scope.isRunning = function() {
     if ($scope.paragraph.status === 'RUNNING' || $scope.paragraph.status === 'PENDING') {
       return true;

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js
----------------------------------------------------------------------
diff --git a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js
index bb99d56..800d450 100644
--- a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js
+++ b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js
@@ -54,6 +54,10 @@ angular.module('zeppelinWebApp').factory('websocketEvents', function($rootScope,
       $rootScope.$broadcast('setNoteMenu', data.notes);
     } else if (op === 'PARAGRAPH') {
       $rootScope.$broadcast('updateParagraph', data);
+    } else if (op === 'PARAGRAPH_APPEND_OUTPUT') {
+      $rootScope.$broadcast('appendParagraphOutput', data);
+    } else if (op === 'PARAGRAPH_UPDATE_OUTPUT') {
+      $rootScope.$broadcast('updateParagraphOutput', data);      
     } else if (op === 'PROGRESS') {
       $rootScope.$broadcast('updateProgress', data);
     } else if (op === 'COMPLETION_LIST') {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index 4ff0cc3..039d970 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -29,6 +29,7 @@ import org.apache.zeppelin.display.AngularObjectRegistryListener;
 import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
 import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.slf4j.Logger;
@@ -65,25 +66,30 @@ public class InterpreterFactory {
   private InterpreterOption defaultOption;
 
   AngularObjectRegistryListener angularObjectRegistryListener;
+  private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
 
   DependencyResolver depResolver;
 
   public InterpreterFactory(ZeppelinConfiguration conf,
       AngularObjectRegistryListener angularObjectRegistryListener,
+      RemoteInterpreterProcessListener remoteInterpreterProcessListener,
       DependencyResolver depResolver)
       throws InterpreterException, IOException {
-    this(conf, new InterpreterOption(true), angularObjectRegistryListener, depResolver);
+    this(conf, new InterpreterOption(true), angularObjectRegistryListener,
+            remoteInterpreterProcessListener, depResolver);
   }
 
 
   public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultOption,
       AngularObjectRegistryListener angularObjectRegistryListener,
+      RemoteInterpreterProcessListener remoteInterpreterProcessListener,
       DependencyResolver depResolver)
       throws InterpreterException, IOException {
     this.conf = conf;
     this.defaultOption = defaultOption;
     this.angularObjectRegistryListener = angularObjectRegistryListener;
     this.depResolver = depResolver;
+    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
     String replsConf = conf.getString(ConfVars.ZEPPELIN_INTERPRETERS);
     interpreterClassList = replsConf.split(",");
 
@@ -500,7 +506,8 @@ public class InterpreterFactory {
 
   /**
    * Change interpreter property and restart
-   * @param name
+   * @param id
+   * @param option
    * @param properties
    * @throws IOException
    */
@@ -659,7 +666,7 @@ public class InterpreterFactory {
     int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
     LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter(
         property, className, conf.getInterpreterRemoteRunnerPath(),
-        interpreterPath, connectTimeout));
+        interpreterPath, connectTimeout, remoteInterpreterProcessListener));
     return intp;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java
index 5a7e966..1387730 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java
@@ -17,11 +17,9 @@
 
 package org.apache.zeppelin.notebook;
 
-import org.apache.zeppelin.scheduler.JobListener;
-
 /**
  * TODO(moon): provide description.
  */
 public interface JobListenerFactory {
-  public JobListener getParagraphJobListener(Note note);
+  public ParagraphJobListener getParagraphJobListener(Note note);
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index 392b968..10f080d 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -19,39 +19,43 @@ package org.apache.zeppelin.notebook;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.Input;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.notebook.repo.NotebookRepo;
 import org.apache.zeppelin.notebook.utility.IdHashes;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.scheduler.JobListener;
 import org.apache.zeppelin.search.SearchService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.gson.Gson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Binded interpreters for a note
  */
 public class Note implements Serializable, JobListener {
+  static Logger logger = LoggerFactory.getLogger(Note.class);
   private static final long serialVersionUID = 7920699076577612429L;
 
+  // threadpool for delayed persist of note
+  private static final ScheduledThreadPoolExecutor delayedPersistThreadPool =
+          new ScheduledThreadPoolExecutor(0);
+  static {
+    delayedPersistThreadPool.setRemoveOnCancelPolicy(true);
+  }
+
   final List<Paragraph> paragraphs = new LinkedList<>();
+
   private String name = "";
   private String id;
 
@@ -62,6 +66,7 @@ public class Note implements Serializable, JobListener {
   private transient JobListenerFactory jobListenerFactory;
   private transient NotebookRepo repo;
   private transient SearchService index;
+  private transient ScheduledFuture delayedPersist;
 
   /**
    * note configurations.
@@ -144,9 +149,8 @@ public class Note implements Serializable, JobListener {
 
   /**
    * Add paragraph last.
-   *
-   * @param p
    */
+
   public Paragraph addParagraph() {
     Paragraph p = new Paragraph(this, this, replLoader);
     synchronized (paragraphs) {
@@ -187,7 +191,6 @@ public class Note implements Serializable, JobListener {
    * Insert paragraph in given index.
    *
    * @param index
-   * @param p
    */
   public Paragraph insertParagraph(int index) {
     Paragraph p = new Paragraph(this, this, replLoader);
@@ -339,8 +342,6 @@ public class Note implements Serializable, JobListener {
 
   /**
    * Run all paragraphs sequentially.
-   *
-   * @param jobListener
    */
   public void runAll() {
     synchronized (paragraphs) {
@@ -400,15 +401,55 @@ public class Note implements Serializable, JobListener {
   }
 
   public void persist() throws IOException {
+    stopDelayedPersistTimer();
     snapshotAngularObjectRegistry();
     index.updateIndexDoc(this);
     repo.save(this);
   }
 
+  /**
+   * Persist this note with maximum delay.
+   * @param maxDelaySec
+   */
+  public void persist(int maxDelaySec) {
+    startDelayedPersistTimer(maxDelaySec);
+  }
+
   public void unpersist() throws IOException {
     repo.remove(id());
   }
 
+
+  private void startDelayedPersistTimer(int maxDelaySec) {
+    synchronized (this) {
+      if (delayedPersist != null) {
+        return;
+      }
+
+      delayedPersist = delayedPersistThreadPool.schedule(new Runnable() {
+
+        @Override
+        public void run() {
+          try {
+            persist();
+          } catch (IOException e) {
+            logger.error(e.getMessage(), e);
+          }
+        }
+      }, maxDelaySec, TimeUnit.SECONDS);
+    }
+  }
+
+  private void stopDelayedPersistTimer() {
+    synchronized (this) {
+      if (delayedPersist == null) {
+        return;
+      }
+
+      delayedPersist.cancel(false);
+    }
+  }
+
   public Map<String, Object> getConfig() {
     if (config == null) {
       config = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index 433095b..65210f5 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -28,6 +28,7 @@ import org.apache.zeppelin.scheduler.JobListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.*;
 
@@ -213,7 +214,29 @@ public class Paragraph extends Job implements Serializable, Cloneable {
       if (Code.KEEP_PREVIOUS_RESULT == ret.code()) {
         return getReturn();
       }
-      return ret;
+
+      String message = "";
+
+      context.out.flush();
+      InterpreterResult.Type outputType = context.out.getType();
+      byte[] interpreterOutput = context.out.toByteArray();
+      context.out.clear();
+
+      if (interpreterOutput != null && interpreterOutput.length > 0) {
+        message = new String(interpreterOutput);
+      }
+
+      if (message.isEmpty()) {
+        return ret;
+      } else {
+        String interpreterResultMessage = ret.message();
+        if (interpreterResultMessage != null && !interpreterResultMessage.isEmpty()) {
+          message += interpreterResultMessage;
+          return new InterpreterResult(ret.code(), ret.type(), message);
+        } else {
+          return new InterpreterResult(ret.code(), outputType, message);
+        }
+      }
     } finally {
       InterpreterContext.remove();
     }
@@ -244,6 +267,7 @@ public class Paragraph extends Job implements Serializable, Cloneable {
       runners.add(new ParagraphRunner(note, note.id(), p.getId()));
     }
 
+    final Paragraph self = this;
     InterpreterContext interpreterContext = new InterpreterContext(
             note.id(),
             getId(),
@@ -252,7 +276,34 @@ public class Paragraph extends Job implements Serializable, Cloneable {
             this.getConfig(),
             this.settings,
             registry,
-            runners);
+            runners,
+            new InterpreterOutput(new InterpreterOutputListener() {
+              @Override
+              public void onAppend(InterpreterOutput out, byte[] line) {
+                updateParagraphResult(out);
+                ((ParagraphJobListener) getListener()).onOutputAppend(self, out, new String(line));
+              }
+
+              @Override
+              public void onUpdate(InterpreterOutput out, byte[] output) {
+                updateParagraphResult(out);
+                ((ParagraphJobListener) getListener()).onOutputUpdate(self, out,
+                        new String(output));
+              }
+
+              private void updateParagraphResult(InterpreterOutput out) {
+                // update paragraph result
+                Throwable t = null;
+                String message = null;
+                try {
+                  message = new String(out.toByteArray());
+                } catch (IOException e) {
+                  logger().error(e.getMessage(), e);
+                  t = e;
+                }
+                setReturn(new InterpreterResult(Code.SUCCESS, out.getType(), message), t);
+              }
+            }));
     return interpreterContext;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java
new file mode 100644
index 0000000..f6404d7
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook;
+
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.scheduler.JobListener;
+
+/**
+ * Listener for paragraph job events and streamed output updates.
+ */
+public interface ParagraphJobListener extends JobListener {
+  public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output);
+  public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output);
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
index abd0e3b..17d91cc 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
@@ -55,8 +55,8 @@ public class InterpreterFactoryTest {
     System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath());
     System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2");
     conf = new ZeppelinConfiguration();
-    factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null);
-    context = new InterpreterContext("note", "id", "title", "text", null, null, null, null);
+    factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null);
+    context = new InterpreterContext("note", "id", "title", "text", null, null, null, null, null);
 
   }
 
@@ -140,7 +140,7 @@ public class InterpreterFactoryTest {
     factory.add("newsetting", "mock1", new InterpreterOption(false), new Properties());
     assertEquals(3, factory.get().size());
 
-    InterpreterFactory factory2 = new InterpreterFactory(conf, null, null);
+    InterpreterFactory factory2 = new InterpreterFactory(conf, null, null, null);
     assertEquals(3, factory2.get().size());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
index 9496e4d..4fa8ef6 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
@@ -58,7 +58,7 @@ public class NoteInterpreterLoaderTest {
     MockInterpreter11.register("mock11", "group1", "org.apache.zeppelin.interpreter.mock.MockInterpreter11");
     MockInterpreter2.register("mock2", "group2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
 
-    factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null);
+    factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
index e98e680..82ba137 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -38,6 +38,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.interpreter.InterpreterFactory;
 import org.apache.zeppelin.interpreter.InterpreterOption;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
 import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
@@ -85,7 +86,7 @@ public class NotebookTest implements JobListenerFactory{
     MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
     MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
 
-    factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null);
+    factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null);
 
     SearchService search = mock(SearchService.class);
     notebookRepo = new VFSNotebookRepo(conf);
@@ -172,7 +173,8 @@ public class NotebookTest implements JobListenerFactory{
     note.persist();
 
     Notebook notebook2 = new Notebook(
-        conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, null, null), this, null);
+        conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, null, null, null), this,
+            null);
     assertEquals(1, notebook2.getAllNotes().size());
   }
 
@@ -411,8 +413,16 @@ public class NotebookTest implements JobListenerFactory{
   }
 
   @Override
-  public JobListener getParagraphJobListener(Note note) {
-    return new JobListener(){
+  public ParagraphJobListener getParagraphJobListener(Note note) {
+    return new ParagraphJobListener(){
+
+      @Override
+      public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output) {
+      }
+
+      @Override
+      public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output) {
+      }
 
       @Override
       public void onProgressUpdate(Job job, int progress) {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
index 60b3ba3..31970af 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
@@ -30,12 +30,10 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
 import org.apache.zeppelin.interpreter.InterpreterFactory;
 import org.apache.zeppelin.interpreter.InterpreterOption;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
 import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
-import org.apache.zeppelin.notebook.JobListenerFactory;
-import org.apache.zeppelin.notebook.Note;
-import org.apache.zeppelin.notebook.Notebook;
-import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.notebook.*;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.scheduler.JobListener;
@@ -87,7 +85,7 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
     MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
     MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
 
-    factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null);
+    factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null);
     
     SearchService search = mock(SearchService.class);
     notebookRepoSync = new NotebookRepoSync(conf);
@@ -224,8 +222,16 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
   }
 
   @Override
-  public JobListener getParagraphJobListener(Note note) {
-    return new JobListener(){
+  public ParagraphJobListener getParagraphJobListener(Note note) {
+    return new ParagraphJobListener(){
+
+      @Override
+      public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output) {
+      }
+
+      @Override
+      public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output) {
+      }
 
       @Override
       public void onProgressUpdate(Job job, int progress) {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java
index cff086d..2e2801c 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java
@@ -30,10 +30,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
 import org.apache.zeppelin.interpreter.InterpreterFactory;
 import org.apache.zeppelin.interpreter.InterpreterOption;
 import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
-import org.apache.zeppelin.notebook.JobListenerFactory;
-import org.apache.zeppelin.notebook.Note;
-import org.apache.zeppelin.notebook.Notebook;
-import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.notebook.*;
 import org.apache.zeppelin.scheduler.JobListener;
 import org.apache.zeppelin.scheduler.SchedulerFactory;
 import org.apache.zeppelin.search.SearchService;
@@ -76,7 +73,7 @@ public class VFSNotebookRepoTest implements JobListenerFactory {
     MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
 
     this.schedulerFactory = new SchedulerFactory();
-    factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null);
+    factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null);
 
     SearchService search = mock(SearchService.class);
     notebookRepo = new VFSNotebookRepo(conf);
@@ -140,7 +137,7 @@ public class VFSNotebookRepoTest implements JobListenerFactory {
   }
 
   @Override
-  public JobListener getParagraphJobListener(Note note) {
+  public ParagraphJobListener getParagraphJobListener(Note note) {
     return null;
   }
 }


[2/2] incubator-zeppelin git commit: [ZEPPELIN-554] Streaming interpreter output to front-end

Posted by mo...@apache.org.
[ZEPPELIN-554] Streaming interpreter output to front-end

### What is this PR for?
Output from interpreter is displayed after completion of paragraph execution.
It'll be useful if output can be streamed to front-end during execution.

Previous work #593 injects an InterpreterOutput stream object into the Interpreter.
This PR is based on #593 and streams the data from InterpreterOutput to the front-end.

This implementation only streams output of type %text. Other output types (%html, %angular, %table) are not streamed to the front end.

While this PR keeps backward compatibility, interpreters that want to use this feature will need to modify their code to write output into `InterpreterOutput` instead of returning it with `InterpreterResult`.

This PR includes modification of SparkInterpreter to use InterpreterOutput.

### What type of PR is it?
Feature

### Todos

### Is there a relevant Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-554

### How should this be tested?
Run code such as the following using the Spark interpreter:
```
(1 to 10).foreach{ i=>
    Thread.sleep(1000)
    println("Hello " + i)
}
```

### Screenshots (if appropriate)
![stream_output](https://cloud.githubusercontent.com/assets/1540981/12188677/6df08900-b56c-11e5-95d9-e5f6fad91007.gif)

### Questions:
* Do the license files need an update? no
* Are there breaking changes for older versions? no
* Does this need documentation? no

Author: Lee moon soo <mo...@apache.org>

Closes #611 from Leemoonsoo/output_stream_frontend and squashes the following commits:

53e2bb4 [Lee moon soo] Not persist on every append
dedae0d [Lee moon soo] Remove debug lines
8251fb4 [Lee moon soo] Fix syntax and style
9c9c8fd [Lee moon soo] update test
18215a3 [Lee moon soo] fix style
f7e6a4d [Lee moon soo] Fix syntax error
d29cfbf [Lee moon soo] workaround jshint
07b3e1a [Lee moon soo] Handle clear output correctly
bc6262e [Lee moon soo] Make PysparkInterpreter stream output
6d9cc51 [Lee moon soo] Pass InterpreterOutput to SparkILoop
b68180e [Lee moon soo] Add InterpreterOutput on spark interpreter unitest
626ad48 [Lee moon soo] Add license header
846015b [Lee moon soo] Update scalding
37d6920 [Lee moon soo] Handle display system directive correctly
e278e84 [Lee moon soo] Clear output correctly
479b836 [Lee moon soo] Add test
c01df62 [Lee moon soo] Connect Spark interpreter Console.out to outputstream
8a1223f [Lee moon soo] Handle update output correctly
e7a9b37 [Lee moon soo] Delayed persist
2060c1e [Lee moon soo] Clear before render text
786c978 [Lee moon soo] update paragraph object after witing to outputstream
258ff38 [Lee moon soo] Barely working
6f607f7 [Lee moon soo] Add newline listener
89d9798 [Lee moon soo] Render text output line by line
a42e4ff [Lee moon soo] Update test
fb5e7b5 [Lee moon soo] Update test
0f60b54 [Lee moon soo] Update test
a07d7db [Lee moon soo] Implement InterpreterResult.toString
1f419b6 [Lee moon soo] Add InterpreterOutput
c91f498 [Lee moon soo] prepend interpreteroutputstream to interpreter result


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/5ec59a81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/5ec59a81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/5ec59a81

Branch: refs/heads/master
Commit: 5ec59a81b2fda2fb65d4075e0672930b769f41d2
Parents: dbdaf84
Author: Lee moon soo <mo...@apache.org>
Authored: Sat Jan 16 11:04:09 2016 -0800
Committer: Lee moon soo <mo...@apache.org>
Committed: Wed Jan 20 15:39:21 2016 -0800

----------------------------------------------------------------------
 .../zeppelin/flink/FlinkInterpreterTest.java    |   2 +-
 .../zeppelin/hive/HiveInterpreterTest.java      |  12 +-
 .../zeppelin/ignite/IgniteInterpreterTest.java  |   2 +-
 .../ignite/IgniteSqlInterpreterTest.java        |   2 +-
 .../scalding/ScaldingInterpreterTest.java       |   2 +-
 .../zeppelin/spark/PySparkInterpreter.java      |  36 ++-
 .../apache/zeppelin/spark/SparkInterpreter.java |  32 +--
 .../zeppelin/spark/SparkOutputStream.java       |  75 ++++++
 .../apache/zeppelin/spark/ZeppelinContext.java  |  18 +-
 .../main/resources/python/zeppelin_pyspark.py   |   7 +-
 .../zeppelin/spark/DepInterpreterTest.java      |   2 +-
 .../zeppelin/spark/SparkInterpreterTest.java    |  23 +-
 .../zeppelin/spark/SparkSqlInterpreterTest.java |  17 +-
 .../interpreter/InterpreterContext.java         |   5 +-
 .../zeppelin/interpreter/InterpreterOutput.java | 249 +++++++++++++++++++
 .../InterpreterOutputChangeListener.java        |  27 ++
 .../InterpreterOutputChangeWatcher.java         | 140 +++++++++++
 .../interpreter/InterpreterOutputListener.java  |  34 +++
 .../zeppelin/interpreter/InterpreterResult.java |   4 +
 .../interpreter/remote/RemoteInterpreter.java   |  27 +-
 .../remote/RemoteInterpreterEventPoller.java    |  26 +-
 .../remote/RemoteInterpreterProcess.java        |   9 +-
 .../RemoteInterpreterProcessListener.java       |  25 ++
 .../remote/RemoteInterpreterServer.java         |  68 ++++-
 .../thrift/RemoteInterpreterContext.java        |   2 +-
 .../thrift/RemoteInterpreterEvent.java          |   2 +-
 .../thrift/RemoteInterpreterEventType.java      |   8 +-
 .../thrift/RemoteInterpreterResult.java         |   2 +-
 .../thrift/RemoteInterpreterService.java        |   2 +-
 .../main/thrift/RemoteInterpreterService.thrift |   4 +-
 .../interpreter/InterpreterContextTest.java     |   2 +-
 .../InterpreterOutputChangeWatcherTest.java     | 109 ++++++++
 .../interpreter/InterpreterOutputTest.java      | 127 ++++++++++
 .../interpreter/InterpreterResultTest.java      |   5 +
 .../remote/RemoteAngularObjectTest.java         |  15 +-
 .../RemoteInterpreterOutputTestStream.java      | 146 +++++++++++
 .../remote/RemoteInterpreterProcessTest.java    |   2 +-
 .../remote/RemoteInterpreterTest.java           | 174 +++++--------
 .../mock/MockInterpreterOutputStream.java       |  97 ++++++++
 .../zeppelin/scheduler/RemoteSchedulerTest.java |  32 +--
 .../apache/zeppelin/server/ZeppelinServer.java  |   3 +-
 .../org/apache/zeppelin/socket/Message.java     |   2 +
 .../apache/zeppelin/socket/NotebookServer.java  |  82 +++++-
 .../notebook/paragraph/paragraph-results.html   |  12 +-
 .../notebook/paragraph/paragraph.controller.js  |  65 ++++-
 .../websocketEvents/websocketEvents.factory.js  |   4 +
 .../interpreter/InterpreterFactory.java         |  13 +-
 .../zeppelin/notebook/JobListenerFactory.java   |   4 +-
 .../java/org/apache/zeppelin/notebook/Note.java |  77 ++++--
 .../org/apache/zeppelin/notebook/Paragraph.java |  55 +++-
 .../zeppelin/notebook/ParagraphJobListener.java |  29 +++
 .../interpreter/InterpreterFactoryTest.java     |   6 +-
 .../notebook/NoteInterpreterLoaderTest.java     |   2 +-
 .../apache/zeppelin/notebook/NotebookTest.java  |  18 +-
 .../notebook/repo/NotebookRepoSyncTest.java     |  20 +-
 .../notebook/repo/VFSNotebookRepoTest.java      |   9 +-
 56 files changed, 1672 insertions(+), 302 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
index 3168f04..9a61be6 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -40,7 +40,7 @@ public class FlinkInterpreterTest {
     Properties p = new Properties();
     flink = new FlinkInterpreter(p);
     flink.open();
-    context = new InterpreterContext(null, null, null, null, null, null, null, null);
+    context = new InterpreterContext(null, null, null, null, null, null, null, null, null);
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java
----------------------------------------------------------------------
diff --git a/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java b/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java
index c22080d..c86fcf3 100644
--- a/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java
+++ b/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java
@@ -79,9 +79,9 @@ public class HiveInterpreterTest {
     HiveInterpreter t = new HiveInterpreter(properties);
     t.open();
 
-    assertTrue(t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null)).message().contains("SCHEMA_NAME"));
+    assertTrue(t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null,null)).message().contains("SCHEMA_NAME"));
     assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n",
-        t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null)).message());
+        t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)).message());
   }
 
   @Test
@@ -101,7 +101,7 @@ public class HiveInterpreterTest {
     t.open();
 
     assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n",
-        t.interpret("(h2)\n select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null)).message());
+        t.interpret("(h2)\n select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)).message());
   }
 
   @Test
@@ -117,13 +117,13 @@ public class HiveInterpreterTest {
     t.open();
 
     InterpreterResult interpreterResult =
-        t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null));
+        t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null));
     assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message());
 
     t.getConnection("default").close();
 
     interpreterResult =
-        t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null));
+        t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null));
     assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message());
   }
 
@@ -139,7 +139,7 @@ public class HiveInterpreterTest {
     HiveInterpreter t = new HiveInterpreter(properties);
     t.open();
 
-    InterpreterContext interpreterContext = new InterpreterContext(null, "a", null, null, null, null, null, null);
+    InterpreterContext interpreterContext = new InterpreterContext(null, "a", null, null, null, null, null, null, null);
 
     //simple select test
     InterpreterResult result = t.interpret("select * from test_table", interpreterContext);

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java
----------------------------------------------------------------------
diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java
index f46b049..cf98083 100644
--- a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java
+++ b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java
@@ -40,7 +40,7 @@ public class IgniteInterpreterTest {
   private static final String HOST = "127.0.0.1:47500..47509";
 
   private static final InterpreterContext INTP_CONTEXT =
-          new InterpreterContext(null, null, null, null, null, null, null, null);
+          new InterpreterContext(null, null, null, null, null, null, null, null, null);
 
   private IgniteInterpreter intp;
   private Ignite ignite;

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java
index fb93ad5..a6dcc66 100644
--- a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java
+++ b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java
@@ -44,7 +44,7 @@ public class IgniteSqlInterpreterTest {
   private static final String HOST = "127.0.0.1:47500..47509";
 
   private static final InterpreterContext INTP_CONTEXT =
-          new InterpreterContext(null, null, null, null, null, null, null, null);
+          new InterpreterContext(null, null, null, null, null, null, null, null, null);
 
   private Ignite ignite;
   private IgniteSqlInterpreter intp;

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
----------------------------------------------------------------------
diff --git a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
index 7a753fa..606d4d9 100644
--- a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
+++ b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
@@ -65,7 +65,7 @@ public class ScaldingInterpreterTest {
     context = new InterpreterContext("note", "id", "title", "text",
         new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(
             intpGroup.getId(), null),
-        new LinkedList<InterpreterContextRunner>());
+        new LinkedList<InterpreterContextRunner>(), null);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 8c4ba87..c5441ab 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -73,8 +73,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
   private GatewayServer gatewayServer;
   private DefaultExecutor executor;
   private int port;
-  private ByteArrayOutputStream outputStream;
-  private ByteArrayOutputStream errStream;
+  private SparkOutputStream outputStream;
   private BufferedWriter ins;
   private PipedInputStream in;
   private ByteArrayOutputStream input;
@@ -173,7 +172,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
     cmd.addArgument(Integer.toString(port), false);
     cmd.addArgument(Integer.toString(getSparkInterpreter().getSparkVersion().toNumber()), false);
     executor = new DefaultExecutor();
-    outputStream = new ByteArrayOutputStream();
+    outputStream = new SparkOutputStream();
     PipedOutputStream ps = new PipedOutputStream();
     in = null;
     try {
@@ -274,7 +273,6 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
       statementError = error;
       statementFinishedNotifier.notify();
     }
-
   }
 
   boolean pythonScriptInitialized = false;
@@ -287,6 +285,10 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
     }
   }
 
+  public void appendOutput(String message) throws IOException {
+    outputStream.getInterpreterOutput().write(message);
+  }
+
   @Override
   public InterpreterResult interpret(String st, InterpreterContext context) {
     SparkInterpreter sparkInterpreter = getSparkInterpreter();
@@ -300,7 +302,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
           + outputStream.toString());
     }
 
-    outputStream.reset();
+    outputStream.setInterpreterOutput(context.out);
 
     synchronized (pythonScriptInitializeNotifier) {
       long startTime = System.currentTimeMillis();
@@ -314,15 +316,24 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
       }
     }
 
+    String errorMessage = "";
+    try {
+      context.out.flush();
+      errorMessage = new String(context.out.toByteArray());
+    } catch (IOException e) {
+      throw new InterpreterException(e);
+    }
+
+
     if (pythonscriptRunning == false) {
       // python script failed to initialize and terminated
       return new InterpreterResult(Code.ERROR, "failed to start pyspark"
-          + outputStream.toString());
+          + errorMessage);
     }
     if (pythonScriptInitialized == false) {
       // timeout. didn't get initialized message
       return new InterpreterResult(Code.ERROR, "pyspark is not responding "
-          + outputStream.toString());
+          + errorMessage);
     }
 
     if (!sparkInterpreter.getSparkVersion().isPysparkSupported()) {
@@ -352,7 +363,14 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
     if (statementError) {
       return new InterpreterResult(Code.ERROR, statementOutput);
     } else {
-      return new InterpreterResult(Code.SUCCESS, statementOutput);
+
+      try {
+        context.out.flush();
+      } catch (IOException e) {
+        throw new InterpreterException(e);
+      }
+
+      return new InterpreterResult(Code.SUCCESS);
     }
   }
 
@@ -389,8 +407,6 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
       return new LinkedList<String>();
     }
 
-    outputStream.reset();
-
     pythonInterpretRequest = new PythonInterpretRequest(completionCommand, "");
     statementOutput = null;
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index d975791..7ee6d7c 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -17,9 +17,7 @@
 
 package org.apache.zeppelin.spark;
 
-import java.io.ByteArrayOutputStream;
 import java.io.File;
-import java.io.PrintStream;
 import java.io.PrintWriter;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
@@ -41,7 +39,6 @@ import org.apache.spark.repl.SparkJLineCompletion;
 import org.apache.spark.scheduler.ActiveJob;
 import org.apache.spark.scheduler.DAGScheduler;
 import org.apache.spark.scheduler.Pool;
-import org.apache.spark.scheduler.SparkListener;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.ui.jobs.JobProgressListener;
 import org.apache.zeppelin.interpreter.Interpreter;
@@ -115,7 +112,7 @@ public class SparkInterpreter extends Interpreter {
   private SparkILoop interpreter;
   private SparkIMain intp;
   private SparkContext sc;
-  private ByteArrayOutputStream out;
+  private SparkOutputStream out;
   private SQLContext sqlc;
   private SparkDependencyResolver dep;
   private SparkJLineCompletion completor;
@@ -129,7 +126,7 @@ public class SparkInterpreter extends Interpreter {
 
   public SparkInterpreter(Properties property) {
     super(property);
-    out = new ByteArrayOutputStream();
+    out = new SparkOutputStream();
   }
 
   public SparkInterpreter(Properties property, SparkContext sc) {
@@ -452,10 +449,9 @@ public class SparkInterpreter extends Interpreter {
     b.v_$eq(true);
     settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
 
-    PrintStream printStream = new PrintStream(out);
-
     /* spark interpreter */
     this.interpreter = new SparkILoop(null, new PrintWriter(out));
+
     interpreter.settings_$eq(settings);
 
     interpreter.createInterpreter();
@@ -481,7 +477,7 @@ public class SparkInterpreter extends Interpreter {
 
     dep = getDependencyResolver();
 
-    z = new ZeppelinContext(sc, sqlc, null, dep, printStream,
+    z = new ZeppelinContext(sc, sqlc, null, dep,
         Integer.parseInt(getProperty("zeppelin.spark.maxResult")));
 
     intp.interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
@@ -489,7 +485,6 @@ public class SparkInterpreter extends Interpreter {
     binder.put("sc", sc);
     binder.put("sqlc", sqlc);
     binder.put("z", z);
-    binder.put("out", printStream);
 
     intp.interpret("@transient val z = "
                  + "_binder.get(\"z\").asInstanceOf[org.apache.zeppelin.spark.ZeppelinContext]");
@@ -675,13 +670,13 @@ public class SparkInterpreter extends Interpreter {
     synchronized (this) {
       z.setGui(context.getGui());
       sc.setJobGroup(getJobGroup(context), "Zeppelin", false);
-      InterpreterResult r = interpretInput(lines);
+      InterpreterResult r = interpretInput(lines, context);
       sc.clearJobGroup();
       return r;
     }
   }
 
-  public InterpreterResult interpretInput(String[] lines) {
+  public InterpreterResult interpretInput(String[] lines, InterpreterContext context) {
     SparkEnv.set(env);
 
     // add print("") to make sure not finishing with comment
@@ -692,8 +687,9 @@ public class SparkInterpreter extends Interpreter {
     }
     linesToRun[lines.length] = "print(\"\")";
 
-    Console.setOut((java.io.PrintStream) binder.get("out"));
-    out.reset();
+    Console.setOut(context.out);
+    out.setInterpreterOutput(context.out);
+    context.out.clear();
     Code r = null;
     String incomplete = "";
 
@@ -713,6 +709,7 @@ public class SparkInterpreter extends Interpreter {
         res = intp.interpret(incomplete + s);
       } catch (Exception e) {
         sc.clearJobGroup();
+        out.setInterpreterOutput(null);
         logger.info("Interpreter exception", e);
         return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
       }
@@ -721,7 +718,8 @@ public class SparkInterpreter extends Interpreter {
 
       if (r == Code.ERROR) {
         sc.clearJobGroup();
-        return new InterpreterResult(r, out.toString());
+        out.setInterpreterOutput(null);
+        return new InterpreterResult(r, "");
       } else if (r == Code.INCOMPLETE) {
         incomplete += s + "\n";
       } else {
@@ -730,9 +728,13 @@ public class SparkInterpreter extends Interpreter {
     }
 
     if (r == Code.INCOMPLETE) {
+      sc.clearJobGroup();
+      out.setInterpreterOutput(null);
       return new InterpreterResult(r, "Incomplete expression");
     } else {
-      return new InterpreterResult(r, out.toString());
+      sc.clearJobGroup();
+      out.setInterpreterOutput(null);
+      return new InterpreterResult(Code.SUCCESS);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java
new file mode 100644
index 0000000..98a4090
--- /dev/null
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.spark;
+
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * InterpreterOutput can be attached / detached.
+ */
+public class SparkOutputStream extends OutputStream {
+  InterpreterOutput interpreterOutput;
+
+  public SparkOutputStream() {
+  }
+
+  public InterpreterOutput getInterpreterOutput() {
+    return interpreterOutput;
+  }
+
+  public void setInterpreterOutput(InterpreterOutput interpreterOutput) {
+    this.interpreterOutput = interpreterOutput;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    if (interpreterOutput != null) {
+      interpreterOutput.write(b);
+    }
+  }
+
+  @Override
+  public void write(byte [] b) throws IOException {
+    if (interpreterOutput != null) {
+      interpreterOutput.write(b);
+    }
+  }
+
+  @Override
+  public void write(byte [] b, int offset, int len) throws IOException {
+    if (interpreterOutput != null) {
+      interpreterOutput.write(b, offset, len);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (interpreterOutput != null) {
+      interpreterOutput.close();
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (interpreterOutput != null) {
+      interpreterOutput.flush();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
index a55ed73..6869161 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
@@ -21,6 +21,7 @@ import static scala.collection.JavaConversions.asJavaCollection;
 import static scala.collection.JavaConversions.asJavaIterable;
 import static scala.collection.JavaConversions.collectionAsScalaIterable;
 
+import java.io.IOException;
 import java.io.PrintStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -54,19 +55,17 @@ import scala.collection.Iterable;
  */
 public class ZeppelinContext extends HashMap<String, Object> {
   private SparkDependencyResolver dep;
-  private PrintStream out;
   private InterpreterContext interpreterContext;
   private int maxResult;
 
   public ZeppelinContext(SparkContext sc, SQLContext sql,
       InterpreterContext interpreterContext,
-      SparkDependencyResolver dep, PrintStream printStream,
+      SparkDependencyResolver dep,
       int maxResult) {
     this.sc = sc;
     this.sqlContext = sql;
     this.interpreterContext = interpreterContext;
     this.dep = dep;
-    this.out = printStream;
     this.maxResult = maxResult;
   }
 
@@ -273,10 +272,15 @@ public class ZeppelinContext extends HashMap<String, Object> {
       throw new InterpreterException("Can not road DataFrame/SchemaRDD class");
     }
 
-    if (cls.isInstance(o)) {
-      out.print(showDF(sc, interpreterContext, o, maxResult));
-    } else {
-      out.print(o.toString());
+
+    try {
+      if (cls.isInstance(o)) {
+        interpreterContext.out.write(showDF(sc, interpreterContext, o, maxResult));
+      } else {
+        interpreterContext.out.write(o.toString());
+      }
+    } catch (IOException e) {
+      throw new InterpreterException(e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py
index 62f0a82..7da0f4e 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -36,10 +36,7 @@ class Logger(object):
     self.out = ""
 
   def write(self, message):
-    self.out = self.out + message
-
-  def get(self):
-    return self.out
+    intp.appendOutput(message)
 
   def reset(self):
     self.out = ""
@@ -224,7 +221,7 @@ while True :
       sc.setJobGroup(jobGroup, "Zeppelin")
       eval(compiledCode)
 
-    intp.setStatementsFinished(output.get(), False)
+    intp.setStatementsFinished("", False)
   except Py4JJavaError:
     excInnerError = traceback.format_exc() # format_tb() does not return the inner exception
     innerErrorStart = excInnerError.find("Py4JJavaError:")

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
index efa8fae..2b5613a 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
@@ -60,7 +60,7 @@ public class DepInterpreterTest {
 
     context = new InterpreterContext("note", "id", "title", "text", new HashMap<String, Object>(), new GUI(),
         new AngularObjectRegistry(intpGroup.getId(), null),
-        new LinkedList<InterpreterContextRunner>());
+        new LinkedList<InterpreterContextRunner>(), null);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index b629978..778966f 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -28,10 +28,7 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.junit.After;
 import org.junit.Before;
@@ -79,9 +76,21 @@ public class SparkInterpreterTest {
 
     InterpreterGroup intpGroup = new InterpreterGroup();
     context = new InterpreterContext("note", "id", "title", "text",
-        new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(
-            intpGroup.getId(), null),
-        new LinkedList<InterpreterContextRunner>());
+            new HashMap<String, Object>(),
+            new GUI(),
+            new AngularObjectRegistry(intpGroup.getId(), null),
+            new LinkedList<InterpreterContextRunner>(),
+            new InterpreterOutput(new InterpreterOutputListener() {
+              @Override
+              public void onAppend(InterpreterOutput out, byte[] line) {
+
+              }
+
+              @Override
+              public void onUpdate(InterpreterOutput out, byte[] output) {
+
+              }
+            }));
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
index 4688cf8..731eab6 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
@@ -25,10 +25,7 @@ import java.util.Properties;
 
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.InterpreterResult.Type;
 import org.junit.After;
 import org.junit.Before;
@@ -69,7 +66,17 @@ public class SparkSqlInterpreterTest {
     }
     context = new InterpreterContext("note", "id", "title", "text", new HashMap<String, Object>(), new GUI(),
         new AngularObjectRegistry(intpGroup.getId(), null),
-        new LinkedList<InterpreterContextRunner>());
+        new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(new InterpreterOutputListener() {
+      @Override
+      public void onAppend(InterpreterOutput out, byte[] line) {
+
+      }
+
+      @Override
+      public void onUpdate(InterpreterOutput out, byte[] output) {
+
+      }
+    }));
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index 0417f91..e3f6b59 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -29,6 +29,7 @@ import org.apache.zeppelin.display.GUI;
 public class InterpreterContext {
   private static final ThreadLocal<InterpreterContext> threadIC =
       new ThreadLocal<InterpreterContext>();
+  public final InterpreterOutput out;
 
   public static InterpreterContext get() {
     return threadIC.get();
@@ -58,7 +59,8 @@ public class InterpreterContext {
                             Map<String, Object> config,
                             GUI gui,
                             AngularObjectRegistry angularObjectRegistry,
-                            List<InterpreterContextRunner> runners
+                            List<InterpreterContextRunner> runners,
+                            InterpreterOutput out
                             ) {
     this.noteId = noteId;
     this.paragraphId = paragraphId;
@@ -68,6 +70,7 @@ public class InterpreterContext {
     this.gui = gui;
     this.angularObjectRegistry = angularObjectRegistry;
     this.runners = runners;
+    this.out = out;
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
new file mode 100644
index 0000000..42ebe48
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.net.URL;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * InterpreterOutput is an OutputStream that is supposed to print content on the notebook
+ * in addition to the InterpreterResult that is returned from Interpreter.interpret().
+ */
+public class InterpreterOutput extends OutputStream {
+  Logger logger = LoggerFactory.getLogger(InterpreterOutput.class);
+  private final int NEW_LINE_CHAR = '\n';
+
+  ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+
+  private final List<Object> outList = new LinkedList<Object>();
+  private InterpreterOutputChangeWatcher watcher;
+  private final InterpreterOutputListener flushListener;
+  private InterpreterResult.Type type = InterpreterResult.Type.TEXT;
+  private boolean firstWrite = true;
+
+  public InterpreterOutput(InterpreterOutputListener flushListener) {
+    this.flushListener = flushListener;
+    clear();
+  }
+
+  public InterpreterOutput(InterpreterOutputListener flushListener,
+                           InterpreterOutputChangeListener listener) throws IOException {
+    this.flushListener = flushListener;
+    clear();
+    watcher = new InterpreterOutputChangeWatcher(listener);
+    watcher.start();
+  }
+
+  public InterpreterResult.Type getType() {
+    return type;
+  }
+
+  public void setType(InterpreterResult.Type type) {
+    if (this.type != type) { // no-op when the type is unchanged
+      clear(); // drops buffered and collected output, and resets type to TEXT
+      flushListener.onUpdate(this, new byte[]{}); // report the now-empty output
+      this.type = type; // assigned after clear(), which would otherwise overwrite it
+    }
+  }
+
+  public void clear() {
+    synchronized (outList) {
+      type = InterpreterResult.Type.TEXT;
+      buffer.reset();
+      outList.clear();
+      if (watcher != null) {
+        watcher.clear();
+      }
+    }
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    synchronized (outList) {
+      buffer.write(b); // bytes accumulate here until a newline triggers flush()
+      if (b == NEW_LINE_CHAR) {
+        // firstWrite stays true until the first newline reaches this stream
+        if (firstWrite) {
+          // clear the output on gui by reporting an empty update
+          flushListener.onUpdate(this, new byte[]{});
+          firstWrite = false;
+        }
+
+        flush();
+      }
+    }
+  }
+
+  private byte [] detectTypeFromLine(byte [] byteArray) {
+    // check output type directive: "%type\n" alone, or "%type " followed by content
+    String line = new String(byteArray);
+    for (InterpreterResult.Type t : InterpreterResult.Type.values()) {
+      String typeString = '%' + t.name().toLowerCase(java.util.Locale.ROOT); // locale-independent
+      if ((typeString + "\n").equals(line)) {
+        setType(t);
+        byteArray = null; // directive-only line: nothing left to output
+        break;
+      } else if (line.startsWith(typeString + " ")) {
+        setType(t);
+        byteArray = line.substring(typeString.length() + 1).getBytes();
+        break;
+      }
+    }
+
+    return byteArray;
+  }
+
+  @Override
+  public void write(byte [] b) throws IOException {
+    write(b, 0, b.length);
+  }
+
+  @Override
+  public void write(byte [] b, int off, int len) throws IOException {
+    synchronized (outList) {
+      for (int i = off; i < off + len; i++) { // len is a byte count, not an end index
+        write(b[i]);
+      }
+    }
+  }
+
+  /**
+   * Appends a file to the output; with a change watcher (dev mode) the file is also monitored.
+   * @param file file added to the output list
+   * @throws IOException if the watcher cannot watch the file
+   */
+  public void write(File file) throws IOException {
+    outList.add(file);
+    if (watcher != null) {
+      watcher.watch(file);
+    }
+  }
+
+  public void write(String string) throws IOException {
+    write(string.getBytes());
+  }
+
+  /**
+   * Appends the resource at the given URL to the output; "file" URLs are written as files.
+   * @param url location of the resource to append
+   * @throws IOException if a "file" URL cannot be written as a file
+   */
+  public void write(URL url) throws IOException {
+    if ("file".equals(url.getProtocol())) {
+      write(new File(url.getPath()));
+    } else {
+      outList.add(url);
+    }
+  }
+
+  public void writeResource(String resourceName) throws IOException {
+    // search file under resource dir first for dev mode
+    File mainResource = new File("./src/main/resources/" + resourceName);
+    File testResource = new File("./src/test/resources/" + resourceName);
+    if (mainResource.isFile()) {
+      write(mainResource);
+    } else if (testResource.isFile()) {
+      write(testResource);
+    } else {
+      ClassLoader cl = Thread.currentThread().getContextClassLoader(); // search from classpath
+      if (cl == null) {
+        cl = this.getClass().getClassLoader();
+      }
+      URL resource = (cl != null) ? cl.getResource(resourceName)
+          : ClassLoader.getSystemResource(resourceName);
+      if (resource == null) { // fail with a clear error instead of an NPE in write(URL)
+        throw new FileNotFoundException("resource not found: " + resourceName);
+      }
+      write(resource);
+    }
+  }
+
+  public byte[] toByteArray() throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    List<Object> all = new LinkedList<Object>();
+
+    synchronized (outList) { // snapshot, so the file/URL reads below run outside the lock
+      all.addAll(outList);
+    }
+
+    for (Object o : all) {
+      if (o instanceof File) {
+        File f = (File) o;
+        try (FileInputStream fin = new FileInputStream(f)) { // closed even if the copy fails
+          copyStream(fin, out);
+        }
+      } else if (o instanceof byte[]) {
+        out.write((byte[]) o);
+      } else if (o instanceof Integer) {
+        out.write((int) o);
+      } else if (o instanceof URL) {
+        try (InputStream fin = ((URL) o).openStream()) { // closed even if the copy fails
+          copyStream(fin, out);
+        }
+      } else {
+        // can not handle the object
+      }
+    }
+    out.close();
+    return out.toByteArray();
+  }
+
+  public void flush() throws IOException {
+    synchronized (outList) {
+      buffer.flush();
+      byte[] bytes = buffer.toByteArray();
+      bytes = detectTypeFromLine(bytes); // null when the line was only a type directive
+      if (bytes != null) {
+        outList.add(bytes);
+        if (type == InterpreterResult.Type.TEXT) { // appends are reported only for TEXT output
+          flushListener.onAppend(this, bytes);
+        }
+      }
+      buffer.reset(); // start accumulating the next line
+    }
+  }
+
+  private void copyStream(InputStream in, OutputStream out) throws IOException {
+    int bufferSize = 8192;
+    byte[] buffer = new byte[bufferSize];
+
+    while (true) {
+      int bytesRead = in.read(buffer);
+      if (bytesRead == -1) {
+        break;
+      } else {
+        out.write(buffer, 0, bytesRead);
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    flush();
+
+    if (watcher != null) {
+      watcher.clear();
+      watcher.shutdown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeListener.java
new file mode 100644
index 0000000..a639e0c
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeListener.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+import java.io.File;
+
+/**
+ * Listener notified by InterpreterOutputChangeWatcher when a watched file changes
+ */
+public interface InterpreterOutputChangeListener {
+  public void fileChanged(File file);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcher.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcher.java
new file mode 100644
index 0000000..5fe8237
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcher.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
+import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
+import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
+import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.ClosedWatchServiceException;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Watch the change for the development mode support
+ */
+public class InterpreterOutputChangeWatcher extends Thread {
+  Logger logger = LoggerFactory.getLogger(InterpreterOutputChangeWatcher.class);
+
+  private WatchService watcher;
+  private final List<File> watchFiles = new LinkedList<File>();
+  private final Map<WatchKey, File> watchKeys = new HashMap<WatchKey, File>();
+  private InterpreterOutputChangeListener listener;
+  private volatile boolean stop; // set by shutdown(), polled by run() on the watcher thread
+
+  public InterpreterOutputChangeWatcher(InterpreterOutputChangeListener listener)
+      throws IOException {
+    watcher = FileSystems.getDefault().newWatchService();
+    this.listener = listener;
+  }
+
+  public void watch(File file) throws IOException {
+    String dirString;
+    if (file.isFile()) { // resolve to absolute first: a bare name like "a.txt" has no parent
+      dirString = file.getAbsoluteFile().getParentFile().getAbsolutePath();
+    } else {
+      throw new IOException(file.getName() + " is not a file");
+    }
+
+    if (dirString == null) {
+      dirString = "/";
+    }
+
+    Path dir = FileSystems.getDefault().getPath(dirString); // the directory is what gets watched
+    logger.info("watch " + dir);
+    WatchKey key = dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
+    synchronized (watchKeys) {
+      watchKeys.put(key, new File(dirString));
+      watchFiles.add(file);
+    }
+  }
+
+  public void clear() {
+    synchronized (watchKeys) {
+      for (WatchKey key : watchKeys.keySet()) {
+        key.cancel();
+
+      }
+      watchKeys.clear();
+      watchFiles.clear();
+    }
+  }
+
+  public void shutdown() throws IOException {
+    stop = true;
+    clear();
+    watcher.close();
+  }
+
+  public void run() {
+    while (!stop) {
+      WatchKey key = null;
+      try {
+        key = watcher.poll(1, TimeUnit.SECONDS); // wake up every second to re-check stop
+      } catch (InterruptedException | ClosedWatchServiceException e) {
+        break; // interrupted or watcher closed: end the thread
+      }
+
+      if (key == null) { // poll timed out with no events
+        continue;
+      }
+      for (WatchEvent<?> event : key.pollEvents()) {
+        WatchEvent.Kind<?> kind = event.kind();
+        if (kind == OVERFLOW) {
+          continue;
+        }
+        WatchEvent<Path> ev = (WatchEvent<Path>) event;
+        Path filename = ev.context();
+        // notify the listener for every watched file whose name matches the changed entry
+        synchronized (watchKeys) {
+          for (File f : watchFiles) {
+            if (f.getName().compareTo(filename.toString()) == 0) {
+              File changedFile;
+              if (filename.isAbsolute()) {
+                changedFile = new File(filename.toString());
+              } else {
+                changedFile = new File(watchKeys.get(key), filename.toString());
+              }
+              logger.info("File change detected " + changedFile.getAbsolutePath());
+              if (listener != null) {
+                listener.fileChanged(changedFile);
+              }
+            }
+          }
+        }
+      }
+
+      boolean valid = key.reset(); // re-arm the key so further events are queued
+      if (!valid) { // an invalid key ends the whole watch loop
+        break;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java
new file mode 100644
index 0000000..bdb262a
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+/**
+ * Listener notified when an InterpreterOutput flushes a line or replaces its whole output
+ */
+public interface InterpreterOutputListener {
+  /**
+   * Called when a newline is detected and the buffered line is flushed.
+   * @param line bytes of the flushed line
+   */
+  public void onAppend(InterpreterOutput out, byte[] line);
+
+  /**
+   * Called when the entire output is updated, e.g. after a new display system is detected.
+   * @param output bytes of the new output (empty when the output is cleared)
+   */
+  public void onUpdate(InterpreterOutput out, byte[] output);
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
index 593cfc7..d213796 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
@@ -146,4 +146,8 @@ public class InterpreterResult implements Serializable {
     this.type = type;
     return this;
   }
+
+  public String toString() {
+    return "%" + type.name().toLowerCase() + " " + msg;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 455156c..d2a24e8 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -48,6 +48,7 @@ import com.google.gson.reflect.TypeToken;
  *
  */
 public class RemoteInterpreter extends Interpreter {
+  private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
   Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class);
   Gson gson = new Gson();
   private String interpreterRunner;
@@ -60,32 +61,35 @@ public class RemoteInterpreter extends Interpreter {
   private int connectTimeout;
 
   public RemoteInterpreter(Properties property,
-      String className,
-      String interpreterRunner,
-      String interpreterPath,
-      int connectTimeout) {
+                           String className,
+                           String interpreterRunner,
+                           String interpreterPath,
+                           int connectTimeout,
+                           RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
     super(property);
-
     this.className = className;
     initialized = false;
     this.interpreterRunner = interpreterRunner;
     this.interpreterPath = interpreterPath;
     env = new HashMap<String, String>();
     this.connectTimeout = connectTimeout;
+    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
   }
 
   public RemoteInterpreter(Properties property,
-      String className,
-      String interpreterRunner,
-      String interpreterPath,
-      Map<String, String> env,
-      int connectTimeout) {
+                           String className,
+                           String interpreterRunner,
+                           String interpreterPath,
+                           Map<String, String> env,
+                           int connectTimeout,
+                           RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
     super(property);
     this.className = className;
     this.interpreterRunner = interpreterRunner;
     this.interpreterPath = interpreterPath;
     this.env = env;
     this.connectTimeout = connectTimeout;
+    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
   }
 
   @Override
@@ -103,7 +107,8 @@ public class RemoteInterpreter extends Interpreter {
       if (intpGroup.getRemoteInterpreterProcess() == null) {
         // create new remote process
         RemoteInterpreterProcess remoteProcess = new RemoteInterpreterProcess(
-                interpreterRunner, interpreterPath, env, connectTimeout);
+                interpreterRunner, interpreterPath, env, connectTimeout,
+                remoteInterpreterProcessListener);
 
         intpGroup.setRemoteInterpreterProcess(remoteProcess);
       }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
index c39e0fe..6186205 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
@@ -18,29 +18,35 @@
 package org.apache.zeppelin.interpreter.remote;
 
 import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import org.apache.thrift.TException;
 import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.interpreter.InterpreterContextRunner;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterOutputListener;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+
 /**
  *
  */
 public class RemoteInterpreterEventPoller extends Thread {
   private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class);
+  private final RemoteInterpreterProcessListener listener;
 
   private volatile boolean shutdown;
 
   private RemoteInterpreterProcess interpreterProcess;
   private InterpreterGroup interpreterGroup;
 
-  public RemoteInterpreterEventPoller() {
+  public RemoteInterpreterEventPoller(RemoteInterpreterProcessListener listener) {
+    this.listener = listener;
     shutdown = false;
   }
 
@@ -110,6 +116,24 @@ public class RemoteInterpreterEventPoller extends Thread {
 
           interpreterProcess.getInterpreterContextRunnerPool().run(
               runnerFromRemote.getNoteId(), runnerFromRemote.getParagraphId());
+        } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_APPEND) {
+          // on output append
+          Map<String, String> outputAppend = gson.fromJson(
+                  event.getData(), new TypeToken<Map<String, String>>() {}.getType());
+          String noteId = outputAppend.get("noteId");
+          String paragraphId = outputAppend.get("paragraphId");
+          String outputToAppend = outputAppend.get("data");
+
+          listener.onOutputAppend(noteId, paragraphId, outputToAppend);
+        } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE) {
+          // on output update
+          Map<String, String> outputAppend = gson.fromJson(
+                  event.getData(), new TypeToken<Map<String, String>>() {}.getType());
+          String noteId = outputAppend.get("noteId");
+          String paragraphId = outputAppend.get("paragraphId");
+          String outputToUpdate = outputAppend.get("data");
+
+          listener.onOutputUpdated(noteId, paragraphId, outputToUpdate);
         }
         logger.debug("Event from remoteproceess {}", event.getType());
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index 2c195dc..56b5485 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -53,10 +53,11 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
   private int connectTimeout;
 
   public RemoteInterpreterProcess(String intpRunner,
-      String intpDir,
-      Map<String, String> env,
-      int connectTimeout) {
-    this(intpRunner, intpDir, env, new RemoteInterpreterEventPoller(), connectTimeout);
+                                  String intpDir,
+                                  Map<String, String> env,
+                                  int connectTimeout,
+                                  RemoteInterpreterProcessListener listener) {
+    this(intpRunner, intpDir, env, new RemoteInterpreterEventPoller(listener), connectTimeout);
   }
 
   RemoteInterpreterProcess(String intpRunner,

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
new file mode 100644
index 0000000..da6ac63
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote;
+
+/**
+ * Listener for output append/update events coming from a remote interpreter process
+ */
+public interface RemoteInterpreterProcessListener {
+  public void onOutputAppend(String noteId, String paragraphId, String output);
+  public void onOutputUpdated(String noteId, String paragraphId, String output);
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index a8da8c0..728d210 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -35,15 +35,8 @@ import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.AngularObjectRegistryListener;
 import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.ClassloaderInterpreter;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
@@ -300,7 +293,26 @@ public class RemoteInterpreterServer
       try {
         InterpreterContext.set(context);
         InterpreterResult result = interpreter.interpret(script, context);
-        return result;
+
+        // data from context.out is prepended to InterpreterResult if both defined
+        String message = "";
+
+        context.out.flush();
+        InterpreterResult.Type outputType = context.out.getType();
+        byte[] interpreterOutput = context.out.toByteArray();
+        context.out.clear();
+
+        if (interpreterOutput != null && interpreterOutput.length > 0) {
+          message = new String(interpreterOutput);
+        }
+
+        String interpreterResultMessage = result.message();
+        if (interpreterResultMessage != null && !interpreterResultMessage.isEmpty()) {
+          message += interpreterResultMessage;
+          return new InterpreterResult(result.code(), result.type(), message);
+        } else {
+          return new InterpreterResult(result.code(), outputType, message);
+        }
       } finally {
         InterpreterContext.remove();
       }
@@ -351,7 +363,8 @@ public class RemoteInterpreterServer
   private InterpreterContext convert(RemoteInterpreterContext ric) {
     List<InterpreterContextRunner> contextRunners = new LinkedList<InterpreterContextRunner>();
     List<InterpreterContextRunner> runners = gson.fromJson(ric.getRunners(),
-        new TypeToken<List<RemoteInterpreterContextRunner>>(){}.getType());
+            new TypeToken<List<RemoteInterpreterContextRunner>>() {
+        }.getType());
 
     for (InterpreterContextRunner r : runners) {
       contextRunners.add(new ParagraphRunner(this, r.getNoteId(), r.getParagraphId()));
@@ -366,7 +379,40 @@ public class RemoteInterpreterServer
             new TypeToken<Map<String, Object>>() {}.getType()),
         gson.fromJson(ric.getGui(), GUI.class),
         interpreterGroup.getAngularObjectRegistry(),
-        contextRunners);
+        contextRunners, createInterpreterOutput(ric.getNoteId(), ric.getParagraphId()));
+  }
+
+
+  private InterpreterOutput createInterpreterOutput(final String noteId, final String paragraphId) {
+    return new InterpreterOutput(new InterpreterOutputListener() {
+      @Override
+      public void onAppend(InterpreterOutput out, byte[] line) {
+        Map<String, String> appendOutput = new HashMap<String, String>();
+        appendOutput.put("noteId", noteId);
+        appendOutput.put("paragraphId", paragraphId);
+        appendOutput.put("data", new String(line));
+        // NOTE(review): new String(byte[]) decodes with the platform default charset
+        Gson gson = new Gson();
+        // pass the appended bytes to sendEvent as a JSON-encoded OUTPUT_APPEND event
+        sendEvent(new RemoteInterpreterEvent(
+                RemoteInterpreterEventType.OUTPUT_APPEND,
+                gson.toJson(appendOutput)));
+      }
+
+      @Override
+      public void onUpdate(InterpreterOutput out, byte[] output) {
+        Map<String, String> appendOutput = new HashMap<String, String>();
+        appendOutput.put("noteId", noteId);
+        appendOutput.put("paragraphId", paragraphId);
+        appendOutput.put("data", new String(output));
+        // (the map is still called appendOutput, but this payload goes out as OUTPUT_UPDATE)
+        Gson gson = new Gson();
+        // same JSON shape as onAppend: noteId, paragraphId and data
+        sendEvent(new RemoteInterpreterEvent(
+                RemoteInterpreterEventType.OUTPUT_UPDATE,
+                gson.toJson(appendOutput)));
+      }
+    });
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
index a55d5de..175f482 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
 public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
index 96a49b5..79203fb 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
 public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterEvent> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
index 9a7d142..d650318 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
@@ -33,7 +33,9 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
   ANGULAR_OBJECT_ADD(2),
   ANGULAR_OBJECT_UPDATE(3),
   ANGULAR_OBJECT_REMOVE(4),
-  RUN_INTERPRETER_CONTEXT_RUNNER(5);
+  RUN_INTERPRETER_CONTEXT_RUNNER(5),
+  OUTPUT_APPEND(6),
+  OUTPUT_UPDATE(7);
 
   private final int value;
 
@@ -64,6 +66,10 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
         return ANGULAR_OBJECT_REMOVE;
       case 5:
         return RUN_INTERPRETER_CONTEXT_RUNNER;
+      case 6:
+        return OUTPUT_APPEND;
+      case 7:
+        return OUTPUT_UPDATE;
       default:
         return null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
index 36c0f25..cc50f9c 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
 public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteInterpreterResult, RemoteInterpreterResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResult> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
index 6e6730e..738b453 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
 public class RemoteInterpreterService {
 
   public interface Iface {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
index 144784c..65fd0a7 100644
--- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
+++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
@@ -42,7 +42,9 @@ enum RemoteInterpreterEventType {
   ANGULAR_OBJECT_ADD = 2,
   ANGULAR_OBJECT_UPDATE = 3,
   ANGULAR_OBJECT_REMOVE = 4,
-  RUN_INTERPRETER_CONTEXT_RUNNER = 5
+  RUN_INTERPRETER_CONTEXT_RUNNER = 5,
+  OUTPUT_APPEND = 6,
+  OUTPUT_UPDATE = 7
 }
 
 struct RemoteInterpreterEvent {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
index 080bdaa..9c2732d 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
@@ -27,7 +27,7 @@ public class InterpreterContextTest {
   public void testThreadLocal() {
     assertNull(InterpreterContext.get());
 
-    InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null));
+    InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null, null));
     assertNotNull(InterpreterContext.get());
 
     InterpreterContext.remove();

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java
new file mode 100644
index 0000000..e376809
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class InterpreterOutputChangeWatcherTest implements InterpreterOutputChangeListener {
+  private File tmpDir;
+  private File fileChanged;
+  private int numChanged;
+  private InterpreterOutputChangeWatcher watcher;
+
+  @Before
+  public void setUp() throws Exception {
+    watcher = new InterpreterOutputChangeWatcher(this); // this test object is the change listener
+    watcher.start();
+    // fresh scratch directory for each test, named with the current time in milliseconds
+    tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis());
+    tmpDir.mkdirs();
+    fileChanged = null;
+    numChanged = 0;
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    watcher.shutdown();
+    delete(tmpDir);
+  }
+  // recursively deletes a single file or a whole directory tree
+  private void delete(File file){
+    if(file.isFile()) file.delete();
+    else if(file.isDirectory()){
+      File [] files = file.listFiles();
+      if(files!=null && files.length>0){
+        for(File f : files){
+          delete(f);
+        }
+      }
+      file.delete();
+    }
+  }
+
+
+  @Test
+  public void test() throws IOException, InterruptedException {
+    assertNull(fileChanged);
+    assertEquals(0, numChanged);
+    // NOTE(review): the 1s sleeps presumably give the watcher time to start/register - confirm
+    Thread.sleep(1000);
+    // create two files; only file1 is handed to the watcher further down
+    File file1 = new File(tmpDir, "test1");
+    file1.createNewFile();
+
+    File file2 = new File(tmpDir, "test2");
+    file2.createNewFile();
+
+    watcher.watch(file1);
+    Thread.sleep(1000);
+    // write to both files; only the watched file1 is expected to be reported
+    FileOutputStream out1 = new FileOutputStream(file1);
+    out1.write(1);
+    out1.close();
+
+    FileOutputStream out2 = new FileOutputStream(file2);
+    out2.write(1);
+    out2.close();
+    // wait for fileChanged() to notify (30s max); a notify sent before this point is lost
+    synchronized (this) {
+      wait(30*1000);
+    }
+    // exactly one change event is expected: file2 was written too, but never watched
+    assertNotNull(fileChanged);
+    assertEquals(1, numChanged);
+  }
+
+
+  @Override
+  public void fileChanged(File file) {
+    fileChanged = file;
+    numChanged++;
+    // wake up test(), which waits on this object's monitor
+    synchronized(this) {
+      notify();
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java
new file mode 100644
index 0000000..f8f4809
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class InterpreterOutputTest implements InterpreterOutputListener {
+  private InterpreterOutput out;
+  int numAppendEvent;
+  int numUpdateEvent;
+  // the two counters above are incremented by this class's own onAppend/onUpdate callbacks
+  @Before
+  public void setUp() {
+    out = new InterpreterOutput(this);
+    numAppendEvent = 0;
+    numUpdateEvent = 0;
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    out.close();
+  }
+
+  @Test
+  public void testDetectNewline() throws IOException {
+    out.write("hello\nworld");
+    assertEquals("hello\n", new String(out.toByteArray()));
+    assertEquals(1, numAppendEvent);
+    assertEquals(1, numUpdateEvent);
+    // terminating the pending "world" line adds one append event and no update event
+    out.write("\n");
+    assertEquals("hello\nworld\n", new String(out.toByteArray()));
+    assertEquals(2, numAppendEvent);
+    assertEquals(1, numUpdateEvent);
+  }
+
+  @Test
+  public void testFlush() throws IOException {
+    out.write("hello\nworld");
+    assertEquals("hello\n", new String(out.toByteArray()));
+    assertEquals(1, numAppendEvent);
+    assertEquals(1, numUpdateEvent);
+    // flush() makes the unterminated "world" visible and counts as a second append event
+    out.flush();
+    assertEquals("hello\nworld", new String(out.toByteArray()));
+    assertEquals(2, numAppendEvent);
+    assertEquals(1, numUpdateEvent);
+    // after clear(), an unterminated "%html div" is not visible yet and the type reads TEXT
+    out.clear();
+    out.write("%html div");
+    assertEquals("", new String(out.toByteArray()));
+    assertEquals(InterpreterResult.Type.TEXT, out.getType());
+
+    out.flush();
+    out.write("%html div");
+    assertEquals("div", new String(out.toByteArray()));
+    assertEquals(InterpreterResult.Type.HTML, out.getType());
+  }
+
+  @Test
+  public void testType() throws IOException {
+    // default output stream type is TEXT
+    out.write("Text\n");
+    assertEquals(InterpreterResult.Type.TEXT, out.getType());
+    assertEquals("Text\n", new String(out.toByteArray()));
+    assertEquals(1, numAppendEvent);
+    assertEquals(1, numUpdateEvent);
+
+    // a "%html" line switches the type, empties the output and fires an update event
+    out.write("%html\n");
+    assertEquals(InterpreterResult.Type.HTML, out.getType());
+    assertEquals("", new String(out.toByteArray()));
+    assertEquals(1, numAppendEvent);
+    assertEquals(2, numUpdateEvent);
+
+    // a non-TEXT output stream does not generate append events
+    out.write("<div>html</div>\n");
+    assertEquals(InterpreterResult.Type.HTML, out.getType());
+    assertEquals(1, numAppendEvent);
+    assertEquals(2, numUpdateEvent);
+    assertEquals("<div>html</div>\n", new String(out.toByteArray()));
+
+    // "%text" switches back to TEXT: one more update event, and "hello" counts as an append
+    out.write("%text hello\n");
+    assertEquals(InterpreterResult.Type.TEXT, out.getType());
+    assertEquals(2, numAppendEvent);
+    assertEquals(3, numUpdateEvent);
+    assertEquals("hello\n", new String(out.toByteArray()));
+  }
+  // a leading "%html" line sets the type even when the text after it has no trailing newline
+  @Test
+  public void testType2() throws IOException {
+    out.write("%html\nHello");
+    assertEquals(InterpreterResult.Type.HTML, out.getType());
+  }
+
+  @Override
+  public void onAppend(InterpreterOutput out, byte[] line) {
+    numAppendEvent++;
+  }
+
+  @Override
+  public void onUpdate(InterpreterOutput out, byte[] output) {
+    numUpdateEvent++;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java
index 007730a..d7ab9e8 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java
@@ -105,4 +105,9 @@ public class InterpreterResultTest {
 					"123\n", result.message());
 	  }
 
+	  @Test
+	  public void testToString() { // toString() must give back the message with its "%html" prefix
+			assertEquals("%html hello", new InterpreterResult(InterpreterResult.Code.SUCCESS, "%html hello").toString());
+		}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
index 29a1fb1..906878d 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
@@ -64,12 +64,13 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
     Properties p = new Properties();
 
     intp = new RemoteInterpreter(
-        p,
-        MockInterpreterAngular.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env,
-        10 * 1000
+            p,
+            MockInterpreterAngular.class.getName(),
+            new File("../bin/interpreter.sh").getAbsolutePath(),
+            "fake",
+            env,
+            10 * 1000,
+            null
         );
 
     intpGroup.add(intp);
@@ -83,7 +84,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
         new HashMap<String, Object>(),
         new GUI(),
         new AngularObjectRegistry(intpGroup.getId(), null),
-        new LinkedList<InterpreterContextRunner>());
+        new LinkedList<InterpreterContextRunner>(), null);
 
     intp.open();
   }