You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2016/01/21 00:36:54 UTC
[1/2] incubator-zeppelin git commit: [ZEPPELIN-554] Streaming
interpreter output to front-end
Repository: incubator-zeppelin
Updated Branches:
refs/heads/master dbdaf84e4 -> 5ec59a81b
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
new file mode 100644
index 0000000..623a037
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterOutputStream;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Test for remote interpreter output stream
+ */
+public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProcessListener {
+ private InterpreterGroup intpGroup;
+ private HashMap<String, String> env;
+
+ @Before
+ public void setUp() throws Exception {
+ intpGroup = new InterpreterGroup();
+ env = new HashMap<String, String>();
+ env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ intpGroup.close();
+ intpGroup.destroy();
+ }
+
+ private RemoteInterpreter createMockInterpreter() {
+ RemoteInterpreter intp = new RemoteInterpreter(
+ new Properties(),
+ MockInterpreterOutputStream.class.getName(),
+ new File("../bin/interpreter.sh").getAbsolutePath(),
+ "fake",
+ env,
+ 10 * 1000,
+ this);
+
+
+ intpGroup.add(intp);
+ intp.setInterpreterGroup(intpGroup);
+ return intp;
+ }
+
+ private InterpreterContext createInterpreterContext() {
+ return new InterpreterContext(
+ "noteId",
+ "id",
+ "title",
+ "text",
+ new HashMap<String, Object>(),
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LinkedList<InterpreterContextRunner>(), null);
+ }
+
+ @Test
+ public void testInterpreterResultOnly() {
+ RemoteInterpreter intp = createMockInterpreter();
+ InterpreterResult ret = intp.interpret("SUCCESS::staticresult", createInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+ assertEquals("staticresult", ret.message());
+
+ ret = intp.interpret("SUCCESS::staticresult2", createInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+ assertEquals("staticresult2", ret.message());
+
+ ret = intp.interpret("ERROR::staticresult3", createInterpreterContext());
+ assertEquals(InterpreterResult.Code.ERROR, ret.code());
+ assertEquals("staticresult3", ret.message());
+ }
+
+ @Test
+ public void testInterpreterOutputStreamOnly() {
+ RemoteInterpreter intp = createMockInterpreter();
+ InterpreterResult ret = intp.interpret("SUCCESS:streamresult:", createInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+ assertEquals("streamresult", ret.message());
+
+ ret = intp.interpret("ERROR:streamresult2:", createInterpreterContext());
+ assertEquals(InterpreterResult.Code.ERROR, ret.code());
+ assertEquals("streamresult2", ret.message());
+ }
+
+ @Test
+ public void testInterpreterResultOutputStreamMixed() {
+ RemoteInterpreter intp = createMockInterpreter();
+ InterpreterResult ret = intp.interpret("SUCCESS:stream:static", createInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+ assertEquals("streamstatic", ret.message());
+ }
+
+ @Test
+ public void testOutputType() {
+ RemoteInterpreter intp = createMockInterpreter();
+
+ InterpreterResult ret = intp.interpret("SUCCESS:%html hello:", createInterpreterContext());
+ assertEquals(InterpreterResult.Type.HTML, ret.type());
+ assertEquals("hello", ret.message());
+
+ ret = intp.interpret("SUCCESS:%html\nhello:", createInterpreterContext());
+ assertEquals(InterpreterResult.Type.HTML, ret.type());
+ assertEquals("hello", ret.message());
+
+ ret = intp.interpret("SUCCESS:%html hello:%angular world", createInterpreterContext());
+ assertEquals(InterpreterResult.Type.ANGULAR, ret.type());
+ assertEquals("helloworld", ret.message());
+ }
+
+ @Override
+ public void onOutputAppend(String noteId, String paragraphId, String output) {
+
+ }
+
+ @Override
+ public void onOutputUpdated(String noteId, String paragraphId, String output) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
index ea5397e..abee5b8 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
@@ -34,7 +34,7 @@ public class RemoteInterpreterProcessTest {
InterpreterGroup intpGroup = new InterpreterGroup();
RemoteInterpreterProcess rip = new RemoteInterpreterProcess(
"../bin/interpreter.sh", "nonexists", new HashMap<String, String>(),
- 10 * 1000);
+ 10 * 1000, null);
assertFalse(rip.isRunning());
assertEquals(0, rip.referenceCount());
assertEquals(1, rip.reference(intpGroup));
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
index c938ff3..034a676 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
@@ -63,30 +63,38 @@ public class RemoteInterpreterTest {
intpGroup.destroy();
}
+ private RemoteInterpreter createMockInterpreterA(Properties p) {
+ return new RemoteInterpreter(
+ p,
+ MockInterpreterA.class.getName(),
+ new File("../bin/interpreter.sh").getAbsolutePath(),
+ "fake",
+ env,
+ 10 * 1000,
+ null);
+ }
+
+ private RemoteInterpreter createMockInterpreterB(Properties p) {
+ return new RemoteInterpreter(
+ p,
+ MockInterpreterB.class.getName(),
+ new File("../bin/interpreter.sh").getAbsolutePath(),
+ "fake",
+ env,
+ 10 * 1000,
+ null);
+ }
+
@Test
public void testRemoteInterperterCall() throws TTransportException, IOException {
Properties p = new Properties();
- RemoteInterpreter intpA = new RemoteInterpreter(
- p,
- MockInterpreterA.class.getName(),
- new File("../bin/interpreter.sh").getAbsolutePath(),
- "fake",
- env,
- 10 * 1000
- );
+ RemoteInterpreter intpA = createMockInterpreterA(p);
intpGroup.add(intpA);
intpA.setInterpreterGroup(intpGroup);
- RemoteInterpreter intpB = new RemoteInterpreter(
- p,
- MockInterpreterB.class.getName(),
- new File("../bin/interpreter.sh").getAbsolutePath(),
- "fake",
- env,
- 10 * 1000
- );
+ RemoteInterpreter intpB = createMockInterpreterB(p);
intpGroup.add(intpB);
intpB.setInterpreterGroup(intpGroup);
@@ -113,7 +121,7 @@ public class RemoteInterpreterTest {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
- new LinkedList<InterpreterContextRunner>()));
+ new LinkedList<InterpreterContextRunner>(), null));
intpB.open();
assertEquals(2, process.referenceCount());
@@ -131,14 +139,7 @@ public class RemoteInterpreterTest {
public void testRemoteInterperterErrorStatus() throws TTransportException, IOException {
Properties p = new Properties();
- RemoteInterpreter intpA = new RemoteInterpreter(
- p,
- MockInterpreterA.class.getName(),
- new File("../bin/interpreter.sh").getAbsolutePath(),
- "fake",
- env,
- 10 * 1000
- );
+ RemoteInterpreter intpA = createMockInterpreterA(p);
intpGroup.add(intpA);
intpA.setInterpreterGroup(intpGroup);
@@ -153,7 +154,7 @@ public class RemoteInterpreterTest {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
- new LinkedList<InterpreterContextRunner>()));
+ new LinkedList<InterpreterContextRunner>(), null));
assertEquals(Code.ERROR, ret.code());
}
@@ -163,24 +164,26 @@ public class RemoteInterpreterTest {
Properties p = new Properties();
RemoteInterpreter intpA = new RemoteInterpreter(
- p,
- MockInterpreterA.class.getName(),
- new File("../bin/interpreter.sh").getAbsolutePath(),
- "fake",
- env,
- 10 * 1000
+ p,
+ MockInterpreterA.class.getName(),
+ new File("../bin/interpreter.sh").getAbsolutePath(),
+ "fake",
+ env,
+ 10 * 1000,
+ null
);
intpGroup.add(intpA);
intpA.setInterpreterGroup(intpGroup);
RemoteInterpreter intpB = new RemoteInterpreter(
- p,
- MockInterpreterB.class.getName(),
- new File("../bin/interpreter.sh").getAbsolutePath(),
- "fake",
- env,
- 10 * 1000
+ p,
+ MockInterpreterB.class.getName(),
+ new File("../bin/interpreter.sh").getAbsolutePath(),
+ "fake",
+ env,
+ 10 * 1000,
+ null
);
intpGroup.add(intpB);
@@ -199,7 +202,7 @@ public class RemoteInterpreterTest {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
- new LinkedList<InterpreterContextRunner>()));
+ new LinkedList<InterpreterContextRunner>(), null));
assertEquals("500", ret.message());
ret = intpB.interpret("500",
@@ -211,7 +214,7 @@ public class RemoteInterpreterTest {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
- new LinkedList<InterpreterContextRunner>()));
+ new LinkedList<InterpreterContextRunner>(), null));
assertEquals("1000", ret.message());
long end = System.currentTimeMillis();
assertTrue(end - start >= 1000);
@@ -225,26 +228,12 @@ public class RemoteInterpreterTest {
public void testRemoteSchedulerSharingSubmit() throws TTransportException, IOException, InterruptedException {
Properties p = new Properties();
- final RemoteInterpreter intpA = new RemoteInterpreter(
- p,
- MockInterpreterA.class.getName(),
- new File("../bin/interpreter.sh").getAbsolutePath(),
- "fake",
- env,
- 10 * 1000
- );
+ final RemoteInterpreter intpA = createMockInterpreterA(p);
intpGroup.add(intpA);
intpA.setInterpreterGroup(intpGroup);
- final RemoteInterpreter intpB = new RemoteInterpreter(
- p,
- MockInterpreterB.class.getName(),
- new File("../bin/interpreter.sh").getAbsolutePath(),
- "fake",
- env,
- 10 * 1000
- );
+ final RemoteInterpreter intpB = createMockInterpreterB(p);
intpGroup.add(intpB);
intpB.setInterpreterGroup(intpGroup);
@@ -276,7 +265,7 @@ public class RemoteInterpreterTest {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
- new LinkedList<InterpreterContextRunner>()));
+ new LinkedList<InterpreterContextRunner>(), null));
}
@Override
@@ -310,7 +299,7 @@ public class RemoteInterpreterTest {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
- new LinkedList<InterpreterContextRunner>()));
+ new LinkedList<InterpreterContextRunner>(), null));
}
@Override
@@ -340,14 +329,7 @@ public class RemoteInterpreterTest {
public void testRunOrderPreserved() throws InterruptedException {
Properties p = new Properties();
- final RemoteInterpreter intpA = new RemoteInterpreter(
- p,
- MockInterpreterA.class.getName(),
- new File("../bin/interpreter.sh").getAbsolutePath(),
- "fake",
- env,
- 10 * 1000
- );
+ final RemoteInterpreter intpA = createMockInterpreterA(p);
intpGroup.add(intpA);
intpA.setInterpreterGroup(intpGroup);
@@ -382,7 +364,7 @@ public class RemoteInterpreterTest {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
- new LinkedList<InterpreterContextRunner>()));
+ new LinkedList<InterpreterContextRunner>(), null));
synchronized (results) {
results.add(ret.message());
@@ -421,14 +403,7 @@ public class RemoteInterpreterTest {
Properties p = new Properties();
p.put("parallel", "true");
- final RemoteInterpreter intpA = new RemoteInterpreter(
- p,
- MockInterpreterA.class.getName(),
- new File("../bin/interpreter.sh").getAbsolutePath(),
- "fake",
- env,
- 10 * 1000
- );
+ final RemoteInterpreter intpA = createMockInterpreterA(p);
intpGroup.add(intpA);
intpA.setInterpreterGroup(intpGroup);
@@ -466,7 +441,7 @@ public class RemoteInterpreterTest {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
- new LinkedList<InterpreterContextRunner>()));
+ new LinkedList<InterpreterContextRunner>(), null));
synchronized (results) {
results.add(ret.message());
@@ -501,14 +476,7 @@ public class RemoteInterpreterTest {
public void testInterpreterGroupResetBeforeProcessStarts() {
Properties p = new Properties();
- RemoteInterpreter intpA = new RemoteInterpreter(
- p,
- MockInterpreterA.class.getName(),
- new File("../bin/interpreter.sh").getAbsolutePath(),
- "fake",
- env,
- 10 * 1000
- );
+ RemoteInterpreter intpA = createMockInterpreterA(p);
intpA.setInterpreterGroup(intpGroup);
RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
@@ -523,14 +491,7 @@ public class RemoteInterpreterTest {
public void testInterpreterGroupResetAfterProcessFinished() {
Properties p = new Properties();
- RemoteInterpreter intpA = new RemoteInterpreter(
- p,
- MockInterpreterA.class.getName(),
- new File("../bin/interpreter.sh").getAbsolutePath(),
- "fake",
- env,
- 10 * 1000
- );
+ RemoteInterpreter intpA = createMockInterpreterA(p);
intpA.setInterpreterGroup(intpGroup);
RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
@@ -548,14 +509,7 @@ public class RemoteInterpreterTest {
public void testInterpreterGroupResetDuringProcessRunning() throws InterruptedException {
Properties p = new Properties();
- final RemoteInterpreter intpA = new RemoteInterpreter(
- p,
- MockInterpreterA.class.getName(),
- new File("../bin/interpreter.sh").getAbsolutePath(),
- "fake",
- env,
- 10 * 1000
- );
+ final RemoteInterpreter intpA = createMockInterpreterA(p);
intpGroup.add(intpA);
intpA.setInterpreterGroup(intpGroup);
@@ -585,7 +539,7 @@ public class RemoteInterpreterTest {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
- new LinkedList<InterpreterContextRunner>()));
+ new LinkedList<InterpreterContextRunner>(), null));
}
@Override
@@ -616,26 +570,12 @@ public class RemoteInterpreterTest {
public void testRemoteInterpreterSharesTheSameSchedulerInstanceInTheSameGroup() {
Properties p = new Properties();
- RemoteInterpreter intpA = new RemoteInterpreter(
- p,
- MockInterpreterA.class.getName(),
- new File("../bin/interpreter.sh").getAbsolutePath(),
- "fake",
- env,
- 10 * 1000
- );
+ RemoteInterpreter intpA = createMockInterpreterA(p);
intpGroup.add(intpA);
intpA.setInterpreterGroup(intpGroup);
- RemoteInterpreter intpB = new RemoteInterpreter(
- p,
- MockInterpreterB.class.getName(),
- new File("../bin/interpreter.sh").getAbsolutePath(),
- "fake",
- env,
- 10 * 1000
- );
+ RemoteInterpreter intpB = createMockInterpreterB(p);
intpGroup.add(intpB);
intpB.setInterpreterGroup(intpGroup);
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
new file mode 100644
index 0000000..bc1859f
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * MockInterpreter to test outputstream
+ */
+public class MockInterpreterOutputStream extends Interpreter {
+ static {
+ Interpreter.register(
+ "interpreterOutputStream",
+ "group1",
+ MockInterpreterA.class.getName(),
+ new InterpreterPropertyBuilder().build());
+
+ }
+
+ private String lastSt;
+
+ public MockInterpreterOutputStream(Properties property) {
+ super(property);
+ }
+
+ @Override
+ public void open() {
+ //new RuntimeException().printStackTrace();
+ }
+
+ @Override
+ public void close() {
+ }
+
+ public String getLastStatement() {
+ return lastSt;
+ }
+
+ @Override
+ public InterpreterResult interpret(String st, InterpreterContext context) {
+ String[] ret = st.split(":");
+ try {
+ if (ret[1] != null) {
+ context.out.write(ret[1]);
+ }
+ } catch (IOException e) {
+ throw new InterpreterException(e);
+ }
+ return new InterpreterResult(InterpreterResult.Code.valueOf(ret[0]), (ret.length > 2) ?
+ ret[2] : "");
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) {
+
+ }
+
+ @Override
+ public FormType getFormType() {
+ return FormType.NATIVE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ return 0;
+ }
+
+ @Override
+ public List<String> completion(String buf, int cursor) {
+ return null;
+ }
+
+ @Override
+ public Scheduler getScheduler() {
+ return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
index d17df4f..05bc676 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
@@ -64,12 +64,13 @@ public class RemoteSchedulerTest {
env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
final RemoteInterpreter intpA = new RemoteInterpreter(
- p,
- MockInterpreterA.class.getName(),
- new File("../bin/interpreter.sh").getAbsolutePath(),
- "fake",
- env,
- 10 * 1000
+ p,
+ MockInterpreterA.class.getName(),
+ new File("../bin/interpreter.sh").getAbsolutePath(),
+ "fake",
+ env,
+ 10 * 1000,
+ null
);
intpGroup.add(intpA);
@@ -103,7 +104,7 @@ public class RemoteSchedulerTest {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
- new LinkedList<InterpreterContextRunner>()));
+ new LinkedList<InterpreterContextRunner>(), null));
return "1000";
}
@@ -147,12 +148,13 @@ public class RemoteSchedulerTest {
env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
final RemoteInterpreter intpA = new RemoteInterpreter(
- p,
- MockInterpreterA.class.getName(),
- new File("../bin/interpreter.sh").getAbsolutePath(),
- "fake",
- env,
- 10 * 1000
+ p,
+ MockInterpreterA.class.getName(),
+ new File("../bin/interpreter.sh").getAbsolutePath(),
+ "fake",
+ env,
+ 10 * 1000,
+ null
);
intpGroup.add(intpA);
@@ -173,7 +175,7 @@ public class RemoteSchedulerTest {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
- new LinkedList<InterpreterContextRunner>());
+ new LinkedList<InterpreterContextRunner>(), null);
@Override
public int progress() {
@@ -209,7 +211,7 @@ public class RemoteSchedulerTest {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
- new LinkedList<InterpreterContextRunner>());
+ new LinkedList<InterpreterContextRunner>(), null);
@Override
public int progress() {
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index 9e7a97c..dff75c7 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -81,7 +81,8 @@ public class ZeppelinServer extends Application {
this.depResolver = new DependencyResolver(conf.getString(ConfVars.ZEPPELIN_DEP_LOCALREPO));
this.schedulerFactory = new SchedulerFactory();
- this.replFactory = new InterpreterFactory(conf, notebookWsServer, depResolver);
+ this.replFactory = new InterpreterFactory(conf, notebookWsServer,
+ notebookWsServer, depResolver);
this.notebookRepo = new NotebookRepoSync(conf);
this.notebookIndex = new LuceneSearch();
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
index 0142df2..4296e93 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
@@ -93,6 +93,8 @@ public class Message {
PARAGRAPH_REMOVE,
PARAGRAPH_CLEAR_OUTPUT,
+ PARAGRAPH_APPEND_OUTPUT, // [s-c] append output
+ PARAGRAPH_UPDATE_OUTPUT, // [s-c] update (replace) output
PING,
ANGULAR_OBJECT_UPDATE, // [s-c] add/update angular object
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 3dfdca3..64698fc 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -30,12 +30,11 @@ import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.display.Input;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterSetting;
-import org.apache.zeppelin.notebook.JobListenerFactory;
-import org.apache.zeppelin.notebook.Note;
-import org.apache.zeppelin.notebook.Notebook;
-import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
+import org.apache.zeppelin.notebook.*;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
@@ -57,7 +56,8 @@ import com.google.gson.Gson;
*
*/
public class NotebookServer extends WebSocketServlet implements
- NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener {
+ NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener,
+ RemoteInterpreterProcessListener {
private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class);
Gson gson = new Gson();
final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>();
@@ -749,14 +749,46 @@ public class NotebookServer extends WebSocketServlet implements
}
/**
+ * This callback is for the paragraph that runs on ZeppelinServer
+ * @param noteId
+ * @param paragraphId
+ * @param output output to append
+ */
+ @Override
+ public void onOutputAppend(String noteId, String paragraphId, String output) {
+ Message msg = new Message(OP.PARAGRAPH_APPEND_OUTPUT)
+ .put("noteId", noteId)
+ .put("paragraphId", paragraphId)
+ .put("data", output);
+ Paragraph paragraph = notebook().getNote(noteId).getParagraph(paragraphId);
+ broadcast(noteId, msg);
+ }
+
+ /**
+ * This callback is for the paragraph that runs on ZeppelinServer
+ * @param noteId
+ * @param paragraphId
+ * @param output output to update (replace)
+ */
+ @Override
+ public void onOutputUpdated(String noteId, String paragraphId, String output) {
+ Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT)
+ .put("noteId", noteId)
+ .put("paragraphId", paragraphId)
+ .put("data", output);
+ Paragraph paragraph = notebook().getNote(noteId).getParagraph(paragraphId);
+ broadcast(noteId, msg);
+ }
+
+ /**
* Need description here.
*
*/
- public static class ParagraphJobListener implements JobListener {
+ public static class ParagraphListenerImpl implements ParagraphJobListener {
private NotebookServer notebookServer;
private Note note;
- public ParagraphJobListener(NotebookServer notebookServer, Note note) {
+ public ParagraphListenerImpl(NotebookServer notebookServer, Note note) {
this.notebookServer = notebookServer;
this.note = note;
}
@@ -791,11 +823,43 @@ public class NotebookServer extends WebSocketServlet implements
}
notebookServer.broadcastNote(note);
}
+
+ /**
+ * This callback is for praragraph that runs on RemoteInterpreterProcess
+ * @param paragraph
+ * @param out
+ * @param output
+ */
+ @Override
+ public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output) {
+ Message msg = new Message(OP.PARAGRAPH_APPEND_OUTPUT)
+ .put("noteId", paragraph.getNote().getId())
+ .put("paragraphId", paragraph.getId())
+ .put("data", output);
+
+ notebookServer.broadcast(paragraph.getNote().getId(), msg);
+ }
+
+ /**
+ * This callback is for paragraph that runs on RemoteInterpreterProcess
+ * @param paragraph
+ * @param out
+ * @param output
+ */
+ @Override
+ public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output) {
+ Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT)
+ .put("noteId", paragraph.getNote().getId())
+ .put("paragraphId", paragraph.getId())
+ .put("data", output);
+
+ notebookServer.broadcast(paragraph.getNote().getId(), msg);
+ }
}
@Override
- public JobListener getParagraphJobListener(Note note) {
- return new ParagraphJobListener(this, note);
+ public ParagraphJobListener getParagraphJobListener(Note note) {
+ return new ParagraphListenerImpl(this, note);
}
private void sendAllAngularObjects(Note note, NotebookSocket conn) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-web/src/app/notebook/paragraph/paragraph-results.html
----------------------------------------------------------------------
diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph-results.html b/zeppelin-web/src/app/notebook/paragraph/paragraph-results.html
index 80af4ef..7fb40ac 100644
--- a/zeppelin-web/src/app/notebook/paragraph/paragraph-results.html
+++ b/zeppelin-web/src/app/notebook/paragraph/paragraph-results.html
@@ -23,24 +23,22 @@ limitations under the License.
ng-bind-html="paragraph.result.comment">
</div>
- <div id="{{paragraph.id}}_text"
+ <div id="p{{paragraph.id}}_text"
class="text"
- ng-if="paragraph.result.type == 'TEXT'"
- ng-bind="paragraph.result.msg">
- </div>
+ ng-if="getResultType() == 'TEXT'"></div>
<div id="p{{paragraph.id}}_html"
class="resultContained"
- ng-if="paragraph.result.type == 'HTML'">
+ ng-if="getResultType() == 'HTML'">
</div>
<div id="p{{paragraph.id}}_angular"
class="resultContained"
- ng-if="paragraph.result.type == 'ANGULAR'">
+ ng-if="getResultType() == 'ANGULAR'">
</div>
<img id="{{paragraph.id}}_img"
- ng-if="paragraph.result.type == 'IMG'"
+ ng-if="getResultType() == 'IMG'"
ng-src="{{getBase64ImageSrc(paragraph.result.msg)}}">
</img>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
----------------------------------------------------------------------
diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
index be88498..30c7ea4 100644
--- a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
+++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
@@ -54,11 +54,13 @@ angular.module('zeppelinWebApp')
$scope.renderHtml();
} else if ($scope.getResultType() === 'ANGULAR') {
$scope.renderAngular();
+ } else if ($scope.getResultType() === 'TEXT') {
+ $scope.renderText();
}
};
- $scope.renderHtml = function() {
- var retryRenderer = function() {
+ $scope.renderHtml = function() {
+ var retryRenderer = function() {
if (angular.element('#p' + $scope.paragraph.id + '_html').length) {
try {
angular.element('#p' + $scope.paragraph.id + '_html').html($scope.paragraph.result.msg);
@@ -93,6 +95,42 @@ angular.module('zeppelinWebApp')
$timeout(retryRenderer);
};
+ $scope.renderText = function() {
+ var retryRenderer = function() {
+
+ var textEl = angular.element('#p' + $scope.paragraph.id + '_text');
+ if (textEl.length) {
+ // clear all lines before render
+ $scope.clearTextOutput();
+
+ if ($scope.paragraph.result && $scope.paragraph.result.msg) {
+ $scope.appendTextOutput($scope.paragraph.result.msg);
+ }
+ } else {
+ $timeout(retryRenderer, 10);
+ }
+ };
+ $timeout(retryRenderer);
+ };
+
+ $scope.clearTextOutput = function() {
+ var textEl = angular.element('#p' + $scope.paragraph.id + '_text');
+ if (textEl.length) {
+ textEl.children().remove();
+ }
+ };
+
+ $scope.appendTextOutput = function(msg) {
+ var textEl = angular.element('#p' + $scope.paragraph.id + '_text');
+ if (textEl.length) {
+ var lines = msg.split('\n');
+ for (var i=0; i < lines.length; i++) {
+ textEl.append(angular.element('<div></div>').text(lines[i]));
+ }
+ }
+ };
+
+
var initializeDefault = function() {
var config = $scope.paragraph.config;
@@ -156,6 +194,10 @@ angular.module('zeppelinWebApp')
}
});
+ var isEmpty = function (object) {
+ return !object;
+ };
+
// TODO: this may have impact on performance when there are many paragraphs in a note.
$scope.$on('updateParagraph', function(event, data) {
if (data.paragraph.id === $scope.paragraph.id &&
@@ -166,6 +208,7 @@ angular.module('zeppelinWebApp')
data.paragraph.status !== $scope.paragraph.status ||
data.paragraph.jobName !== $scope.paragraph.jobName ||
data.paragraph.title !== $scope.paragraph.title ||
+ isEmpty(data.paragraph.result) !== isEmpty($scope.paragraph.result) ||
data.paragraph.errorMessage !== $scope.paragraph.errorMessage ||
!angular.equals(data.paragraph.settings, $scope.paragraph.settings) ||
!angular.equals(data.paragraph.config, $scope.paragraph.config))
@@ -175,7 +218,8 @@ angular.module('zeppelinWebApp')
var newType = $scope.getResultType(data.paragraph);
var oldGraphMode = $scope.getGraphMode();
var newGraphMode = $scope.getGraphMode(data.paragraph);
- var resultRefreshed = (data.paragraph.dateFinished !== $scope.paragraph.dateFinished);
+ var resultRefreshed = (data.paragraph.dateFinished !== $scope.paragraph.dateFinished) || isEmpty(data.paragraph.result) !== isEmpty($scope.paragraph.result);
+
var statusChanged = (data.paragraph.status !== $scope.paragraph.status);
//console.log("updateParagraph oldData %o, newData %o. type %o -> %o, mode %o -> %o", $scope.paragraph, data, oldType, newType, oldGraphMode, newGraphMode);
@@ -234,6 +278,8 @@ angular.module('zeppelinWebApp')
$scope.renderHtml();
} else if (newType === 'ANGULAR' && resultRefreshed) {
$scope.renderAngular();
+ } else if (newType === 'TEXT' && resultRefreshed) {
+ $scope.renderText();
}
if (statusChanged || resultRefreshed) {
@@ -252,6 +298,19 @@ angular.module('zeppelinWebApp')
});
+ $scope.$on('appendParagraphOutput', function(event, data) {
+ if ($scope.paragraph.id === data.paragraphId) {
+ $scope.appendTextOutput(data.data);
+ }
+ });
+
+ $scope.$on('updateParagraphOutput', function(event, data) {
+ if ($scope.paragraph.id === data.paragraphId) {
+ $scope.clearTextOutput(data.data);
+ $scope.appendTextOutput(data.data);
+ }
+ });
+
$scope.isRunning = function() {
if ($scope.paragraph.status === 'RUNNING' || $scope.paragraph.status === 'PENDING') {
return true;
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js
----------------------------------------------------------------------
diff --git a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js
index bb99d56..800d450 100644
--- a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js
+++ b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js
@@ -54,6 +54,10 @@ angular.module('zeppelinWebApp').factory('websocketEvents', function($rootScope,
$rootScope.$broadcast('setNoteMenu', data.notes);
} else if (op === 'PARAGRAPH') {
$rootScope.$broadcast('updateParagraph', data);
+ } else if (op === 'PARAGRAPH_APPEND_OUTPUT') {
+ $rootScope.$broadcast('appendParagraphOutput', data);
+ } else if (op === 'PARAGRAPH_UPDATE_OUTPUT') {
+ $rootScope.$broadcast('updateParagraphOutput', data);
} else if (op === 'PROGRESS') {
$rootScope.$broadcast('updateProgress', data);
} else if (op === 'COMPLETION_LIST') {
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index 4ff0cc3..039d970 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -29,6 +29,7 @@ import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.slf4j.Logger;
@@ -65,25 +66,30 @@ public class InterpreterFactory {
private InterpreterOption defaultOption;
AngularObjectRegistryListener angularObjectRegistryListener;
+ private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
DependencyResolver depResolver;
public InterpreterFactory(ZeppelinConfiguration conf,
AngularObjectRegistryListener angularObjectRegistryListener,
+ RemoteInterpreterProcessListener remoteInterpreterProcessListener,
DependencyResolver depResolver)
throws InterpreterException, IOException {
- this(conf, new InterpreterOption(true), angularObjectRegistryListener, depResolver);
+ this(conf, new InterpreterOption(true), angularObjectRegistryListener,
+ remoteInterpreterProcessListener, depResolver);
}
public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultOption,
AngularObjectRegistryListener angularObjectRegistryListener,
+ RemoteInterpreterProcessListener remoteInterpreterProcessListener,
DependencyResolver depResolver)
throws InterpreterException, IOException {
this.conf = conf;
this.defaultOption = defaultOption;
this.angularObjectRegistryListener = angularObjectRegistryListener;
this.depResolver = depResolver;
+ this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
String replsConf = conf.getString(ConfVars.ZEPPELIN_INTERPRETERS);
interpreterClassList = replsConf.split(",");
@@ -500,7 +506,8 @@ public class InterpreterFactory {
/**
* Change interpreter property and restart
- * @param name
+ * @param id
+ * @param option
* @param properties
* @throws IOException
*/
@@ -659,7 +666,7 @@ public class InterpreterFactory {
int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter(
property, className, conf.getInterpreterRemoteRunnerPath(),
- interpreterPath, connectTimeout));
+ interpreterPath, connectTimeout, remoteInterpreterProcessListener));
return intp;
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java
index 5a7e966..1387730 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java
@@ -17,11 +17,9 @@
package org.apache.zeppelin.notebook;
-import org.apache.zeppelin.scheduler.JobListener;
-
/**
* TODO(moon): provide description.
*/
public interface JobListenerFactory {
- public JobListener getParagraphJobListener(Note note);
+ public ParagraphJobListener getParagraphJobListener(Note note);
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index 392b968..10f080d 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -19,39 +19,43 @@ package org.apache.zeppelin.notebook;
import java.io.IOException;
import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.Input;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.notebook.utility.IdHashes;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
import org.apache.zeppelin.search.SearchService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Binded interpreters for a note
*/
public class Note implements Serializable, JobListener {
+ static Logger logger = LoggerFactory.getLogger(Note.class);
private static final long serialVersionUID = 7920699076577612429L;
+ // threadpool for delayed persist of note
+ private static final ScheduledThreadPoolExecutor delayedPersistThreadPool =
+ new ScheduledThreadPoolExecutor(0);
+ static {
+ delayedPersistThreadPool.setRemoveOnCancelPolicy(true);
+ }
+
final List<Paragraph> paragraphs = new LinkedList<>();
+
private String name = "";
private String id;
@@ -62,6 +66,7 @@ public class Note implements Serializable, JobListener {
private transient JobListenerFactory jobListenerFactory;
private transient NotebookRepo repo;
private transient SearchService index;
+ private transient ScheduledFuture delayedPersist;
/**
* note configurations.
@@ -144,9 +149,8 @@ public class Note implements Serializable, JobListener {
/**
* Add paragraph last.
- *
- * @param p
*/
+
public Paragraph addParagraph() {
Paragraph p = new Paragraph(this, this, replLoader);
synchronized (paragraphs) {
@@ -187,7 +191,6 @@ public class Note implements Serializable, JobListener {
* Insert paragraph in given index.
*
* @param index
- * @param p
*/
public Paragraph insertParagraph(int index) {
Paragraph p = new Paragraph(this, this, replLoader);
@@ -339,8 +342,6 @@ public class Note implements Serializable, JobListener {
/**
* Run all paragraphs sequentially.
- *
- * @param jobListener
*/
public void runAll() {
synchronized (paragraphs) {
@@ -400,15 +401,55 @@ public class Note implements Serializable, JobListener {
}
public void persist() throws IOException {
+ stopDelayedPersistTimer();
snapshotAngularObjectRegistry();
index.updateIndexDoc(this);
repo.save(this);
}
+ /**
+ * Persist this note with maximum delay.
+ * @param maxDelaySec
+ */
+ public void persist(int maxDelaySec) {
+ startDelayedPersistTimer(maxDelaySec);
+ }
+
public void unpersist() throws IOException {
repo.remove(id());
}
+
+ private void startDelayedPersistTimer(int maxDelaySec) {
+ synchronized (this) {
+ if (delayedPersist != null) {
+ return;
+ }
+
+ delayedPersist = delayedPersistThreadPool.schedule(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ persist();
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+ }, maxDelaySec, TimeUnit.SECONDS);
+ }
+ }
+
+ private void stopDelayedPersistTimer() {
+ synchronized (this) {
+ if (delayedPersist == null) {
+ return;
+ }
+
+ delayedPersist.cancel(false);
+ }
+ }
+
public Map<String, Object> getConfig() {
if (config == null) {
config = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index 433095b..65210f5 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -28,6 +28,7 @@ import org.apache.zeppelin.scheduler.JobListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.io.Serializable;
import java.util.*;
@@ -213,7 +214,29 @@ public class Paragraph extends Job implements Serializable, Cloneable {
if (Code.KEEP_PREVIOUS_RESULT == ret.code()) {
return getReturn();
}
- return ret;
+
+ String message = "";
+
+ context.out.flush();
+ InterpreterResult.Type outputType = context.out.getType();
+ byte[] interpreterOutput = context.out.toByteArray();
+ context.out.clear();
+
+ if (interpreterOutput != null && interpreterOutput.length > 0) {
+ message = new String(interpreterOutput);
+ }
+
+ if (message.isEmpty()) {
+ return ret;
+ } else {
+ String interpreterResultMessage = ret.message();
+ if (interpreterResultMessage != null && !interpreterResultMessage.isEmpty()) {
+ message += interpreterResultMessage;
+ return new InterpreterResult(ret.code(), ret.type(), message);
+ } else {
+ return new InterpreterResult(ret.code(), outputType, message);
+ }
+ }
} finally {
InterpreterContext.remove();
}
@@ -244,6 +267,7 @@ public class Paragraph extends Job implements Serializable, Cloneable {
runners.add(new ParagraphRunner(note, note.id(), p.getId()));
}
+ final Paragraph self = this;
InterpreterContext interpreterContext = new InterpreterContext(
note.id(),
getId(),
@@ -252,7 +276,34 @@ public class Paragraph extends Job implements Serializable, Cloneable {
this.getConfig(),
this.settings,
registry,
- runners);
+ runners,
+ new InterpreterOutput(new InterpreterOutputListener() {
+ @Override
+ public void onAppend(InterpreterOutput out, byte[] line) {
+ updateParagraphResult(out);
+ ((ParagraphJobListener) getListener()).onOutputAppend(self, out, new String(line));
+ }
+
+ @Override
+ public void onUpdate(InterpreterOutput out, byte[] output) {
+ updateParagraphResult(out);
+ ((ParagraphJobListener) getListener()).onOutputUpdate(self, out,
+ new String(output));
+ }
+
+ private void updateParagraphResult(InterpreterOutput out) {
+ // update paragraph result
+ Throwable t = null;
+ String message = null;
+ try {
+ message = new String(out.toByteArray());
+ } catch (IOException e) {
+ logger().error(e.getMessage(), e);
+ t = e;
+ }
+ setReturn(new InterpreterResult(Code.SUCCESS, out.getType(), message), t);
+ }
+ }));
return interpreterContext;
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java
new file mode 100644
index 0000000..f6404d7
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook;
+
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.scheduler.JobListener;
+
+/**
+ * Listen paragraph update
+ */
+public interface ParagraphJobListener extends JobListener {
+ public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output);
+ public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output);
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
index abd0e3b..17d91cc 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
@@ -55,8 +55,8 @@ public class InterpreterFactoryTest {
System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath());
System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2");
conf = new ZeppelinConfiguration();
- factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null);
- context = new InterpreterContext("note", "id", "title", "text", null, null, null, null);
+ factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null);
+ context = new InterpreterContext("note", "id", "title", "text", null, null, null, null, null);
}
@@ -140,7 +140,7 @@ public class InterpreterFactoryTest {
factory.add("newsetting", "mock1", new InterpreterOption(false), new Properties());
assertEquals(3, factory.get().size());
- InterpreterFactory factory2 = new InterpreterFactory(conf, null, null);
+ InterpreterFactory factory2 = new InterpreterFactory(conf, null, null, null);
assertEquals(3, factory2.get().size());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
index 9496e4d..4fa8ef6 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
@@ -58,7 +58,7 @@ public class NoteInterpreterLoaderTest {
MockInterpreter11.register("mock11", "group1", "org.apache.zeppelin.interpreter.mock.MockInterpreter11");
MockInterpreter2.register("mock2", "group2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
- factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null);
+ factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null);
}
@After
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
index e98e680..82ba137 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -38,6 +38,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterOption;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
@@ -85,7 +86,7 @@ public class NotebookTest implements JobListenerFactory{
MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
- factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null);
+ factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null);
SearchService search = mock(SearchService.class);
notebookRepo = new VFSNotebookRepo(conf);
@@ -172,7 +173,8 @@ public class NotebookTest implements JobListenerFactory{
note.persist();
Notebook notebook2 = new Notebook(
- conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, null, null), this, null);
+ conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, null, null, null), this,
+ null);
assertEquals(1, notebook2.getAllNotes().size());
}
@@ -411,8 +413,16 @@ public class NotebookTest implements JobListenerFactory{
}
@Override
- public JobListener getParagraphJobListener(Note note) {
- return new JobListener(){
+ public ParagraphJobListener getParagraphJobListener(Note note) {
+ return new ParagraphJobListener(){
+
+ @Override
+ public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output) {
+ }
+
+ @Override
+ public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output) {
+ }
@Override
public void onProgressUpdate(Job job, int progress) {
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
index 60b3ba3..31970af 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
@@ -30,12 +30,10 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterOption;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
-import org.apache.zeppelin.notebook.JobListenerFactory;
-import org.apache.zeppelin.notebook.Note;
-import org.apache.zeppelin.notebook.Notebook;
-import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.notebook.*;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
@@ -87,7 +85,7 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
- factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null);
+ factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null);
SearchService search = mock(SearchService.class);
notebookRepoSync = new NotebookRepoSync(conf);
@@ -224,8 +222,16 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
}
@Override
- public JobListener getParagraphJobListener(Note note) {
- return new JobListener(){
+ public ParagraphJobListener getParagraphJobListener(Note note) {
+ return new ParagraphJobListener(){
+
+ @Override
+ public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output) {
+ }
+
+ @Override
+ public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output) {
+ }
@Override
public void onProgressUpdate(Job job, int progress) {
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java
index cff086d..2e2801c 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java
@@ -30,10 +30,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
-import org.apache.zeppelin.notebook.JobListenerFactory;
-import org.apache.zeppelin.notebook.Note;
-import org.apache.zeppelin.notebook.Notebook;
-import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.notebook.*;
import org.apache.zeppelin.scheduler.JobListener;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.search.SearchService;
@@ -76,7 +73,7 @@ public class VFSNotebookRepoTest implements JobListenerFactory {
MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
this.schedulerFactory = new SchedulerFactory();
- factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null);
+ factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null);
SearchService search = mock(SearchService.class);
notebookRepo = new VFSNotebookRepo(conf);
@@ -140,7 +137,7 @@ public class VFSNotebookRepoTest implements JobListenerFactory {
}
@Override
- public JobListener getParagraphJobListener(Note note) {
+ public ParagraphJobListener getParagraphJobListener(Note note) {
return null;
}
}
[2/2] incubator-zeppelin git commit: [ZEPPELIN-554] Streaming
interpreter output to front-end
Posted by mo...@apache.org.
[ZEPPELIN-554] Streaming interpreter output to front-end
### What is this PR for?
Output from interpreter is displayed after completion of paragraph execution.
It'll be useful if output can be streamed to front-end during execution.
Previous work #593 injects InterpreterOutput stream object to Interpreter.
This PR is based on #593 and stream the data from InterpreterOutput to front-end.
This implementation only streams output is %text. Other output type (%html, %angular, %table) is not streamed to the front end.
While this PR keeps backward compatibility, Interpreter who want to use this feature will need to modify code to write output into `InterpreterOutput` instead of return with `InterpreterResult`.
This PR includes modification of SparkInterpreter to use InterpreterOutput.
### What type of PR is it?
Feature
### Todos
### Is there a relevant Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-554
### How should this be tested?
Run such code using Spark interpreter
```
(1 to 10).foreach{ i=>
Thread.sleep(1000)
println("Hello " + i)
}
```
### Screenshots (if appropriate)
![stream_output](https://cloud.githubusercontent.com/assets/1540981/12188677/6df08900-b56c-11e5-95d9-e5f6fad91007.gif)
### Questions:
* Does the licenses files need update? no
* Is there breaking changes for older versions? no
* Does this needs documentation? no
Author: Lee moon soo <mo...@apache.org>
Closes #611 from Leemoonsoo/output_stream_frontend and squashes the following commits:
53e2bb4 [Lee moon soo] Not persist on every append
dedae0d [Lee moon soo] Remove debug lines
8251fb4 [Lee moon soo] Fix syntax and style
9c9c8fd [Lee moon soo] update test
18215a3 [Lee moon soo] fix style
f7e6a4d [Lee moon soo] Fix syntax error
d29cfbf [Lee moon soo] workaround jshint
07b3e1a [Lee moon soo] Handle clear output correctly
bc6262e [Lee moon soo] Make PysparkInterpreter stream output
6d9cc51 [Lee moon soo] Pass InterpreterOutput to SparkILoop
b68180e [Lee moon soo] Add InterpreterOutput on spark interpreter unitest
626ad48 [Lee moon soo] Add license header
846015b [Lee moon soo] Update scalding
37d6920 [Lee moon soo] Handle display system directive correctly
e278e84 [Lee moon soo] Clear output correctly
479b836 [Lee moon soo] Add test
c01df62 [Lee moon soo] Connect Spark interpreter Console.out to outputstream
8a1223f [Lee moon soo] Handle update output correctly
e7a9b37 [Lee moon soo] Delayed persist
2060c1e [Lee moon soo] Clear before render text
786c978 [Lee moon soo] update paragraph object after witing to outputstream
258ff38 [Lee moon soo] Barely working
6f607f7 [Lee moon soo] Add newline listener
89d9798 [Lee moon soo] Render text output line by line
a42e4ff [Lee moon soo] Update test
fb5e7b5 [Lee moon soo] Update test
0f60b54 [Lee moon soo] Update test
a07d7db [Lee moon soo] Implement InterpreterResult.toString
1f419b6 [Lee moon soo] Add InterpreterOutput
c91f498 [Lee moon soo] prepend interpreteroutputstream to interpreter result
Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/5ec59a81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/5ec59a81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/5ec59a81
Branch: refs/heads/master
Commit: 5ec59a81b2fda2fb65d4075e0672930b769f41d2
Parents: dbdaf84
Author: Lee moon soo <mo...@apache.org>
Authored: Sat Jan 16 11:04:09 2016 -0800
Committer: Lee moon soo <mo...@apache.org>
Committed: Wed Jan 20 15:39:21 2016 -0800
----------------------------------------------------------------------
.../zeppelin/flink/FlinkInterpreterTest.java | 2 +-
.../zeppelin/hive/HiveInterpreterTest.java | 12 +-
.../zeppelin/ignite/IgniteInterpreterTest.java | 2 +-
.../ignite/IgniteSqlInterpreterTest.java | 2 +-
.../scalding/ScaldingInterpreterTest.java | 2 +-
.../zeppelin/spark/PySparkInterpreter.java | 36 ++-
.../apache/zeppelin/spark/SparkInterpreter.java | 32 +--
.../zeppelin/spark/SparkOutputStream.java | 75 ++++++
.../apache/zeppelin/spark/ZeppelinContext.java | 18 +-
.../main/resources/python/zeppelin_pyspark.py | 7 +-
.../zeppelin/spark/DepInterpreterTest.java | 2 +-
.../zeppelin/spark/SparkInterpreterTest.java | 23 +-
.../zeppelin/spark/SparkSqlInterpreterTest.java | 17 +-
.../interpreter/InterpreterContext.java | 5 +-
.../zeppelin/interpreter/InterpreterOutput.java | 249 +++++++++++++++++++
.../InterpreterOutputChangeListener.java | 27 ++
.../InterpreterOutputChangeWatcher.java | 140 +++++++++++
.../interpreter/InterpreterOutputListener.java | 34 +++
.../zeppelin/interpreter/InterpreterResult.java | 4 +
.../interpreter/remote/RemoteInterpreter.java | 27 +-
.../remote/RemoteInterpreterEventPoller.java | 26 +-
.../remote/RemoteInterpreterProcess.java | 9 +-
.../RemoteInterpreterProcessListener.java | 25 ++
.../remote/RemoteInterpreterServer.java | 68 ++++-
.../thrift/RemoteInterpreterContext.java | 2 +-
.../thrift/RemoteInterpreterEvent.java | 2 +-
.../thrift/RemoteInterpreterEventType.java | 8 +-
.../thrift/RemoteInterpreterResult.java | 2 +-
.../thrift/RemoteInterpreterService.java | 2 +-
.../main/thrift/RemoteInterpreterService.thrift | 4 +-
.../interpreter/InterpreterContextTest.java | 2 +-
.../InterpreterOutputChangeWatcherTest.java | 109 ++++++++
.../interpreter/InterpreterOutputTest.java | 127 ++++++++++
.../interpreter/InterpreterResultTest.java | 5 +
.../remote/RemoteAngularObjectTest.java | 15 +-
.../RemoteInterpreterOutputTestStream.java | 146 +++++++++++
.../remote/RemoteInterpreterProcessTest.java | 2 +-
.../remote/RemoteInterpreterTest.java | 174 +++++--------
.../mock/MockInterpreterOutputStream.java | 97 ++++++++
.../zeppelin/scheduler/RemoteSchedulerTest.java | 32 +--
.../apache/zeppelin/server/ZeppelinServer.java | 3 +-
.../org/apache/zeppelin/socket/Message.java | 2 +
.../apache/zeppelin/socket/NotebookServer.java | 82 +++++-
.../notebook/paragraph/paragraph-results.html | 12 +-
.../notebook/paragraph/paragraph.controller.js | 65 ++++-
.../websocketEvents/websocketEvents.factory.js | 4 +
.../interpreter/InterpreterFactory.java | 13 +-
.../zeppelin/notebook/JobListenerFactory.java | 4 +-
.../java/org/apache/zeppelin/notebook/Note.java | 77 ++++--
.../org/apache/zeppelin/notebook/Paragraph.java | 55 +++-
.../zeppelin/notebook/ParagraphJobListener.java | 29 +++
.../interpreter/InterpreterFactoryTest.java | 6 +-
.../notebook/NoteInterpreterLoaderTest.java | 2 +-
.../apache/zeppelin/notebook/NotebookTest.java | 18 +-
.../notebook/repo/NotebookRepoSyncTest.java | 20 +-
.../notebook/repo/VFSNotebookRepoTest.java | 9 +-
56 files changed, 1672 insertions(+), 302 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
index 3168f04..9a61be6 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -40,7 +40,7 @@ public class FlinkInterpreterTest {
Properties p = new Properties();
flink = new FlinkInterpreter(p);
flink.open();
- context = new InterpreterContext(null, null, null, null, null, null, null, null);
+ context = new InterpreterContext(null, null, null, null, null, null, null, null, null);
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java
----------------------------------------------------------------------
diff --git a/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java b/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java
index c22080d..c86fcf3 100644
--- a/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java
+++ b/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java
@@ -79,9 +79,9 @@ public class HiveInterpreterTest {
HiveInterpreter t = new HiveInterpreter(properties);
t.open();
- assertTrue(t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null)).message().contains("SCHEMA_NAME"));
+ assertTrue(t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null,null)).message().contains("SCHEMA_NAME"));
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n",
- t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null)).message());
+ t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)).message());
}
@Test
@@ -101,7 +101,7 @@ public class HiveInterpreterTest {
t.open();
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n",
- t.interpret("(h2)\n select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null)).message());
+ t.interpret("(h2)\n select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)).message());
}
@Test
@@ -117,13 +117,13 @@ public class HiveInterpreterTest {
t.open();
InterpreterResult interpreterResult =
- t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null));
+ t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null));
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message());
t.getConnection("default").close();
interpreterResult =
- t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null));
+ t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null));
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message());
}
@@ -139,7 +139,7 @@ public class HiveInterpreterTest {
HiveInterpreter t = new HiveInterpreter(properties);
t.open();
- InterpreterContext interpreterContext = new InterpreterContext(null, "a", null, null, null, null, null, null);
+ InterpreterContext interpreterContext = new InterpreterContext(null, "a", null, null, null, null, null, null, null);
//simple select test
InterpreterResult result = t.interpret("select * from test_table", interpreterContext);
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java
----------------------------------------------------------------------
diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java
index f46b049..cf98083 100644
--- a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java
+++ b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java
@@ -40,7 +40,7 @@ public class IgniteInterpreterTest {
private static final String HOST = "127.0.0.1:47500..47509";
private static final InterpreterContext INTP_CONTEXT =
- new InterpreterContext(null, null, null, null, null, null, null, null);
+ new InterpreterContext(null, null, null, null, null, null, null, null, null);
private IgniteInterpreter intp;
private Ignite ignite;
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java
index fb93ad5..a6dcc66 100644
--- a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java
+++ b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java
@@ -44,7 +44,7 @@ public class IgniteSqlInterpreterTest {
private static final String HOST = "127.0.0.1:47500..47509";
private static final InterpreterContext INTP_CONTEXT =
- new InterpreterContext(null, null, null, null, null, null, null, null);
+ new InterpreterContext(null, null, null, null, null, null, null, null, null);
private Ignite ignite;
private IgniteSqlInterpreter intp;
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
----------------------------------------------------------------------
diff --git a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
index 7a753fa..606d4d9 100644
--- a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
+++ b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
@@ -65,7 +65,7 @@ public class ScaldingInterpreterTest {
context = new InterpreterContext("note", "id", "title", "text",
new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(
intpGroup.getId(), null),
- new LinkedList<InterpreterContextRunner>());
+ new LinkedList<InterpreterContextRunner>(), null);
}
@After
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 8c4ba87..c5441ab 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -73,8 +73,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
private GatewayServer gatewayServer;
private DefaultExecutor executor;
private int port;
- private ByteArrayOutputStream outputStream;
- private ByteArrayOutputStream errStream;
+ private SparkOutputStream outputStream;
private BufferedWriter ins;
private PipedInputStream in;
private ByteArrayOutputStream input;
@@ -173,7 +172,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
cmd.addArgument(Integer.toString(port), false);
cmd.addArgument(Integer.toString(getSparkInterpreter().getSparkVersion().toNumber()), false);
executor = new DefaultExecutor();
- outputStream = new ByteArrayOutputStream();
+ outputStream = new SparkOutputStream();
PipedOutputStream ps = new PipedOutputStream();
in = null;
try {
@@ -274,7 +273,6 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
statementError = error;
statementFinishedNotifier.notify();
}
-
}
boolean pythonScriptInitialized = false;
@@ -287,6 +285,10 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
}
+ public void appendOutput(String message) throws IOException {
+ outputStream.getInterpreterOutput().write(message);
+ }
+
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
SparkInterpreter sparkInterpreter = getSparkInterpreter();
@@ -300,7 +302,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
+ outputStream.toString());
}
- outputStream.reset();
+ outputStream.setInterpreterOutput(context.out);
synchronized (pythonScriptInitializeNotifier) {
long startTime = System.currentTimeMillis();
@@ -314,15 +316,24 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
}
+ String errorMessage = "";
+ try {
+ context.out.flush();
+ errorMessage = new String(context.out.toByteArray());
+ } catch (IOException e) {
+ throw new InterpreterException(e);
+ }
+
+
if (pythonscriptRunning == false) {
// python script failed to initialize and terminated
return new InterpreterResult(Code.ERROR, "failed to start pyspark"
- + outputStream.toString());
+ + errorMessage);
}
if (pythonScriptInitialized == false) {
// timeout. didn't get initialized message
return new InterpreterResult(Code.ERROR, "pyspark is not responding "
- + outputStream.toString());
+ + errorMessage);
}
if (!sparkInterpreter.getSparkVersion().isPysparkSupported()) {
@@ -352,7 +363,14 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
if (statementError) {
return new InterpreterResult(Code.ERROR, statementOutput);
} else {
- return new InterpreterResult(Code.SUCCESS, statementOutput);
+
+ try {
+ context.out.flush();
+ } catch (IOException e) {
+ throw new InterpreterException(e);
+ }
+
+ return new InterpreterResult(Code.SUCCESS);
}
}
@@ -389,8 +407,6 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
return new LinkedList<String>();
}
- outputStream.reset();
-
pythonInterpretRequest = new PythonInterpretRequest(completionCommand, "");
statementOutput = null;
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index d975791..7ee6d7c 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -17,9 +17,7 @@
package org.apache.zeppelin.spark;
-import java.io.ByteArrayOutputStream;
import java.io.File;
-import java.io.PrintStream;
import java.io.PrintWriter;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
@@ -41,7 +39,6 @@ import org.apache.spark.repl.SparkJLineCompletion;
import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.DAGScheduler;
import org.apache.spark.scheduler.Pool;
-import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.ui.jobs.JobProgressListener;
import org.apache.zeppelin.interpreter.Interpreter;
@@ -115,7 +112,7 @@ public class SparkInterpreter extends Interpreter {
private SparkILoop interpreter;
private SparkIMain intp;
private SparkContext sc;
- private ByteArrayOutputStream out;
+ private SparkOutputStream out;
private SQLContext sqlc;
private SparkDependencyResolver dep;
private SparkJLineCompletion completor;
@@ -129,7 +126,7 @@ public class SparkInterpreter extends Interpreter {
public SparkInterpreter(Properties property) {
super(property);
- out = new ByteArrayOutputStream();
+ out = new SparkOutputStream();
}
public SparkInterpreter(Properties property, SparkContext sc) {
@@ -452,10 +449,9 @@ public class SparkInterpreter extends Interpreter {
b.v_$eq(true);
settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
- PrintStream printStream = new PrintStream(out);
-
/* spark interpreter */
this.interpreter = new SparkILoop(null, new PrintWriter(out));
+
interpreter.settings_$eq(settings);
interpreter.createInterpreter();
@@ -481,7 +477,7 @@ public class SparkInterpreter extends Interpreter {
dep = getDependencyResolver();
- z = new ZeppelinContext(sc, sqlc, null, dep, printStream,
+ z = new ZeppelinContext(sc, sqlc, null, dep,
Integer.parseInt(getProperty("zeppelin.spark.maxResult")));
intp.interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
@@ -489,7 +485,6 @@ public class SparkInterpreter extends Interpreter {
binder.put("sc", sc);
binder.put("sqlc", sqlc);
binder.put("z", z);
- binder.put("out", printStream);
intp.interpret("@transient val z = "
+ "_binder.get(\"z\").asInstanceOf[org.apache.zeppelin.spark.ZeppelinContext]");
@@ -675,13 +670,13 @@ public class SparkInterpreter extends Interpreter {
synchronized (this) {
z.setGui(context.getGui());
sc.setJobGroup(getJobGroup(context), "Zeppelin", false);
- InterpreterResult r = interpretInput(lines);
+ InterpreterResult r = interpretInput(lines, context);
sc.clearJobGroup();
return r;
}
}
- public InterpreterResult interpretInput(String[] lines) {
+ public InterpreterResult interpretInput(String[] lines, InterpreterContext context) {
SparkEnv.set(env);
// add print("") to make sure not finishing with comment
@@ -692,8 +687,9 @@ public class SparkInterpreter extends Interpreter {
}
linesToRun[lines.length] = "print(\"\")";
- Console.setOut((java.io.PrintStream) binder.get("out"));
- out.reset();
+ Console.setOut(context.out);
+ out.setInterpreterOutput(context.out);
+ context.out.clear();
Code r = null;
String incomplete = "";
@@ -713,6 +709,7 @@ public class SparkInterpreter extends Interpreter {
res = intp.interpret(incomplete + s);
} catch (Exception e) {
sc.clearJobGroup();
+ out.setInterpreterOutput(null);
logger.info("Interpreter exception", e);
return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
}
@@ -721,7 +718,8 @@ public class SparkInterpreter extends Interpreter {
if (r == Code.ERROR) {
sc.clearJobGroup();
- return new InterpreterResult(r, out.toString());
+ out.setInterpreterOutput(null);
+ return new InterpreterResult(r, "");
} else if (r == Code.INCOMPLETE) {
incomplete += s + "\n";
} else {
@@ -730,9 +728,13 @@ public class SparkInterpreter extends Interpreter {
}
if (r == Code.INCOMPLETE) {
+ sc.clearJobGroup();
+ out.setInterpreterOutput(null);
return new InterpreterResult(r, "Incomplete expression");
} else {
- return new InterpreterResult(r, out.toString());
+ sc.clearJobGroup();
+ out.setInterpreterOutput(null);
+ return new InterpreterResult(Code.SUCCESS);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java
new file mode 100644
index 0000000..98a4090
--- /dev/null
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.spark;
+
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * InterpreterOutput can be attached / detached.
+ */
+public class SparkOutputStream extends OutputStream {
+ InterpreterOutput interpreterOutput;
+
+ public SparkOutputStream() {
+ }
+
+ public InterpreterOutput getInterpreterOutput() {
+ return interpreterOutput;
+ }
+
+ public void setInterpreterOutput(InterpreterOutput interpreterOutput) {
+ this.interpreterOutput = interpreterOutput;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ if (interpreterOutput != null) {
+ interpreterOutput.write(b);
+ }
+ }
+
+ @Override
+ public void write(byte [] b) throws IOException {
+ if (interpreterOutput != null) {
+ interpreterOutput.write(b);
+ }
+ }
+
+ @Override
+ public void write(byte [] b, int offset, int len) throws IOException {
+ if (interpreterOutput != null) {
+ interpreterOutput.write(b, offset, len);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (interpreterOutput != null) {
+ interpreterOutput.close();
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (interpreterOutput != null) {
+ interpreterOutput.flush();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
index a55ed73..6869161 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
@@ -21,6 +21,7 @@ import static scala.collection.JavaConversions.asJavaCollection;
import static scala.collection.JavaConversions.asJavaIterable;
import static scala.collection.JavaConversions.collectionAsScalaIterable;
+import java.io.IOException;
import java.io.PrintStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -54,19 +55,17 @@ import scala.collection.Iterable;
*/
public class ZeppelinContext extends HashMap<String, Object> {
private SparkDependencyResolver dep;
- private PrintStream out;
private InterpreterContext interpreterContext;
private int maxResult;
public ZeppelinContext(SparkContext sc, SQLContext sql,
InterpreterContext interpreterContext,
- SparkDependencyResolver dep, PrintStream printStream,
+ SparkDependencyResolver dep,
int maxResult) {
this.sc = sc;
this.sqlContext = sql;
this.interpreterContext = interpreterContext;
this.dep = dep;
- this.out = printStream;
this.maxResult = maxResult;
}
@@ -273,10 +272,15 @@ public class ZeppelinContext extends HashMap<String, Object> {
throw new InterpreterException("Can not road DataFrame/SchemaRDD class");
}
- if (cls.isInstance(o)) {
- out.print(showDF(sc, interpreterContext, o, maxResult));
- } else {
- out.print(o.toString());
+
+ try {
+ if (cls.isInstance(o)) {
+ interpreterContext.out.write(showDF(sc, interpreterContext, o, maxResult));
+ } else {
+ interpreterContext.out.write(o.toString());
+ }
+ } catch (IOException e) {
+ throw new InterpreterException(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py
index 62f0a82..7da0f4e 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -36,10 +36,7 @@ class Logger(object):
self.out = ""
def write(self, message):
- self.out = self.out + message
-
- def get(self):
- return self.out
+ intp.appendOutput(message)
def reset(self):
self.out = ""
@@ -224,7 +221,7 @@ while True :
sc.setJobGroup(jobGroup, "Zeppelin")
eval(compiledCode)
- intp.setStatementsFinished(output.get(), False)
+ intp.setStatementsFinished("", False)
except Py4JJavaError:
excInnerError = traceback.format_exc() # format_tb() does not return the inner exception
innerErrorStart = excInnerError.find("Py4JJavaError:")
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
index efa8fae..2b5613a 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
@@ -60,7 +60,7 @@ public class DepInterpreterTest {
context = new InterpreterContext("note", "id", "title", "text", new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
- new LinkedList<InterpreterContextRunner>());
+ new LinkedList<InterpreterContextRunner>(), null);
}
@After
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index b629978..778966f 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -28,10 +28,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.junit.After;
import org.junit.Before;
@@ -79,9 +76,21 @@ public class SparkInterpreterTest {
InterpreterGroup intpGroup = new InterpreterGroup();
context = new InterpreterContext("note", "id", "title", "text",
- new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(
- intpGroup.getId(), null),
- new LinkedList<InterpreterContextRunner>());
+ new HashMap<String, Object>(),
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LinkedList<InterpreterContextRunner>(),
+ new InterpreterOutput(new InterpreterOutputListener() {
+ @Override
+ public void onAppend(InterpreterOutput out, byte[] line) {
+
+ }
+
+ @Override
+ public void onUpdate(InterpreterOutput out, byte[] output) {
+
+ }
+ }));
}
@After
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
index 4688cf8..731eab6 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
@@ -25,10 +25,7 @@ import java.util.Properties;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.junit.After;
import org.junit.Before;
@@ -69,7 +66,17 @@ public class SparkSqlInterpreterTest {
}
context = new InterpreterContext("note", "id", "title", "text", new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
- new LinkedList<InterpreterContextRunner>());
+ new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(new InterpreterOutputListener() {
+ @Override
+ public void onAppend(InterpreterOutput out, byte[] line) {
+
+ }
+
+ @Override
+ public void onUpdate(InterpreterOutput out, byte[] output) {
+
+ }
+ }));
}
@After
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index 0417f91..e3f6b59 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -29,6 +29,7 @@ import org.apache.zeppelin.display.GUI;
public class InterpreterContext {
private static final ThreadLocal<InterpreterContext> threadIC =
new ThreadLocal<InterpreterContext>();
+ public final InterpreterOutput out;
public static InterpreterContext get() {
return threadIC.get();
@@ -58,7 +59,8 @@ public class InterpreterContext {
Map<String, Object> config,
GUI gui,
AngularObjectRegistry angularObjectRegistry,
- List<InterpreterContextRunner> runners
+ List<InterpreterContextRunner> runners,
+ InterpreterOutput out
) {
this.noteId = noteId;
this.paragraphId = paragraphId;
@@ -68,6 +70,7 @@ public class InterpreterContext {
this.gui = gui;
this.angularObjectRegistry = angularObjectRegistry;
this.runners = runners;
+ this.out = out;
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
new file mode 100644
index 0000000..42ebe48
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.net.URL;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * InterpreterOutput is OutputStream that supposed to print content on notebook
+ * in addition to InterpreterResult which used to return from Interpreter.interpret().
+ */
+public class InterpreterOutput extends OutputStream {
+ Logger logger = LoggerFactory.getLogger(InterpreterOutput.class);
+ private final int NEW_LINE_CHAR = '\n';
+
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+
+ private final List<Object> outList = new LinkedList<Object>();
+ private InterpreterOutputChangeWatcher watcher;
+ private final InterpreterOutputListener flushListener;
+ private InterpreterResult.Type type = InterpreterResult.Type.TEXT;
+ private boolean firstWrite = true;
+
+ public InterpreterOutput(InterpreterOutputListener flushListener) {
+ this.flushListener = flushListener;
+ clear();
+ }
+
+ public InterpreterOutput(InterpreterOutputListener flushListener,
+ InterpreterOutputChangeListener listener) throws IOException {
+ this.flushListener = flushListener;
+ clear();
+ watcher = new InterpreterOutputChangeWatcher(listener);
+ watcher.start();
+ }
+
+ public InterpreterResult.Type getType() {
+ return type;
+ }
+
+ public void setType(InterpreterResult.Type type) {
+ if (this.type != type) {
+ clear();
+ flushListener.onUpdate(this, new byte[]{});
+ this.type = type;
+ }
+ }
+
+ public void clear() {
+ synchronized (outList) {
+ type = InterpreterResult.Type.TEXT;
+ buffer.reset();
+ outList.clear();
+ if (watcher != null) {
+ watcher.clear();
+ }
+ }
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ synchronized (outList) {
+ buffer.write(b);
+ if (b == NEW_LINE_CHAR) {
+ // first time use of this outputstream.
+ if (firstWrite) {
+ // clear the output on gui
+ flushListener.onUpdate(this, new byte[]{});
+ firstWrite = false;
+ }
+
+ flush();
+ }
+ }
+ }
+
+ private byte [] detectTypeFromLine(byte [] byteArray) {
+ // check output type directive
+ String line = new String(byteArray);
+ for (InterpreterResult.Type t : InterpreterResult.Type.values()) {
+ String typeString = '%' + t.name().toLowerCase();
+ if ((typeString + "\n").equals(line)) {
+ setType(t);
+ byteArray = null;
+ break;
+ } else if (line.startsWith(typeString + " ")) {
+ setType(t);
+ byteArray = line.substring(typeString.length() + 1).getBytes();
+ break;
+ }
+ }
+
+ return byteArray;
+ }
+
+ @Override
+ public void write(byte [] b) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ @Override
+ public void write(byte [] b, int off, int len) throws IOException {
+ synchronized (outList) {
+ for (int i = off; i < len; i++) {
+ write(b[i]);
+ }
+ }
+ }
+
+ /**
+ * In dev mode, it monitors file and update ZeppelinServer
+ * @param file
+ * @throws IOException
+ */
+ public void write(File file) throws IOException {
+ outList.add(file);
+ if (watcher != null) {
+ watcher.watch(file);
+ }
+ }
+
+ public void write(String string) throws IOException {
+ write(string.getBytes());
+ }
+
+ /**
+ * write contents in the resource file in the classpath
+ * @param url
+ * @throws IOException
+ */
+ public void write(URL url) throws IOException {
+ if ("file".equals(url.getProtocol())) {
+ write(new File(url.getPath()));
+ } else {
+ outList.add(url);
+ }
+ }
+
+ public void writeResource(String resourceName) throws IOException {
+ // search file under resource dir first for dev mode
+ File mainResource = new File("./src/main/resources/" + resourceName);
+ File testResource = new File("./src/test/resources/" + resourceName);
+ if (mainResource.isFile()) {
+ write(mainResource);
+ } else if (testResource.isFile()) {
+ write(testResource);
+ } else {
+ // search from classpath
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ if (cl == null) {
+ cl = this.getClass().getClassLoader();
+ }
+ if (cl == null) {
+ cl = ClassLoader.getSystemClassLoader();
+ }
+
+ write(cl.getResource(resourceName));
+ }
+ }
+
+ public byte[] toByteArray() throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ List<Object> all = new LinkedList<Object>();
+
+ synchronized (outList) {
+ all.addAll(outList);
+ }
+
+ for (Object o : all) {
+ if (o instanceof File) {
+ File f = (File) o;
+ FileInputStream fin = new FileInputStream(f);
+ copyStream(fin, out);
+ fin.close();
+ } else if (o instanceof byte[]) {
+ out.write((byte[]) o);
+ } else if (o instanceof Integer) {
+ out.write((int) o);
+ } else if (o instanceof URL) {
+ InputStream fin = ((URL) o).openStream();
+ copyStream(fin, out);
+ fin.close();
+ } else {
+ // can not handle the object
+ }
+ }
+ out.close();
+ return out.toByteArray();
+ }
+
+ public void flush() throws IOException {
+ synchronized (outList) {
+ buffer.flush();
+ byte[] bytes = buffer.toByteArray();
+ bytes = detectTypeFromLine(bytes);
+ if (bytes != null) {
+ outList.add(bytes);
+ if (type == InterpreterResult.Type.TEXT) {
+ flushListener.onAppend(this, bytes);
+ }
+ }
+ buffer.reset();
+ }
+ }
+
+ private void copyStream(InputStream in, OutputStream out) throws IOException {
+ int bufferSize = 8192;
+ byte[] buffer = new byte[bufferSize];
+
+ while (true) {
+ int bytesRead = in.read(buffer);
+ if (bytesRead == -1) {
+ break;
+ } else {
+ out.write(buffer, 0, bytesRead);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ flush();
+
+ if (watcher != null) {
+ watcher.clear();
+ watcher.shutdown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeListener.java
new file mode 100644
index 0000000..a639e0c
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeListener.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+import java.io.File;
+
+/**
+ * InterpreterOutputChangeListener
+ */
+public interface InterpreterOutputChangeListener {
+ public void fileChanged(File file);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcher.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcher.java
new file mode 100644
index 0000000..5fe8237
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcher.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
+import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
+import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
+import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.ClosedWatchServiceException;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Watch the change for the development mode support
+ */
+public class InterpreterOutputChangeWatcher extends Thread {
+ Logger logger = LoggerFactory.getLogger(InterpreterOutputChangeWatcher.class);
+
+ private WatchService watcher;
+ private final List<File> watchFiles = new LinkedList<File>();
+ private final Map<WatchKey, File> watchKeys = new HashMap<WatchKey, File>();
+ private InterpreterOutputChangeListener listener;
+ private boolean stop;
+
+ public InterpreterOutputChangeWatcher(InterpreterOutputChangeListener listener)
+ throws IOException {
+ watcher = FileSystems.getDefault().newWatchService();
+ this.listener = listener;
+ }
+
+ public void watch(File file) throws IOException {
+ String dirString;
+ if (file.isFile()) {
+ dirString = file.getParentFile().getAbsolutePath();
+ } else {
+ throw new IOException(file.getName() + " is not a file");
+ }
+
+ if (dirString == null) {
+ dirString = "/";
+ }
+
+ Path dir = FileSystems.getDefault().getPath(dirString);
+ logger.info("watch " + dir);
+ WatchKey key = dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
+ synchronized (watchKeys) {
+ watchKeys.put(key, new File(dirString));
+ watchFiles.add(file);
+ }
+ }
+
+ public void clear() {
+ synchronized (watchKeys) {
+ for (WatchKey key : watchKeys.keySet()) {
+ key.cancel();
+
+ }
+ watchKeys.clear();
+ watchFiles.clear();
+ }
+ }
+
+ public void shutdown() throws IOException {
+ stop = true;
+ clear();
+ watcher.close();
+ }
+
+ public void run() {
+ while (!stop) {
+ WatchKey key = null;
+ try {
+ key = watcher.poll(1, TimeUnit.SECONDS);
+ } catch (InterruptedException | ClosedWatchServiceException e) {
+ break;
+ }
+
+ if (key == null) {
+ continue;
+ }
+ for (WatchEvent<?> event : key.pollEvents()) {
+ WatchEvent.Kind<?> kind = event.kind();
+ if (kind == OVERFLOW) {
+ continue;
+ }
+ WatchEvent<Path> ev = (WatchEvent<Path>) event;
+ Path filename = ev.context();
+ // search for filename
+ synchronized (watchKeys) {
+ for (File f : watchFiles) {
+ if (f.getName().compareTo(filename.toString()) == 0) {
+ File changedFile;
+ if (filename.isAbsolute()) {
+ changedFile = new File(filename.toString());
+ } else {
+ changedFile = new File(watchKeys.get(key), filename.toString());
+ }
+ logger.info("File change detected " + changedFile.getAbsolutePath());
+ if (listener != null) {
+ listener.fileChanged(changedFile);
+ }
+ }
+ }
+ }
+ }
+
+ boolean valid = key.reset();
+ if (!valid) {
+ break;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java
new file mode 100644
index 0000000..bdb262a
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+/**
+ * Listen InterpreterOutput buffer flush
+ */
+public interface InterpreterOutputListener {
+ /**
+ * called when newline is detected
+ * @param line
+ */
+ public void onAppend(InterpreterOutput out, byte[] line);
+
+ /**
+ * when entire output is updated. eg) after detecting new display system
+ * @param output
+ */
+ public void onUpdate(InterpreterOutput out, byte[] output);
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
index 593cfc7..d213796 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
@@ -146,4 +146,8 @@ public class InterpreterResult implements Serializable {
this.type = type;
return this;
}
+
+ public String toString() {
+ return "%" + type.name().toLowerCase() + " " + msg;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 455156c..d2a24e8 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -48,6 +48,7 @@ import com.google.gson.reflect.TypeToken;
*
*/
public class RemoteInterpreter extends Interpreter {
+ private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class);
Gson gson = new Gson();
private String interpreterRunner;
@@ -60,32 +61,35 @@ public class RemoteInterpreter extends Interpreter {
private int connectTimeout;
public RemoteInterpreter(Properties property,
- String className,
- String interpreterRunner,
- String interpreterPath,
- int connectTimeout) {
+ String className,
+ String interpreterRunner,
+ String interpreterPath,
+ int connectTimeout,
+ RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
super(property);
-
this.className = className;
initialized = false;
this.interpreterRunner = interpreterRunner;
this.interpreterPath = interpreterPath;
env = new HashMap<String, String>();
this.connectTimeout = connectTimeout;
+ this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
}
public RemoteInterpreter(Properties property,
- String className,
- String interpreterRunner,
- String interpreterPath,
- Map<String, String> env,
- int connectTimeout) {
+ String className,
+ String interpreterRunner,
+ String interpreterPath,
+ Map<String, String> env,
+ int connectTimeout,
+ RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
super(property);
this.className = className;
this.interpreterRunner = interpreterRunner;
this.interpreterPath = interpreterPath;
this.env = env;
this.connectTimeout = connectTimeout;
+ this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
}
@Override
@@ -103,7 +107,8 @@ public class RemoteInterpreter extends Interpreter {
if (intpGroup.getRemoteInterpreterProcess() == null) {
// create new remote process
RemoteInterpreterProcess remoteProcess = new RemoteInterpreterProcess(
- interpreterRunner, interpreterPath, env, connectTimeout);
+ interpreterRunner, interpreterPath, env, connectTimeout,
+ remoteInterpreterProcessListener);
intpGroup.setRemoteInterpreterProcess(remoteProcess);
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
index c39e0fe..6186205 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
@@ -18,29 +18,35 @@
package org.apache.zeppelin.interpreter.remote;
import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
import org.apache.thrift.TException;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+
/**
*
*/
public class RemoteInterpreterEventPoller extends Thread {
private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class);
+ private final RemoteInterpreterProcessListener listener;
private volatile boolean shutdown;
private RemoteInterpreterProcess interpreterProcess;
private InterpreterGroup interpreterGroup;
- public RemoteInterpreterEventPoller() {
+ public RemoteInterpreterEventPoller(RemoteInterpreterProcessListener listener) {
+ this.listener = listener;
shutdown = false;
}
@@ -110,6 +116,24 @@ public class RemoteInterpreterEventPoller extends Thread {
interpreterProcess.getInterpreterContextRunnerPool().run(
runnerFromRemote.getNoteId(), runnerFromRemote.getParagraphId());
+ } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_APPEND) {
+ // on output append
+ Map<String, String> outputAppend = gson.fromJson(
+ event.getData(), new TypeToken<Map<String, String>>() {}.getType());
+ String noteId = outputAppend.get("noteId");
+ String paragraphId = outputAppend.get("paragraphId");
+ String outputToAppend = outputAppend.get("data");
+
+ listener.onOutputAppend(noteId, paragraphId, outputToAppend);
+ } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE) {
+ // on output update
+ Map<String, String> outputAppend = gson.fromJson(
+ event.getData(), new TypeToken<Map<String, String>>() {}.getType());
+ String noteId = outputAppend.get("noteId");
+ String paragraphId = outputAppend.get("paragraphId");
+ String outputToUpdate = outputAppend.get("data");
+
+ listener.onOutputUpdated(noteId, paragraphId, outputToUpdate);
}
logger.debug("Event from remoteproceess {}", event.getType());
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index 2c195dc..56b5485 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -53,10 +53,11 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
private int connectTimeout;
public RemoteInterpreterProcess(String intpRunner,
- String intpDir,
- Map<String, String> env,
- int connectTimeout) {
- this(intpRunner, intpDir, env, new RemoteInterpreterEventPoller(), connectTimeout);
+ String intpDir,
+ Map<String, String> env,
+ int connectTimeout,
+ RemoteInterpreterProcessListener listener) {
+ this(intpRunner, intpDir, env, new RemoteInterpreterEventPoller(listener), connectTimeout);
}
RemoteInterpreterProcess(String intpRunner,
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
new file mode 100644
index 0000000..da6ac63
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote;
+
+/**
+ * Event from remoteInterpreterProcess
+ */
+public interface RemoteInterpreterProcessListener {
+ public void onOutputAppend(String noteId, String paragraphId, String output);
+ public void onOutputUpdated(String noteId, String paragraphId, String output);
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index a8da8c0..728d210 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -35,15 +35,8 @@ import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.ClassloaderInterpreter;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
@@ -300,7 +293,26 @@ public class RemoteInterpreterServer
try {
InterpreterContext.set(context);
InterpreterResult result = interpreter.interpret(script, context);
- return result;
+
+ // data from context.out is prepended to InterpreterResult if both defined
+ String message = "";
+
+ context.out.flush();
+ InterpreterResult.Type outputType = context.out.getType();
+ byte[] interpreterOutput = context.out.toByteArray();
+ context.out.clear();
+
+ if (interpreterOutput != null && interpreterOutput.length > 0) {
+ message = new String(interpreterOutput);
+ }
+
+ String interpreterResultMessage = result.message();
+ if (interpreterResultMessage != null && !interpreterResultMessage.isEmpty()) {
+ message += interpreterResultMessage;
+ return new InterpreterResult(result.code(), result.type(), message);
+ } else {
+ return new InterpreterResult(result.code(), outputType, message);
+ }
} finally {
InterpreterContext.remove();
}
@@ -351,7 +363,8 @@ public class RemoteInterpreterServer
private InterpreterContext convert(RemoteInterpreterContext ric) {
List<InterpreterContextRunner> contextRunners = new LinkedList<InterpreterContextRunner>();
List<InterpreterContextRunner> runners = gson.fromJson(ric.getRunners(),
- new TypeToken<List<RemoteInterpreterContextRunner>>(){}.getType());
+ new TypeToken<List<RemoteInterpreterContextRunner>>() {
+ }.getType());
for (InterpreterContextRunner r : runners) {
contextRunners.add(new ParagraphRunner(this, r.getNoteId(), r.getParagraphId()));
@@ -366,7 +379,40 @@ public class RemoteInterpreterServer
new TypeToken<Map<String, Object>>() {}.getType()),
gson.fromJson(ric.getGui(), GUI.class),
interpreterGroup.getAngularObjectRegistry(),
- contextRunners);
+ contextRunners, createInterpreterOutput(ric.getNoteId(), ric.getParagraphId()));
+ }
+
+
+ private InterpreterOutput createInterpreterOutput(final String noteId, final String paragraphId) {
+ return new InterpreterOutput(new InterpreterOutputListener() {
+ @Override
+ public void onAppend(InterpreterOutput out, byte[] line) {
+ Map<String, String> appendOutput = new HashMap<String, String>();
+ appendOutput.put("noteId", noteId);
+ appendOutput.put("paragraphId", paragraphId);
+ appendOutput.put("data", new String(line));
+
+ Gson gson = new Gson();
+
+ sendEvent(new RemoteInterpreterEvent(
+ RemoteInterpreterEventType.OUTPUT_APPEND,
+ gson.toJson(appendOutput)));
+ }
+
+ @Override
+ public void onUpdate(InterpreterOutput out, byte[] output) {
+ Map<String, String> appendOutput = new HashMap<String, String>();
+ appendOutput.put("noteId", noteId);
+ appendOutput.put("paragraphId", paragraphId);
+ appendOutput.put("data", new String(output));
+
+ Gson gson = new Gson();
+
+ sendEvent(new RemoteInterpreterEvent(
+ RemoteInterpreterEventType.OUTPUT_UPDATE,
+ gson.toJson(appendOutput)));
+ }
+ });
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
index a55d5de..175f482 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
index 96a49b5..79203fb 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterEvent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
index 9a7d142..d650318 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
@@ -33,7 +33,9 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
ANGULAR_OBJECT_ADD(2),
ANGULAR_OBJECT_UPDATE(3),
ANGULAR_OBJECT_REMOVE(4),
- RUN_INTERPRETER_CONTEXT_RUNNER(5);
+ RUN_INTERPRETER_CONTEXT_RUNNER(5),
+ OUTPUT_APPEND(6),
+ OUTPUT_UPDATE(7);
private final int value;
@@ -64,6 +66,10 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
return ANGULAR_OBJECT_REMOVE;
case 5:
return RUN_INTERPRETER_CONTEXT_RUNNER;
+ case 6:
+ return OUTPUT_APPEND;
+ case 7:
+ return OUTPUT_UPDATE;
default:
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
index 36c0f25..cc50f9c 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteInterpreterResult, RemoteInterpreterResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResult> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
index 6e6730e..738b453 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-7")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
public class RemoteInterpreterService {
public interface Iface {
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
index 144784c..65fd0a7 100644
--- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
+++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
@@ -42,7 +42,9 @@ enum RemoteInterpreterEventType {
ANGULAR_OBJECT_ADD = 2,
ANGULAR_OBJECT_UPDATE = 3,
ANGULAR_OBJECT_REMOVE = 4,
- RUN_INTERPRETER_CONTEXT_RUNNER = 5
+ RUN_INTERPRETER_CONTEXT_RUNNER = 5,
+ OUTPUT_APPEND = 6,
+ OUTPUT_UPDATE = 7
}
struct RemoteInterpreterEvent {
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
index 080bdaa..9c2732d 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
@@ -27,7 +27,7 @@ public class InterpreterContextTest {
public void testThreadLocal() {
assertNull(InterpreterContext.get());
- InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null));
+ InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null, null));
assertNotNull(InterpreterContext.get());
InterpreterContext.remove();
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java
new file mode 100644
index 0000000..e376809
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class InterpreterOutputChangeWatcherTest implements InterpreterOutputChangeListener {
+ private File tmpDir;
+ private File fileChanged;
+ private int numChanged;
+ private InterpreterOutputChangeWatcher watcher;
+
+ @Before
+ public void setUp() throws Exception {
+ watcher = new InterpreterOutputChangeWatcher(this);
+ watcher.start();
+
+ tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis());
+ tmpDir.mkdirs();
+ fileChanged = null;
+ numChanged = 0;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ watcher.shutdown();
+ delete(tmpDir);
+ }
+
+ private void delete(File file){
+ if(file.isFile()) file.delete();
+ else if(file.isDirectory()){
+ File [] files = file.listFiles();
+ if(files!=null && files.length>0){
+ for(File f : files){
+ delete(f);
+ }
+ }
+ file.delete();
+ }
+ }
+
+
+ @Test
+ public void test() throws IOException, InterruptedException {
+ assertNull(fileChanged);
+ assertEquals(0, numChanged);
+
+ Thread.sleep(1000);
+ // create new file
+ File file1 = new File(tmpDir, "test1");
+ file1.createNewFile();
+
+ File file2 = new File(tmpDir, "test2");
+ file2.createNewFile();
+
+ watcher.watch(file1);
+ Thread.sleep(1000);
+
+ FileOutputStream out1 = new FileOutputStream(file1);
+ out1.write(1);
+ out1.close();
+
+ FileOutputStream out2 = new FileOutputStream(file2);
+ out2.write(1);
+ out2.close();
+
+ synchronized (this) {
+ wait(30*1000);
+ }
+
+ assertNotNull(fileChanged);
+ assertEquals(1, numChanged);
+ }
+
+
+ @Override
+ public void fileChanged(File file) {
+ fileChanged = file;
+ numChanged++;
+
+ synchronized(this) {
+ notify();
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java
new file mode 100644
index 0000000..f8f4809
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class InterpreterOutputTest implements InterpreterOutputListener {
+ private InterpreterOutput out;
+ int numAppendEvent;
+ int numUpdateEvent;
+
+ @Before
+ public void setUp() {
+ out = new InterpreterOutput(this);
+ numAppendEvent = 0;
+ numUpdateEvent = 0;
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ out.close();
+ }
+
+ @Test
+ public void testDetectNewline() throws IOException {
+ out.write("hello\nworld");
+ assertEquals("hello\n", new String(out.toByteArray()));
+ assertEquals(1, numAppendEvent);
+ assertEquals(1, numUpdateEvent);
+
+ out.write("\n");
+ assertEquals("hello\nworld\n", new String(out.toByteArray()));
+ assertEquals(2, numAppendEvent);
+ assertEquals(1, numUpdateEvent);
+ }
+
+ @Test
+ public void testFlush() throws IOException {
+ out.write("hello\nworld");
+ assertEquals("hello\n", new String(out.toByteArray()));
+ assertEquals(1, numAppendEvent);
+ assertEquals(1, numUpdateEvent);
+
+ out.flush();
+ assertEquals("hello\nworld", new String(out.toByteArray()));
+ assertEquals(2, numAppendEvent);
+ assertEquals(1, numUpdateEvent);
+
+ out.clear();
+ out.write("%html div");
+ assertEquals("", new String(out.toByteArray()));
+ assertEquals(InterpreterResult.Type.TEXT, out.getType());
+
+ out.flush();
+ out.write("%html div");
+ assertEquals("div", new String(out.toByteArray()));
+ assertEquals(InterpreterResult.Type.HTML, out.getType());
+ }
+
+ @Test
+ public void testType() throws IOException {
+ // default output stream type is TEXT
+ out.write("Text\n");
+ assertEquals(InterpreterResult.Type.TEXT, out.getType());
+ assertEquals("Text\n", new String(out.toByteArray()));
+ assertEquals(1, numAppendEvent);
+ assertEquals(1, numUpdateEvent);
+
+ // change type
+ out.write("%html\n");
+ assertEquals(InterpreterResult.Type.HTML, out.getType());
+ assertEquals("", new String(out.toByteArray()));
+ assertEquals(1, numAppendEvent);
+ assertEquals(2, numUpdateEvent);
+
+ // none TEXT type output stream does not generate append event
+ out.write("<div>html</div>\n");
+ assertEquals(InterpreterResult.Type.HTML, out.getType());
+ assertEquals(1, numAppendEvent);
+ assertEquals(2, numUpdateEvent);
+ assertEquals("<div>html</div>\n", new String(out.toByteArray()));
+
+ // change type to text again
+ out.write("%text hello\n");
+ assertEquals(InterpreterResult.Type.TEXT, out.getType());
+ assertEquals(2, numAppendEvent);
+ assertEquals(3, numUpdateEvent);
+ assertEquals("hello\n", new String(out.toByteArray()));
+ }
+
+ @Test
+ public void testType2() throws IOException {
+ out.write("%html\nHello");
+ assertEquals(InterpreterResult.Type.HTML, out.getType());
+ }
+
+ @Override
+ public void onAppend(InterpreterOutput out, byte[] line) {
+ numAppendEvent++;
+ }
+
+ @Override
+ public void onUpdate(InterpreterOutput out, byte[] output) {
+ numUpdateEvent++;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java
index 007730a..d7ab9e8 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterResultTest.java
@@ -105,4 +105,9 @@ public class InterpreterResultTest {
"123\n", result.message());
}
+ @Test
+ public void testToString() {
+ assertEquals("%html hello", new InterpreterResult(InterpreterResult.Code.SUCCESS, "%html hello").toString());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/5ec59a81/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
index 29a1fb1..906878d 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
@@ -64,12 +64,13 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
Properties p = new Properties();
intp = new RemoteInterpreter(
- p,
- MockInterpreterAngular.class.getName(),
- new File("../bin/interpreter.sh").getAbsolutePath(),
- "fake",
- env,
- 10 * 1000
+ p,
+ MockInterpreterAngular.class.getName(),
+ new File("../bin/interpreter.sh").getAbsolutePath(),
+ "fake",
+ env,
+ 10 * 1000,
+ null
);
intpGroup.add(intp);
@@ -83,7 +84,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
- new LinkedList<InterpreterContextRunner>());
+ new LinkedList<InterpreterContextRunner>(), null);
intp.open();
}