You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2016/01/21 00:36:55 UTC
[2/2] incubator-zeppelin git commit: [ZEPPELIN-554] Streaming
interpreter output to front-end
[ZEPPELIN-554] Streaming interpreter output to front-end
### What is this PR for?
Output from interpreter is displayed after completion of paragraph execution.
It'll be useful if output can be streamed to front-end during execution.
Previous work #593 injects InterpreterOutput stream object to Interpreter.
This PR is based on #593 and stream the data from InterpreterOutput to front-end.
This implementation only streams output is %text. Other output type (%html, %angular, %table) is not streamed to the front end.
While this PR keeps backward compatibility, Interpreter who want to use this feature will need to modify code to write output into `InterpreterOutput` instead of return with `InterpreterResult`.
This PR includes modification of SparkInterpreter to use InterpreterOutput.
### What type of PR is it?
Feature
### Todos
### Is there a relevant Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-554
### How should this be tested?
Run such code using Spark interpreter
```
(1 to 10).foreach{ i=>
Thread.sleep(1000)
println("Hello " + i)
}
```
### Screenshots (if appropriate)
![stream_output](https://cloud.githubusercontent.com/assets/1540981/12188677/6df08900-b56c-11e5-95d9-e5f6fad91007.gif)
### Questions:
* Does the licenses files need update? no
* Is there breaking changes for older versions? no
* Does this needs documentation? no
Author: Lee moon soo <mo...@apache.org>
Closes #611 from Leemoonsoo/output_stream_frontend and squashes the following commits:
53e2bb4 [Lee moon soo] Not persist on every append
dedae0d [Lee moon soo] Remove debug lines
8251fb4 [Lee moon soo] Fix syntax and style
9c9c8fd [Lee moon soo] update test
18215a3 [Lee moon soo] fix style
f7e6a4d [Lee moon soo] Fix syntax error
d29cfbf [Lee moon soo] workaround jshint
07b3e1a [Lee moon soo] Handle clear output correctly
bc6262e [Lee moon soo] Make PysparkInterpreter stream output
6d9cc51 [Lee moon soo] Pass InterpreterOutput to SparkILoop
b68180e [Lee moon soo] Add InterpreterOutput on spark interpreter unitest
626ad48 [Lee moon soo] Add license header
846015b [Lee moon soo] Update scalding
37d6920 [Lee moon soo] Handle display system directive correctly
e278e84 [Lee moon soo] Clear output correctly
479b836 [Lee moon soo] Add test
c01df62 [Lee moon soo] Connect Spark interpreter Console.out to outputstream
8a1223f [Lee moon soo] Handle update output correctly
e7a9b37 [Lee moon soo] Delayed persist
2060c1e [Lee moon soo] Clear before render text
786c978 [Lee moon soo] update paragraph object after witing to outputstream
258ff38 [Lee moon soo] Barely working
6f607f7 [Lee moon soo] Add newline listener
89d9798 [Lee moon soo] Render text output line by line
a42e4ff [Lee moon soo] Update test
fb5e7b5 [Lee moon soo] Update test
0f60b54 [Lee moon soo] Update test
a07d7db [Lee moon soo] Implement InterpreterResult.toString
1f419b6 [Lee moon soo] Add InterpreterOutput
c91f498 [Lee moon soo] prepend interpreteroutputstream to interpreter result
Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/5ec59a81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/5ec59a81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/5ec59a81
Branch: refs/heads/master
Commit: 5ec59a81b2fda2fb65d4075e0672930b769f41d2
Parents: dbdaf84
Author: Lee moon soo <mo...@apache.org>
Authored: Sat Jan 16 11:04:09 2016 -0800
Committer: Lee moon soo <mo...@apache.org>
Committed: Wed Jan 20 15:39:21 2016 -0800
----------------------------------------------------------------------
.../zeppelin/flink/FlinkInterpreterTest.java | 2 +-
.../zeppelin/hive/HiveInterpreterTest.java | 12 +-
.../zeppelin/ignite/IgniteInterpreterTest.java | 2 +-
.../ignite/IgniteSqlInterpreterTest.java | 2 +-
.../scalding/ScaldingInterpreterTest.java | 2 +-
.../zeppelin/spark/PySparkInterpreter.java | 36 ++-
.../apache/zeppelin/spark/SparkInterpreter.java | 32 +--
.../zeppelin/spark/SparkOutputStream.java | 75 ++++++
.../apache/zeppelin/spark/ZeppelinContext.java | 18 +-
.../main/resources/python/zeppelin_pyspark.py | 7 +-
.../zeppelin/spark/DepInterpreterTest.java | 2 +-
.../zeppelin/spark/SparkInterpreterTest.java | 23 +-
.../zeppelin/spark/SparkSqlInterpreterTest.java | 17 +-
.../interpreter/InterpreterContext.java | 5 +-
.../zeppelin/interpreter/InterpreterOutput.java | 249 +++++++++++++++++++
.../InterpreterOutputChangeListener.java | 27 ++
.../InterpreterOutputChangeWatcher.java | 140 +++++++++++
.../interpreter/InterpreterOutputListener.java | 34 +++
.../zeppelin/interpreter/InterpreterResult.java | 4 +
.../interpreter/remote/RemoteInterpreter.java | 27 +-
.../remote/RemoteInterpreterEventPoller.java | 26 +-
.../remote/RemoteInterpreterProcess.java | 9 +-
.../RemoteInterpreterProcessListener.java | 25 ++
.../remote/RemoteInterpreterServer.java | 68 ++++-
.../thrift/RemoteInterpreterContext.java | 2 +-
.../thrift/RemoteInterpreterEvent.java | 2 +-
.../thrift/RemoteInterpreterEventType.java | 8 +-
.../thrift/RemoteInterpreterResult.java | 2 +-
.../thrift/RemoteInterpreterService.java | 2 +-
.../main/thrift/RemoteInterpreterService.thrift | 4 +-
.../interpreter/InterpreterContextTest.java | 2 +-
.../InterpreterOutputChangeWatcherTest.java | 109 ++++++++
.../interpreter/InterpreterOutputTest.java | 127 ++++++++++
.../interpreter/InterpreterResultTest.java | 5 +
.../remote/RemoteAngularObjectTest.java | 15 +-
.../RemoteInterpreterOutputTestStream.java | 146 +++++++++++
.../remote/RemoteInterpreterProcessTest.java | 2 +-
.../remote/RemoteInterpreterTest.java | 174 +++++--------
.../mock/MockInterpreterOutputStream.java | 97 ++++++++
.../zeppelin/scheduler/RemoteSchedulerTest.java | 32 +--
.../apache/zeppelin/server/ZeppelinServer.java | 3 +-
.../org/apache/zeppelin/socket/Message.java | 2 +
.../apache/zeppelin/socket/NotebookServer.java | 82 +++++-
.../notebook/paragraph/paragraph-results.html | 12 +-
.../notebook/paragraph/paragraph.controller.js | 65 ++++-
.../websocketEvents/websocketEvents.factory.js | 4 +
.../interpreter/InterpreterFactory.java | 13 +-
.../zeppelin/notebook/JobListenerFactory.java | 4 +-
.../java/org/apache/zeppelin/notebook/Note.java | 77 ++++--
.../org/apache/zeppelin/notebook/Paragraph.java | 55 +++-
.../zeppelin/notebook/ParagraphJobListener.java | 29 +++
.../interpreter/InterpreterFactoryTest.java | 6 +-
.../notebook/NoteInterpreterLoaderTest.java | 2 +-
.../apache/zeppelin/notebook/NotebookTest.java | 18 +-
.../notebook/repo/NotebookRepoSyncTest.java | 20 +-
.../notebook/repo/VFSNotebookRepoTest.java | 9 +-
56 files changed, 1672 insertions(+), 302 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
index 3168f04..9a61be6 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -40,7 +40,7 @@ public class FlinkInterpreterTest {
Properties p = new Properties();
flink = new FlinkInterpreter(p);
flink.open();
- context = new InterpreterContext(null, null, null, null, null, null, null, null);
+ context = new InterpreterContext(null, null, null, null, null, null, null, null, null);
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java
----------------------------------------------------------------------
diff --git a/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java b/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java
index c22080d..c86fcf3 100644
--- a/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java
+++ b/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java
@@ -79,9 +79,9 @@ public class HiveInterpreterTest {
HiveInterpreter t = new HiveInterpreter(properties);
t.open();
- assertTrue(t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null)).message().contains("SCHEMA_NAME"));
+ assertTrue(t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null,null)).message().contains("SCHEMA_NAME"));
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n",
- t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null)).message());
+ t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)).message());
}
@Test
@@ -101,7 +101,7 @@ public class HiveInterpreterTest {
t.open();
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n",
- t.interpret("(h2)\n select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null)).message());
+ t.interpret("(h2)\n select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)).message());
}
@Test
@@ -117,13 +117,13 @@ public class HiveInterpreterTest {
t.open();
InterpreterResult interpreterResult =
- t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null));
+ t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null));
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message());
t.getConnection("default").close();
interpreterResult =
- t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null));
+ t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null));
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message());
}
@@ -139,7 +139,7 @@ public class HiveInterpreterTest {
HiveInterpreter t = new HiveInterpreter(properties);
t.open();
- InterpreterContext interpreterContext = new InterpreterContext(null, "a", null, null, null, null, null, null);
+ InterpreterContext interpreterContext = new InterpreterContext(null, "a", null, null, null, null, null, null, null);
//simple select test
InterpreterResult result = t.interpret("select * from test_table", interpreterContext);
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java
----------------------------------------------------------------------
diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java
index f46b049..cf98083 100644
--- a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java
+++ b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java
@@ -40,7 +40,7 @@ public class IgniteInterpreterTest {
private static final String HOST = "127.0.0.1:47500..47509";
private static final InterpreterContext INTP_CONTEXT =
- new InterpreterContext(null, null, null, null, null, null, null, null);
+ new InterpreterContext(null, null, null, null, null, null, null, null, null);
private IgniteInterpreter intp;
private Ignite ignite;
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java
index fb93ad5..a6dcc66 100644
--- a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java
+++ b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java
@@ -44,7 +44,7 @@ public class IgniteSqlInterpreterTest {
private static final String HOST = "127.0.0.1:47500..47509";
private static final InterpreterContext INTP_CONTEXT =
- new InterpreterContext(null, null, null, null, null, null, null, null);
+ new InterpreterContext(null, null, null, null, null, null, null, null, null);
private Ignite ignite;
private IgniteSqlInterpreter intp;
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
----------------------------------------------------------------------
diff --git a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
index 7a753fa..606d4d9 100644
--- a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
+++ b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
@@ -65,7 +65,7 @@ public class ScaldingInterpreterTest {
context = new InterpreterContext("note", "id", "title", "text",
new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(
intpGroup.getId(), null),
- new LinkedList<InterpreterContextRunner>());
+ new LinkedList<InterpreterContextRunner>(), null);
}
@After
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 8c4ba87..c5441ab 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -73,8 +73,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
private GatewayServer gatewayServer;
private DefaultExecutor executor;
private int port;
- private ByteArrayOutputStream outputStream;
- private ByteArrayOutputStream errStream;
+ private SparkOutputStream outputStream;
private BufferedWriter ins;
private PipedInputStream in;
private ByteArrayOutputStream input;
@@ -173,7 +172,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
cmd.addArgument(Integer.toString(port), false);
cmd.addArgument(Integer.toString(getSparkInterpreter().getSparkVersion().toNumber()), false);
executor = new DefaultExecutor();
- outputStream = new ByteArrayOutputStream();
+ outputStream = new SparkOutputStream();
PipedOutputStream ps = new PipedOutputStream();
in = null;
try {
@@ -274,7 +273,6 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
statementError = error;
statementFinishedNotifier.notify();
}
-
}
boolean pythonScriptInitialized = false;
@@ -287,6 +285,10 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
}
+ public void appendOutput(String message) throws IOException {
+ outputStream.getInterpreterOutput().write(message);
+ }
+
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
SparkInterpreter sparkInterpreter = getSparkInterpreter();
@@ -300,7 +302,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
+ outputStream.toString());
}
- outputStream.reset();
+ outputStream.setInterpreterOutput(context.out);
synchronized (pythonScriptInitializeNotifier) {
long startTime = System.currentTimeMillis();
@@ -314,15 +316,24 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
}
+ String errorMessage = "";
+ try {
+ context.out.flush();
+ errorMessage = new String(context.out.toByteArray());
+ } catch (IOException e) {
+ throw new InterpreterException(e);
+ }
+
+
if (pythonscriptRunning == false) {
// python script failed to initialize and terminated
return new InterpreterResult(Code.ERROR, "failed to start pyspark"
- + outputStream.toString());
+ + errorMessage);
}
if (pythonScriptInitialized == false) {
// timeout. didn't get initialized message
return new InterpreterResult(Code.ERROR, "pyspark is not responding "
- + outputStream.toString());
+ + errorMessage);
}
if (!sparkInterpreter.getSparkVersion().isPysparkSupported()) {
@@ -352,7 +363,14 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
if (statementError) {
return new InterpreterResult(Code.ERROR, statementOutput);
} else {
- return new InterpreterResult(Code.SUCCESS, statementOutput);
+
+ try {
+ context.out.flush();
+ } catch (IOException e) {
+ throw new InterpreterException(e);
+ }
+
+ return new InterpreterResult(Code.SUCCESS);
}
}
@@ -389,8 +407,6 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
return new LinkedList<String>();
}
- outputStream.reset();
-
pythonInterpretRequest = new PythonInterpretRequest(completionCommand, "");
statementOutput = null;
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index d975791..7ee6d7c 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -17,9 +17,7 @@
package org.apache.zeppelin.spark;
-import java.io.ByteArrayOutputStream;
import java.io.File;
-import java.io.PrintStream;
import java.io.PrintWriter;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
@@ -41,7 +39,6 @@ import org.apache.spark.repl.SparkJLineCompletion;
import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.DAGScheduler;
import org.apache.spark.scheduler.Pool;
-import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.ui.jobs.JobProgressListener;
import org.apache.zeppelin.interpreter.Interpreter;
@@ -115,7 +112,7 @@ public class SparkInterpreter extends Interpreter {
private SparkILoop interpreter;
private SparkIMain intp;
private SparkContext sc;
- private ByteArrayOutputStream out;
+ private SparkOutputStream out;
private SQLContext sqlc;
private SparkDependencyResolver dep;
private SparkJLineCompletion completor;
@@ -129,7 +126,7 @@ public class SparkInterpreter extends Interpreter {
public SparkInterpreter(Properties property) {
super(property);
- out = new ByteArrayOutputStream();
+ out = new SparkOutputStream();
}
public SparkInterpreter(Properties property, SparkContext sc) {
@@ -452,10 +449,9 @@ public class SparkInterpreter extends Interpreter {
b.v_$eq(true);
settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
- PrintStream printStream = new PrintStream(out);
-
/* spark interpreter */
this.interpreter = new SparkILoop(null, new PrintWriter(out));
+
interpreter.settings_$eq(settings);
interpreter.createInterpreter();
@@ -481,7 +477,7 @@ public class SparkInterpreter extends Interpreter {
dep = getDependencyResolver();
- z = new ZeppelinContext(sc, sqlc, null, dep, printStream,
+ z = new ZeppelinContext(sc, sqlc, null, dep,
Integer.parseInt(getProperty("zeppelin.spark.maxResult")));
intp.interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
@@ -489,7 +485,6 @@ public class SparkInterpreter extends Interpreter {
binder.put("sc", sc);
binder.put("sqlc", sqlc);
binder.put("z", z);
- binder.put("out", printStream);
intp.interpret("@transient val z = "
+ "_binder.get(\"z\").asInstanceOf[org.apache.zeppelin.spark.ZeppelinContext]");
@@ -675,13 +670,13 @@ public class SparkInterpreter extends Interpreter {
synchronized (this) {
z.setGui(context.getGui());
sc.setJobGroup(getJobGroup(context), "Zeppelin", false);
- InterpreterResult r = interpretInput(lines);
+ InterpreterResult r = interpretInput(lines, context);
sc.clearJobGroup();
return r;
}
}
- public InterpreterResult interpretInput(String[] lines) {
+ public InterpreterResult interpretInput(String[] lines, InterpreterContext context) {
SparkEnv.set(env);
// add print("") to make sure not finishing with comment
@@ -692,8 +687,9 @@ public class SparkInterpreter extends Interpreter {
}
linesToRun[lines.length] = "print(\"\")";
- Console.setOut((java.io.PrintStream) binder.get("out"));
- out.reset();
+ Console.setOut(context.out);
+ out.setInterpreterOutput(context.out);
+ context.out.clear();
Code r = null;
String incomplete = "";
@@ -713,6 +709,7 @@ public class SparkInterpreter extends Interpreter {
res = intp.interpret(incomplete + s);
} catch (Exception e) {
sc.clearJobGroup();
+ out.setInterpreterOutput(null);
logger.info("Interpreter exception", e);
return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
}
@@ -721,7 +718,8 @@ public class SparkInterpreter extends Interpreter {
if (r == Code.ERROR) {
sc.clearJobGroup();
- return new InterpreterResult(r, out.toString());
+ out.setInterpreterOutput(null);
+ return new InterpreterResult(r, "");
} else if (r == Code.INCOMPLETE) {
incomplete += s + "\n";
} else {
@@ -730,9 +728,13 @@ public class SparkInterpreter extends Interpreter {
}
if (r == Code.INCOMPLETE) {
+ sc.clearJobGroup();
+ out.setInterpreterOutput(null);
return new InterpreterResult(r, "Incomplete expression");
} else {
- return new InterpreterResult(r, out.toString());
+ sc.clearJobGroup();
+ out.setInterpreterOutput(null);
+ return new InterpreterResult(Code.SUCCESS);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java
new file mode 100644
index 0000000..98a4090
--- /dev/null
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.spark;
+
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * InterpreterOutput can be attached / detached.
+ */
+public class SparkOutputStream extends OutputStream {
+ InterpreterOutput interpreterOutput;
+
+ public SparkOutputStream() {
+ }
+
+ public InterpreterOutput getInterpreterOutput() {
+ return interpreterOutput;
+ }
+
+ public void setInterpreterOutput(InterpreterOutput interpreterOutput) {
+ this.interpreterOutput = interpreterOutput;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ if (interpreterOutput != null) {
+ interpreterOutput.write(b);
+ }
+ }
+
+ @Override
+ public void write(byte [] b) throws IOException {
+ if (interpreterOutput != null) {
+ interpreterOutput.write(b);
+ }
+ }
+
+ @Override
+ public void write(byte [] b, int offset, int len) throws IOException {
+ if (interpreterOutput != null) {
+ interpreterOutput.write(b, offset, len);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (interpreterOutput != null) {
+ interpreterOutput.close();
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (interpreterOutput != null) {
+ interpreterOutput.flush();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
index a55ed73..6869161 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
@@ -21,6 +21,7 @@ import static scala.collection.JavaConversions.asJavaCollection;
import static scala.collection.JavaConversions.asJavaIterable;
import static scala.collection.JavaConversions.collectionAsScalaIterable;
+import java.io.IOException;
import java.io.PrintStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -54,19 +55,17 @@ import scala.collection.Iterable;
*/
public class ZeppelinContext extends HashMap<String, Object> {
private SparkDependencyResolver dep;
- private PrintStream out;
private InterpreterContext interpreterContext;
private int maxResult;
public ZeppelinContext(SparkContext sc, SQLContext sql,
InterpreterContext interpreterContext,
- SparkDependencyResolver dep, PrintStream printStream,
+ SparkDependencyResolver dep,
int maxResult) {
this.sc = sc;
this.sqlContext = sql;
this.interpreterContext = interpreterContext;
this.dep = dep;
- this.out = printStream;
this.maxResult = maxResult;
}
@@ -273,10 +272,15 @@ public class ZeppelinContext extends HashMap<String, Object> {
throw new InterpreterException("Can not road DataFrame/SchemaRDD class");
}
- if (cls.isInstance(o)) {
- out.print(showDF(sc, interpreterContext, o, maxResult));
- } else {
- out.print(o.toString());
+
+ try {
+ if (cls.isInstance(o)) {
+ interpreterContext.out.write(showDF(sc, interpreterContext, o, maxResult));
+ } else {
+ interpreterContext.out.write(o.toString());
+ }
+ } catch (IOException e) {
+ throw new InterpreterException(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py
index 62f0a82..7da0f4e 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -36,10 +36,7 @@ class Logger(object):
self.out = ""
def write(self, message):
- self.out = self.out + message
-
- def get(self):
- return self.out
+ intp.appendOutput(message)
def reset(self):
self.out = ""
@@ -224,7 +221,7 @@ while True :
sc.setJobGroup(jobGroup, "Zeppelin")
eval(compiledCode)
- intp.setStatementsFinished(output.get(), False)
+ intp.setStatementsFinished("", False)
except Py4JJavaError:
excInnerError = traceback.format_exc() # format_tb() does not return the inner exception
innerErrorStart = excInnerError.find("Py4JJavaError:")
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
index efa8fae..2b5613a 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
@@ -60,7 +60,7 @@ public class DepInterpreterTest {
context = new InterpreterContext("note", "id", "title", "text", new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
- new LinkedList<InterpreterContextRunner>());
+ new LinkedList<InterpreterContextRunner>(), null);
}
@After
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index b629978..778966f 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -28,10 +28,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.junit.After;
import org.junit.Before;
@@ -79,9 +76,21 @@ public class SparkInterpreterTest {
InterpreterGroup intpGroup = new InterpreterGroup();
context = new InterpreterContext("note", "id", "title", "text",
- new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(
- intpGroup.getId(), null),
- new LinkedList<InterpreterContextRunner>());
+ new HashMap<String, Object>(),
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LinkedList<InterpreterContextRunner>(),
+ new InterpreterOutput(new InterpreterOutputListener() {
+ @Override
+ public void onAppend(InterpreterOutput out, byte[] line) {
+
+ }
+
+ @Override
+ public void onUpdate(InterpreterOutput out, byte[] output) {
+
+ }
+ }));
}
@After
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
index 4688cf8..731eab6 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
@@ -25,10 +25,7 @@ import java.util.Properties;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.junit.After;
import org.junit.Before;
@@ -69,7 +66,17 @@ public class SparkSqlInterpreterTest {
}
context = new InterpreterContext("note", "id", "title", "text", new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
- new LinkedList<InterpreterContextRunner>());
+ new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(new InterpreterOutputListener() {
+ @Override
+ public void onAppend(InterpreterOutput out, byte[] line) {
+
+ }
+
+ @Override
+ public void onUpdate(InterpreterOutput out, byte[] output) {
+
+ }
+ }));
}
@After
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index 0417f91..e3f6b59 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -29,6 +29,7 @@ import org.apache.zeppelin.display.GUI;
public class InterpreterContext {
private static final ThreadLocal<InterpreterContext> threadIC =
new ThreadLocal<InterpreterContext>();
+ public final InterpreterOutput out;
public static InterpreterContext get() {
return threadIC.get();
@@ -58,7 +59,8 @@ public class InterpreterContext {
Map<String, Object> config,
GUI gui,
AngularObjectRegistry angularObjectRegistry,
- List<InterpreterContextRunner> runners
+ List<InterpreterContextRunner> runners,
+ InterpreterOutput out
) {
this.noteId = noteId;
this.paragraphId = paragraphId;
@@ -68,6 +70,7 @@ public class InterpreterContext {
this.gui = gui;
this.angularObjectRegistry = angularObjectRegistry;
this.runners = runners;
+ this.out = out;
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
new file mode 100644
index 0000000..42ebe48
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.net.URL;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * InterpreterOutput is OutputStream that supposed to print content on notebook
+ * in addition to InterpreterResult which used to return from Interpreter.interpret().
+ */
+public class InterpreterOutput extends OutputStream {
+ Logger logger = LoggerFactory.getLogger(InterpreterOutput.class);
+ private final int NEW_LINE_CHAR = '\n';
+
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+
+ private final List<Object> outList = new LinkedList<Object>();
+ private InterpreterOutputChangeWatcher watcher;
+ private final InterpreterOutputListener flushListener;
+ private InterpreterResult.Type type = InterpreterResult.Type.TEXT;
+ private boolean firstWrite = true;
+
+ public InterpreterOutput(InterpreterOutputListener flushListener) {
+ this.flushListener = flushListener;
+ clear();
+ }
+
+ public InterpreterOutput(InterpreterOutputListener flushListener,
+ InterpreterOutputChangeListener listener) throws IOException {
+ this.flushListener = flushListener;
+ clear();
+ watcher = new InterpreterOutputChangeWatcher(listener);
+ watcher.start();
+ }
+
+ public InterpreterResult.Type getType() {
+ return type;
+ }
+
+ public void setType(InterpreterResult.Type type) {
+ if (this.type != type) {
+ clear();
+ flushListener.onUpdate(this, new byte[]{});
+ this.type = type;
+ }
+ }
+
+ public void clear() {
+ synchronized (outList) {
+ type = InterpreterResult.Type.TEXT;
+ buffer.reset();
+ outList.clear();
+ if (watcher != null) {
+ watcher.clear();
+ }
+ }
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ synchronized (outList) {
+ buffer.write(b);
+ if (b == NEW_LINE_CHAR) {
+ // first time use of this outputstream.
+ if (firstWrite) {
+ // clear the output on gui
+ flushListener.onUpdate(this, new byte[]{});
+ firstWrite = false;
+ }
+
+ flush();
+ }
+ }
+ }
+
+ private byte [] detectTypeFromLine(byte [] byteArray) {
+ // check output type directive
+ String line = new String(byteArray);
+ for (InterpreterResult.Type t : InterpreterResult.Type.values()) {
+ String typeString = '%' + t.name().toLowerCase();
+ if ((typeString + "\n").equals(line)) {
+ setType(t);
+ byteArray = null;
+ break;
+ } else if (line.startsWith(typeString + " ")) {
+ setType(t);
+ byteArray = line.substring(typeString.length() + 1).getBytes();
+ break;
+ }
+ }
+
+ return byteArray;
+ }
+
+ @Override
+ public void write(byte [] b) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ @Override
+ public void write(byte [] b, int off, int len) throws IOException {
+ synchronized (outList) {
+ for (int i = off; i < len; i++) {
+ write(b[i]);
+ }
+ }
+ }
+
+ /**
+ * In dev mode, it monitors file and update ZeppelinServer
+ * @param file
+ * @throws IOException
+ */
+ public void write(File file) throws IOException {
+ outList.add(file);
+ if (watcher != null) {
+ watcher.watch(file);
+ }
+ }
+
+ public void write(String string) throws IOException {
+ write(string.getBytes());
+ }
+
+ /**
+ * write contents in the resource file in the classpath
+ * @param url
+ * @throws IOException
+ */
+ public void write(URL url) throws IOException {
+ if ("file".equals(url.getProtocol())) {
+ write(new File(url.getPath()));
+ } else {
+ outList.add(url);
+ }
+ }
+
+ public void writeResource(String resourceName) throws IOException {
+ // search file under resource dir first for dev mode
+ File mainResource = new File("./src/main/resources/" + resourceName);
+ File testResource = new File("./src/test/resources/" + resourceName);
+ if (mainResource.isFile()) {
+ write(mainResource);
+ } else if (testResource.isFile()) {
+ write(testResource);
+ } else {
+ // search from classpath
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ if (cl == null) {
+ cl = this.getClass().getClassLoader();
+ }
+ if (cl == null) {
+ cl = ClassLoader.getSystemClassLoader();
+ }
+
+ write(cl.getResource(resourceName));
+ }
+ }
+
+ public byte[] toByteArray() throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ List<Object> all = new LinkedList<Object>();
+
+ synchronized (outList) {
+ all.addAll(outList);
+ }
+
+ for (Object o : all) {
+ if (o instanceof File) {
+ File f = (File) o;
+ FileInputStream fin = new FileInputStream(f);
+ copyStream(fin, out);
+ fin.close();
+ } else if (o instanceof byte[]) {
+ out.write((byte[]) o);
+ } else if (o instanceof Integer) {
+ out.write((int) o);
+ } else if (o instanceof URL) {
+ InputStream fin = ((URL) o).openStream();
+ copyStream(fin, out);
+ fin.close();
+ } else {
+ // can not handle the object
+ }
+ }
+ out.close();
+ return out.toByteArray();
+ }
+
+ public void flush() throws IOException {
+ synchronized (outList) {
+ buffer.flush();
+ byte[] bytes = buffer.toByteArray();
+ bytes = detectTypeFromLine(bytes);
+ if (bytes != null) {
+ outList.add(bytes);
+ if (type == InterpreterResult.Type.TEXT) {
+ flushListener.onAppend(this, bytes);
+ }
+ }
+ buffer.reset();
+ }
+ }
+
+ private void copyStream(InputStream in, OutputStream out) throws IOException {
+ int bufferSize = 8192;
+ byte[] buffer = new byte[bufferSize];
+
+ while (true) {
+ int bytesRead = in.read(buffer);
+ if (bytesRead == -1) {
+ break;
+ } else {
+ out.write(buffer, 0, bytesRead);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ flush();
+
+ if (watcher != null) {
+ watcher.clear();
+ watcher.shutdown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeListener.java
new file mode 100644
index 0000000..a639e0c
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeListener.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+import java.io.File;
+
+/**
+ * InterpreterOutputChangeListener
+ */
+public interface InterpreterOutputChangeListener {
+ public void fileChanged(File file);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcher.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcher.java
new file mode 100644
index 0000000..5fe8237
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcher.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
+import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
+import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
+import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.ClosedWatchServiceException;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Watch the change for the development mode support
+ */
+public class InterpreterOutputChangeWatcher extends Thread {
+ Logger logger = LoggerFactory.getLogger(InterpreterOutputChangeWatcher.class);
+
+ private WatchService watcher;
+ private final List<File> watchFiles = new LinkedList<File>();
+ private final Map<WatchKey, File> watchKeys = new HashMap<WatchKey, File>();
+ private InterpreterOutputChangeListener listener;
+ private boolean stop;
+
+ public InterpreterOutputChangeWatcher(InterpreterOutputChangeListener listener)
+ throws IOException {
+ watcher = FileSystems.getDefault().newWatchService();
+ this.listener = listener;
+ }
+
+ public void watch(File file) throws IOException {
+ String dirString;
+ if (file.isFile()) {
+ dirString = file.getParentFile().getAbsolutePath();
+ } else {
+ throw new IOException(file.getName() + " is not a file");
+ }
+
+ if (dirString == null) {
+ dirString = "/";
+ }
+
+ Path dir = FileSystems.getDefault().getPath(dirString);
+ logger.info("watch " + dir);
+ WatchKey key = dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
+ synchronized (watchKeys) {
+ watchKeys.put(key, new File(dirString));
+ watchFiles.add(file);
+ }
+ }
+
+ public void clear() {
+ synchronized (watchKeys) {
+ for (WatchKey key : watchKeys.keySet()) {
+ key.cancel();
+
+ }
+ watchKeys.clear();
+ watchFiles.clear();
+ }
+ }
+
+ public void shutdown() throws IOException {
+ stop = true;
+ clear();
+ watcher.close();
+ }
+
+ public void run() {
+ while (!stop) {
+ WatchKey key = null;
+ try {
+ key = watcher.poll(1, TimeUnit.SECONDS);
+ } catch (InterruptedException | ClosedWatchServiceException e) {
+ break;
+ }
+
+ if (key == null) {
+ continue;
+ }
+ for (WatchEvent<?> event : key.pollEvents()) {
+ WatchEvent.Kind<?> kind = event.kind();
+ if (kind == OVERFLOW) {
+ continue;
+ }
+ WatchEvent<Path> ev = (WatchEvent<Path>) event;
+ Path filename = ev.context();
+ // search for filename
+ synchronized (watchKeys) {
+ for (File f : watchFiles) {
+ if (f.getName().compareTo(filename.toString()) == 0) {
+ File changedFile;
+ if (filename.isAbsolute()) {
+ changedFile = new File(filename.toString());
+ } else {
+ changedFile = new File(watchKeys.get(key), filename.toString());
+ }
+ logger.info("File change detected " + changedFile.getAbsolutePath());
+ if (listener != null) {
+ listener.fileChanged(changedFile);
+ }
+ }
+ }
+ }
+ }
+
+ boolean valid = key.reset();
+ if (!valid) {
+ break;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java
new file mode 100644
index 0000000..bdb262a
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+/**
+ * Listen InterpreterOutput buffer flush
+ */
+public interface InterpreterOutputListener {
+ /**
+ * called when newline is detected
+ * @param line
+ */
+ public void onAppend(InterpreterOutput out, byte[] line);
+
+ /**
+ * when entire output is updated. eg) after detecting new display system
+ * @param output
+ */
+ public void onUpdate(InterpreterOutput out, byte[] output);
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
index 593cfc7..d213796 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
@@ -146,4 +146,8 @@ public class InterpreterResult implements Serializable {
this.type = type;
return this;
}
+
+ public String toString() {
+ return "%" + type.name().toLowerCase() + " " + msg;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 455156c..d2a24e8 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -48,6 +48,7 @@ import com.google.gson.reflect.TypeToken;
*
*/
public class RemoteInterpreter extends Interpreter {
+ private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class);
Gson gson = new Gson();
private String interpreterRunner;
@@ -60,32 +61,35 @@ public class RemoteInterpreter extends Interpreter {
private int connectTimeout;
public RemoteInterpreter(Properties property,
- String className,
- String interpreterRunner,
- String interpreterPath,
- int connectTimeout) {
+ String className,
+ String interpreterRunner,
+ String interpreterPath,
+ int connectTimeout,
+ RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
super(property);
-
this.className = className;
initialized = false;
this.interpreterRunner = interpreterRunner;
this.interpreterPath = interpreterPath;
env = new HashMap<String, String>();
this.connectTimeout = connectTimeout;
+ this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
}
public RemoteInterpreter(Properties property,
- String className,
- String interpreterRunner,
- String interpreterPath,
- Map<String, String> env,
- int connectTimeout) {
+ String className,
+ String interpreterRunner,
+ String interpreterPath,
+ Map<String, String> env,
+ int connectTimeout,
+ RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
super(property);
this.className = className;
this.interpreterRunner = interpreterRunner;
this.interpreterPath = interpreterPath;
this.env = env;
this.connectTimeout = connectTimeout;
+ this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
}
@Override
@@ -103,7 +107,8 @@ public class RemoteInterpreter extends Interpreter {
if (intpGroup.getRemoteInterpreterProcess() == null) {
// create new remote process
RemoteInterpreterProcess remoteProcess = new RemoteInterpreterProcess(
- interpreterRunner, interpreterPath, env, connectTimeout);
+ interpreterRunner, interpreterPath, env, connectTimeout,
+ remoteInterpreterProcessListener);
intpGroup.setRemoteInterpreterProcess(remoteProcess);
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
index c39e0fe..6186205 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
@@ -18,29 +18,35 @@
package org.apache.zeppelin.interpreter.remote;
import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
import org.apache.thrift.TException;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+
/**
*
*/
public class RemoteInterpreterEventPoller extends Thread {
private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class);
+ private final RemoteInterpreterProcessListener listener;
private volatile boolean shutdown;
private RemoteInterpreterProcess interpreterProcess;
private InterpreterGroup interpreterGroup;
- public RemoteInterpreterEventPoller() {
+ public RemoteInterpreterEventPoller(RemoteInterpreterProcessListener listener) {
+ this.listener = listener;
shutdown = false;
}
@@ -110,6 +116,24 @@ public class RemoteInterpreterEventPoller extends Thread {
interpreterProcess.getInterpreterContextRunnerPool().run(
runnerFromRemote.getNoteId(), runnerFromRemote.getParagraphId());
+ } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_APPEND) {
+ // on output append
+ Map<String, String> outputAppend = gson.fromJson(
+ event.getData(), new TypeToken<Map<String, String>>() {}.getType());
+ String noteId = outputAppend.get("noteId");
+ String paragraphId = outputAppend.get("paragraphId");
+ String outputToAppend = outputAppend.get("data");
+
+ listener.onOutputAppend(noteId, paragraphId, outputToAppend);
+ } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE) {
+ // on output update
+ Map<String, String> outputAppend = gson.fromJson(
+ event.getData(), new TypeToken<Map<String, String>>() {}.getType());
+ String noteId = outputAppend.get("noteId");
+ String paragraphId = outputAppend.get("paragraphId");
+ String outputToUpdate = outputAppend.get("data");
+
+ listener.onOutputUpdated(noteId, paragraphId, outputToUpdate);
}
logger.debug("Event from remoteproceess {}", event.getType());
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index 2c195dc..56b5485 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -53,10 +53,11 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
private int connectTimeout;
public RemoteInterpreterProcess(String intpRunner,
- String intpDir,
- Map<String, String> env,
- int connectTimeout) {
- this(intpRunner, intpDir, env, new RemoteInterpreterEventPoller(), connectTimeout);
+ String intpDir,
+ Map<String, String> env,
+ int connectTimeout,
+ RemoteInterpreterProcessListener listener) {
+ this(intpRunner, intpDir, env, new RemoteInterpreterEventPoller(listener), connectTimeout);
}
RemoteInterpreterProcess(String intpRunner,
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
new file mode 100644
index 0000000..da6ac63
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote;
+
+/**
+ * Event from remoteInterpreterProcess
+ */
+public interface RemoteInterpreterProcessListener {
+ public void onOutputAppend(String noteId, String paragraphId, String output);
+ public void onOutputUpdated(String noteId, String paragraphId, String output);
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index a8da8c0..728d210 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -35,15 +35,8 @@ import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.ClassloaderInterpreter;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
@@ -300,7 +293,26 @@ public class RemoteInterpreterServer
try {
InterpreterContext.set(context);
InterpreterResult result = interpreter.interpret(script, context);
- return result;
+
+ // data from context.out is prepended to InterpreterResult if both defined
+ String message = "";
+
+ context.out.flush();
+ InterpreterResult.Type outputType = context.out.getType();
+ byte[] interpreterOutput = context.out.toByteArray();
+ context.out.clear();
+
+ if (interpreterOutput != null && interpreterOutput.length > 0) {
+ message = new String(interpreterOutput);
+ }
+
+ String interpreterResultMessage = result.message();
+ if (interpreterResultMessage != null && !interpreterResultMessage.isEmpty()) {
+ message += interpreterResultMessage;
+ return new InterpreterResult(result.code(), result.type(), message);
+ } else {
+ return new InterpreterResult(result.code(), outputType, message);
+ }
} finally {
InterpreterContext.remove();
}
@@ -351,7 +363,8 @@ public class RemoteInterpreterServer
private InterpreterContext convert(RemoteInterpreterContext ric) {
List<InterpreterContextRunner> contextRunners = new LinkedList<InterpreterContextRunner>();
List<InterpreterContextRunner> runners = gson.fromJson(ric.getRunners(),
- new TypeToken<List<RemoteInterpreterContextRunner>>(){}.getType());
+ new TypeToken<List<RemoteInterpreterContextRunner>>() {
+ }.getType());
for (InterpreterContextRunner r : runners) {
contextRunners.add(new ParagraphRunner(this, r.getNoteId(), r.getParagraphId()));
@@ -366,7 +379,40 @@ public class RemoteInterpreterServer
new TypeToken<Map<String, Object>>() {}.getType()),
gson.fromJson(ric.getGui(), GUI.class),
interpreterGroup.getAngularObjectRegistry(),
- contextRunners);
+ contextRunners, createInterpreterOutput(ric.getNoteId(), ric.getParagraphId()));
+ }
+
+
+ private InterpreterOutput createInterpreterOutput(final String noteId, final String paragraphId) {
+ return new InterpreterOutput(new InterpreterOutputListener() {
+ @Override
+ public void onAppend(InterpreterOutput out, byte[] line) {
+ Map<String, String> appendOutput = new HashMap<String, String>();
+ appendOutput.put("noteId", noteId);
+ appendOutput.put("paragraphId", paragraphId);
+ appendOutput.put("data", new String(line));
+
+ Gson gson = new Gson();
+
+ sendEvent(new RemoteInterpreterEvent(
+ RemoteInterpreterEventType.OUTPUT_APPEND,
+ gson.toJson(appendOutput)));
+ }
+
+ @Override
+ public void onUpdate(InterpreterOutput out, byte[] output) {
+ Map<String, String> appendOutput = new HashMap<String, String>();
+ appendOutput.put("noteId", noteId);
+ appendOutput.put("paragraphId", paragraphId);
+ appendOutput.put("data", new String(output));
+
+ Gson gson = new Gson();
+
+ sendEvent(new RemoteInterpreterEvent(
+ RemoteInterpreterEventType.OUTPUT_UPDATE,
+ gson.toJson(appendOutput)));
+ }
+ });
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
index a55d5de..175f482 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
index 96a49b5..79203fb 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterEvent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
index 9a7d142..d650318 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
@@ -33,7 +33,9 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
ANGULAR_OBJECT_ADD(2),
ANGULAR_OBJECT_UPDATE(3),
ANGULAR_OBJECT_REMOVE(4),
- RUN_INTERPRETER_CONTEXT_RUNNER(5);
+ RUN_INTERPRETER_CONTEXT_RUNNER(5),
+ OUTPUT_APPEND(6),
+ OUTPUT_UPDATE(7);
private final int value;
@@ -64,6 +66,10 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
return ANGULAR_OBJECT_REMOVE;
case 5:
return RUN_INTERPRETER_CONTEXT_RUNNER;
+ case 6:
+ return OUTPUT_APPEND;
+ case 7:
+ return OUTPUT_UPDATE;
default:
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
index 36c0f25..cc50f9c 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteInterpreterResult, RemoteInterpreterResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResult> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
index 6e6730e..738b453 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
public class RemoteInterpreterService {
public interface Iface {
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
index 144784c..65fd0a7 100644
--- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
+++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
@@ -42,7 +42,9 @@ enum RemoteInterpreterEventType {
ANGULAR_OBJECT_ADD = 2,
ANGULAR_OBJECT_UPDATE = 3,
ANGULAR_OBJECT_REMOVE = 4,
- RUN_INTERPRETER_CONTEXT_RUNNER = 5
+ RUN_INTERPRETER_CONTEXT_RUNNER = 5,
+ OUTPUT_APPEND = 6,
+ OUTPUT_UPDATE = 7
}
struct RemoteInterpreterEvent {
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
index 080bdaa..9c2732d 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
@@ -27,7 +27,7 @@ public class InterpreterContextTest {
public void testThreadLocal() {
assertNull(InterpreterContext.get());
- InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null));
+ InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null, null));
assertNotNull(InterpreterContext.get());
InterpreterContext.remove();
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java
new file mode 100644
index 0000000..e376809
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class InterpreterOutputChangeWatcherTest implements InterpreterOutputChangeListener {
+ private File tmpDir;
+ private File fileChanged;
+ private int numChanged;
+ private InterpreterOutputChangeWatcher watcher;
+
+ @Before
+ public void setUp() throws Exception {
+ watcher = new InterpreterOutputChangeWatcher(this);
+ watcher.start();
+
+ tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis());
+ tmpDir.mkdirs();
+ fileChanged = null;
+ numChanged = 0;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ watcher.shutdown();
+ delete(tmpDir);
+ }
+
+ private void delete(File file){
+ if(file.isFile()) file.delete();
+ else if(file.isDirectory()){
+ File [] files = file.listFiles();
+ if(files!=null && files.length>0){
+ for(File f : files){
+ delete(f);
+ }
+ }
+ file.delete();
+ }
+ }
+
+
+ @Test
+ public void test() throws IOException, InterruptedException {
+ assertNull(fileChanged);
+ assertEquals(0, numChanged);
+
+ Thread.sleep(1000);
+ // create new file
+ File file1 = new File(tmpDir, "test1");
+ file1.createNewFile();
+
+ File file2 = new File(tmpDir, "test2");
+ file2.createNewFile();
+
+ watcher.watch(file1);
+ Thread.sleep(1000);
+
+ FileOutputStream out1 = new FileOutputStream(file1);
+ out1.write(1);
+ out1.close();
+
+ FileOutputStream out2 = new FileOutputStream(file2);
+ out2.write(1);
+ out2.close();
+
+ synchronized (this) {
+ wait(30*1000);
+ }
+
+ assertNotNull(fileChanged);
+ assertEquals(1, numChanged);
+ }
+
+
+ @Override
+ public void fileChanged(File file) {
+ fileChanged = file;
+ numChanged++;
+
+ synchronized(this) {
+ notify();
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java
new file mode 100644
index 0000000..f8f4809
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class InterpreterOutputTest implements InterpreterOutputListener {
+ private InterpreterOutput out;
+ int numAppendEvent;
+ int numUpdateEvent;
+
+ @Before
+ public void setUp() {
+ out = new InterpreterOutput(this);
+ numAppendEvent = 0;
+ numUpdateEvent = 0;
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ out.close();
+ }
+
+ @Test
+ public void testDetectNewline() throws IOException {
+ out.write("hello\nworld");
+ assertEquals("hello\n", new String(out.toByteArray()));
+ assertEquals(1, numAppendEvent);
+ assertEquals(1, numUpdateEvent);
+
+ out.write("\n");
+ assertEquals("hello\nworld\n", new String(out.toByteArray()));
+ assertEquals(2, numAppendEvent);
+ assertEquals(1, numUpdateEvent);
+ }
+
+ @Test
+ public void testFlush() throws IOException {
+ out.write("hello\nworld");
+ assertEquals("hello\n", new String(out.toByteArray()));
+ assertEquals(1, numAppendEvent);
+ assertEquals(1, numUpdateEvent);
+
+ out.flush();
+ assertEquals("hello\nworld", new String(out.toByteArray()));
+ assertEquals(2, numAppendEvent);
+ assertEquals(1, numUpdateEvent);
+
+ out.clear();
+ out.write("%html div");
+ assertEquals("", new String(out.toByteArray()));
+ assertEquals(InterpreterResult.Type.TEXT, out.getType());
+
+ out.flush();
+ out.write("%html div");
+ assertEquals("div", new String(out.toByteArray()));
+ assertEquals(InterpreterResult.Type.HTML, out.getType());
+ }
+
+ @Test
+ public void testType() throws IOException {
+ // default output stream type is TEXT
+ out.write("Text\n");
+ assertEquals(InterpreterResult.Type.TEXT, out.getType());
+ assertEquals("Text\n", new String(out.toByteArray()));
+ assertEquals(1, numAppendEvent);
+ assertEquals(1, numUpdateEvent);
+
+ // change type
+ out.write("%html\n");
+ assertEquals(InterpreterResult.Type.HTML, out.getType());
+ assertEquals("", new String(out.toByteArray()));
+ assertEquals(1, numAppendEvent);
+ assertEquals(2, numUpdateEvent);
+
+ // none TEXT type output stream does not generate append event
+ out.write("<div>html</div>\n");
+ assertEquals(InterpreterResult.Type.HTML, out.getType());
+ assertEquals(1, numAppendEvent);
+ assertEquals(2, numUpdateEvent);
+ assertEquals("<div>html</div>\n", new String(out.toByteArray()));
+
+ // change type to text again
+ out.write("%text hello\n");
+ assertEquals(InterpreterResult.Type.TEXT, out.getType());
+ assertEquals(2, numAppendEvent);
+ assertEquals(3, numUpdateEvent);
+ assertEquals("hello\n", new String(out.toByteArray()));
+ }
+
+ @Test
+ public void testType2() throws IOException {
+ out.write("%html\nHello");
+ assertEquals(InterpreterResult.Type.HTML, out.getType());
+ }
+
+ @Override
+ public void onAppend(InterpreterOutput out, byte[] line) {
+ numAppendEvent++;
+ }
+
+ @Override
+ public void onUpdate(InterpreterOutput out, byte[] output) {
+ numUpdateEvent++;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java
index 007730a..d7ab9e8 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java
@@ -105,4 +105,9 @@ public class InterpreterResultTest {
"123\n", result.message());
}
+ @Test
+ public void testToString() {
+ assertEquals("%html hello", new InterpreterResult(InterpreterResult.Code.SUCCESS, "%html hello").toString());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
index 29a1fb1..906878d 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
@@ -64,12 +64,13 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
Properties p = new Properties();
intp = new RemoteInterpreter(
- p,
- MockInterpreterAngular.class.getName(),
- new File("../bin/interpreter.sh").getAbsolutePath(),
- "fake",
- env,
- 10 * 1000
+ p,
+ MockInterpreterAngular.class.getName(),
+ new File("../bin/interpreter.sh").getAbsolutePath(),
+ "fake",
+ env,
+ 10 * 1000,
+ null
);
intpGroup.add(intp);
@@ -83,7 +84,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
- new LinkedList<InterpreterContextRunner>());
+ new LinkedList<InterpreterContextRunner>(), null);
intp.open();
}