You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/08/28 06:49:56 UTC
[03/11] zeppelin git commit: Revert "[ZEPPELIN-2627] Interpreter
refactor"
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingTest.java
new file mode 100644
index 0000000..1aab757
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingTest.java
@@ -0,0 +1,327 @@
+package org.apache.zeppelin.interpreter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.junit.Test;
+
+import org.apache.zeppelin.dep.Dependency;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public class InterpreterSettingTest {
+
+ @Test
+ public void sharedModeCloseandRemoveInterpreterGroupTest() {
+ InterpreterOption interpreterOption = new InterpreterOption();
+ interpreterOption.setPerUser(InterpreterOption.SHARED);
+ InterpreterSetting interpreterSetting = new InterpreterSetting("", "", "", new ArrayList<InterpreterInfo>(), new Properties(), new ArrayList<Dependency>(), interpreterOption, "", null);
+
+ interpreterSetting.setInterpreterGroupFactory(new InterpreterGroupFactory() {
+ @Override
+ public InterpreterGroup createInterpreterGroup(String interpreterGroupId,
+ InterpreterOption option) {
+ return new InterpreterGroup(interpreterGroupId);
+ }
+ });
+
+ Interpreter mockInterpreter1 = mock(RemoteInterpreter.class);
+ List<Interpreter> interpreterList1 = new ArrayList<>();
+ interpreterList1.add(mockInterpreter1);
+ InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup("user1", "note1");
+ interpreterGroup.put(interpreterSetting.getInterpreterSessionKey("user1", "note1"), interpreterList1);
+
+ // This won't effect anything
+ Interpreter mockInterpreter2 = mock(RemoteInterpreter.class);
+ List<Interpreter> interpreterList2 = new ArrayList<>();
+ interpreterList2.add(mockInterpreter2);
+ interpreterGroup = interpreterSetting.getInterpreterGroup("user2", "note1");
+ interpreterGroup.put(interpreterSetting.getInterpreterSessionKey("user2", "note1"), interpreterList2);
+
+ assertEquals(1, interpreterSetting.getInterpreterGroup("user1", "note1").size());
+
+ interpreterSetting.closeAndRemoveInterpreterGroup("note1", "user2");
+ assertEquals(0, interpreterSetting.getAllInterpreterGroups().size());
+ }
+
+ @Test
+ public void perUserScopedModeCloseAndRemoveInterpreterGroupTest() {
+ InterpreterOption interpreterOption = new InterpreterOption();
+ interpreterOption.setPerUser(InterpreterOption.SCOPED);
+ InterpreterSetting interpreterSetting = new InterpreterSetting("", "", "", new ArrayList<InterpreterInfo>(), new Properties(), new ArrayList<Dependency>(), interpreterOption, "", null);
+
+ interpreterSetting.setInterpreterGroupFactory(new InterpreterGroupFactory() {
+ @Override
+ public InterpreterGroup createInterpreterGroup(String interpreterGroupId,
+ InterpreterOption option) {
+ return new InterpreterGroup(interpreterGroupId);
+ }
+ });
+
+ Interpreter mockInterpreter1 = mock(RemoteInterpreter.class);
+ List<Interpreter> interpreterList1 = new ArrayList<>();
+ interpreterList1.add(mockInterpreter1);
+ InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup("user1", "note1");
+ interpreterGroup.put(interpreterSetting.getInterpreterSessionKey("user1", "note1"), interpreterList1);
+
+ Interpreter mockInterpreter2 = mock(RemoteInterpreter.class);
+ List<Interpreter> interpreterList2 = new ArrayList<>();
+ interpreterList2.add(mockInterpreter2);
+ interpreterGroup = interpreterSetting.getInterpreterGroup("user2", "note1");
+ interpreterGroup.put(interpreterSetting.getInterpreterSessionKey("user2", "note1"), interpreterList2);
+
+ assertEquals(1, interpreterSetting.getAllInterpreterGroups().size());
+ assertEquals(2, interpreterSetting.getInterpreterGroup("user1", "note1").size());
+ assertEquals(2, interpreterSetting.getInterpreterGroup("user2", "note1").size());
+
+ interpreterSetting.closeAndRemoveInterpreterGroup("note1", "user1");
+ assertEquals(1, interpreterSetting.getInterpreterGroup("user2","note1").size());
+
+ // Check if non-existed key works or not
+ interpreterSetting.closeAndRemoveInterpreterGroup("note1", "user1");
+ assertEquals(1, interpreterSetting.getInterpreterGroup("user2","note1").size());
+
+ interpreterSetting.closeAndRemoveInterpreterGroup("note1", "user2");
+ assertEquals(0, interpreterSetting.getAllInterpreterGroups().size());
+ }
+
+ @Test
+ public void perUserIsolatedModeCloseAndRemoveInterpreterGroupTest() {
+ InterpreterOption interpreterOption = new InterpreterOption();
+ interpreterOption.setPerUser(InterpreterOption.ISOLATED);
+ InterpreterSetting interpreterSetting = new InterpreterSetting("", "", "", new ArrayList<InterpreterInfo>(), new Properties(), new ArrayList<Dependency>(), interpreterOption, "", null);
+
+ interpreterSetting.setInterpreterGroupFactory(new InterpreterGroupFactory() {
+ @Override
+ public InterpreterGroup createInterpreterGroup(String interpreterGroupId,
+ InterpreterOption option) {
+ return new InterpreterGroup(interpreterGroupId);
+ }
+ });
+
+ Interpreter mockInterpreter1 = mock(RemoteInterpreter.class);
+ List<Interpreter> interpreterList1 = new ArrayList<>();
+ interpreterList1.add(mockInterpreter1);
+ InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup("user1", "note1");
+ interpreterGroup.put(interpreterSetting.getInterpreterSessionKey("user1", "note1"), interpreterList1);
+
+ Interpreter mockInterpreter2 = mock(RemoteInterpreter.class);
+ List<Interpreter> interpreterList2 = new ArrayList<>();
+ interpreterList2.add(mockInterpreter2);
+ interpreterGroup = interpreterSetting.getInterpreterGroup("user2", "note1");
+ interpreterGroup.put(interpreterSetting.getInterpreterSessionKey("user2", "note1"), interpreterList2);
+
+ assertEquals(2, interpreterSetting.getAllInterpreterGroups().size());
+ assertEquals(1, interpreterSetting.getInterpreterGroup("user1", "note1").size());
+ assertEquals(1, interpreterSetting.getInterpreterGroup("user2", "note1").size());
+
+ interpreterSetting.closeAndRemoveInterpreterGroup("note1", "user1");
+ assertEquals(1, interpreterSetting.getInterpreterGroup("user2","note1").size());
+ assertEquals(1, interpreterSetting.getAllInterpreterGroups().size());
+
+ interpreterSetting.closeAndRemoveInterpreterGroup("note1", "user2");
+ assertEquals(0, interpreterSetting.getAllInterpreterGroups().size());
+ }
+
+ @Test
+ public void perNoteScopedModeCloseAndRemoveInterpreterGroupTest() {
+ InterpreterOption interpreterOption = new InterpreterOption();
+ interpreterOption.setPerNote(InterpreterOption.SCOPED);
+ InterpreterSetting interpreterSetting = new InterpreterSetting("", "", "", new ArrayList<InterpreterInfo>(), new Properties(), new ArrayList<Dependency>(), interpreterOption, "", null);
+
+ interpreterSetting.setInterpreterGroupFactory(new InterpreterGroupFactory() {
+ @Override
+ public InterpreterGroup createInterpreterGroup(String interpreterGroupId,
+ InterpreterOption option) {
+ return new InterpreterGroup(interpreterGroupId);
+ }
+ });
+
+ Interpreter mockInterpreter1 = mock(RemoteInterpreter.class);
+ List<Interpreter> interpreterList1 = new ArrayList<>();
+ interpreterList1.add(mockInterpreter1);
+ InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup("user1", "note1");
+ interpreterGroup.put(interpreterSetting.getInterpreterSessionKey("user1", "note1"), interpreterList1);
+
+ Interpreter mockInterpreter2 = mock(RemoteInterpreter.class);
+ List<Interpreter> interpreterList2 = new ArrayList<>();
+ interpreterList2.add(mockInterpreter2);
+ interpreterGroup = interpreterSetting.getInterpreterGroup("user1", "note2");
+ interpreterGroup.put(interpreterSetting.getInterpreterSessionKey("user1", "note2"), interpreterList2);
+
+ assertEquals(1, interpreterSetting.getAllInterpreterGroups().size());
+ assertEquals(2, interpreterSetting.getInterpreterGroup("user1", "note1").size());
+ assertEquals(2, interpreterSetting.getInterpreterGroup("user1", "note2").size());
+
+ interpreterSetting.closeAndRemoveInterpreterGroup("note1", "user1");
+ assertEquals(1, interpreterSetting.getInterpreterGroup("user1","note2").size());
+
+ // Check if non-existed key works or not
+ interpreterSetting.closeAndRemoveInterpreterGroup("note1", "user1");
+ assertEquals(1, interpreterSetting.getInterpreterGroup("user1","note2").size());
+
+ interpreterSetting.closeAndRemoveInterpreterGroup("note2", "user1");
+ assertEquals(0, interpreterSetting.getAllInterpreterGroups().size());
+ }
+
+ @Test
+ public void perNoteIsolatedModeCloseAndRemoveInterpreterGroupTest() {
+ InterpreterOption interpreterOption = new InterpreterOption();
+ interpreterOption.setPerNote(InterpreterOption.ISOLATED);
+ InterpreterSetting interpreterSetting = new InterpreterSetting("", "", "", new ArrayList<InterpreterInfo>(), new Properties(), new ArrayList<Dependency>(), interpreterOption, "", null);
+
+ interpreterSetting.setInterpreterGroupFactory(new InterpreterGroupFactory() {
+ @Override
+ public InterpreterGroup createInterpreterGroup(String interpreterGroupId,
+ InterpreterOption option) {
+ return new InterpreterGroup(interpreterGroupId);
+ }
+ });
+
+ Interpreter mockInterpreter1 = mock(RemoteInterpreter.class);
+ List<Interpreter> interpreterList1 = new ArrayList<>();
+ interpreterList1.add(mockInterpreter1);
+ InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup("user1", "note1");
+ interpreterGroup.put(interpreterSetting.getInterpreterSessionKey("user1", "note1"), interpreterList1);
+
+ Interpreter mockInterpreter2 = mock(RemoteInterpreter.class);
+ List<Interpreter> interpreterList2 = new ArrayList<>();
+ interpreterList2.add(mockInterpreter2);
+ interpreterGroup = interpreterSetting.getInterpreterGroup("user1", "note2");
+ interpreterGroup.put(interpreterSetting.getInterpreterSessionKey("user1", "note2"), interpreterList2);
+
+ assertEquals(2, interpreterSetting.getAllInterpreterGroups().size());
+ assertEquals(1, interpreterSetting.getInterpreterGroup("user1", "note1").size());
+ assertEquals(1, interpreterSetting.getInterpreterGroup("user1", "note2").size());
+
+ interpreterSetting.closeAndRemoveInterpreterGroup("note1", "user1");
+ assertEquals(1, interpreterSetting.getInterpreterGroup("user1","note2").size());
+ assertEquals(1, interpreterSetting.getAllInterpreterGroups().size());
+
+ interpreterSetting.closeAndRemoveInterpreterGroup("note2", "user1");
+ assertEquals(0, interpreterSetting.getAllInterpreterGroups().size());
+ }
+
+ @Test
+ public void perNoteScopedModeRemoveInterpreterGroupWhenNoteIsRemoved() {
+ InterpreterOption interpreterOption = new InterpreterOption();
+ interpreterOption.setPerNote(InterpreterOption.SCOPED);
+ InterpreterSetting interpreterSetting = new InterpreterSetting("", "", "", new ArrayList<InterpreterInfo>(), new Properties(), new ArrayList<Dependency>(), interpreterOption, "", null);
+
+ interpreterSetting.setInterpreterGroupFactory(new InterpreterGroupFactory() {
+ @Override
+ public InterpreterGroup createInterpreterGroup(String interpreterGroupId,
+ InterpreterOption option) {
+ return new InterpreterGroup(interpreterGroupId);
+ }
+ });
+
+ Interpreter mockInterpreter1 = mock(RemoteInterpreter.class);
+ List<Interpreter> interpreterList1 = new ArrayList<>();
+ interpreterList1.add(mockInterpreter1);
+ InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup("user1", "note1");
+ interpreterGroup.put(interpreterSetting.getInterpreterSessionKey("user1", "note1"), interpreterList1);
+
+ assertEquals(1, interpreterSetting.getAllInterpreterGroups().size());
+ assertEquals(1, interpreterSetting.getInterpreterGroup("user1", "note1").size());
+
+ // This method will be called when remove note
+ interpreterSetting.closeAndRemoveInterpreterGroup("note1","");
+ assertEquals(0, interpreterSetting.getAllInterpreterGroups().size());
+ // Be careful that getInterpreterGroup makes interpreterGroup if it doesn't exist
+ assertEquals(0, interpreterSetting.getInterpreterGroup("user1","note1").size());
+ }
+
+ @Test
+ public void perNoteIsolatedModeRemoveInterpreterGroupWhenNoteIsRemoved() {
+ InterpreterOption interpreterOption = new InterpreterOption();
+ interpreterOption.setPerNote(InterpreterOption.ISOLATED);
+ InterpreterSetting interpreterSetting = new InterpreterSetting("", "", "", new ArrayList<InterpreterInfo>(), new Properties(), new ArrayList<Dependency>(), interpreterOption, "", null);
+
+ interpreterSetting.setInterpreterGroupFactory(new InterpreterGroupFactory() {
+ @Override
+ public InterpreterGroup createInterpreterGroup(String interpreterGroupId,
+ InterpreterOption option) {
+ return new InterpreterGroup(interpreterGroupId);
+ }
+ });
+
+ Interpreter mockInterpreter1 = mock(RemoteInterpreter.class);
+ List<Interpreter> interpreterList1 = new ArrayList<>();
+ interpreterList1.add(mockInterpreter1);
+ InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup("user1", "note1");
+ interpreterGroup.put(interpreterSetting.getInterpreterSessionKey("user1", "note1"), interpreterList1);
+
+ assertEquals(1, interpreterSetting.getAllInterpreterGroups().size());
+ assertEquals(1, interpreterSetting.getInterpreterGroup("user1", "note1").size());
+
+ // This method will be called when remove note
+ interpreterSetting.closeAndRemoveInterpreterGroup("note1","");
+ assertEquals(0, interpreterSetting.getAllInterpreterGroups().size());
+ // Be careful that getInterpreterGroup makes interpreterGroup if it doesn't exist
+ assertEquals(0, interpreterSetting.getInterpreterGroup("user1","note1").size());
+ }
+
+ @Test
+ public void perUserScopedModeNeverRemoveInterpreterGroupWhenNoteIsRemoved() {
+ InterpreterOption interpreterOption = new InterpreterOption();
+ interpreterOption.setPerUser(InterpreterOption.SCOPED);
+ InterpreterSetting interpreterSetting = new InterpreterSetting("", "", "", new ArrayList<InterpreterInfo>(), new Properties(), new ArrayList<Dependency>(), interpreterOption, "", null);
+
+ interpreterSetting.setInterpreterGroupFactory(new InterpreterGroupFactory() {
+ @Override
+ public InterpreterGroup createInterpreterGroup(String interpreterGroupId,
+ InterpreterOption option) {
+ return new InterpreterGroup(interpreterGroupId);
+ }
+ });
+
+ Interpreter mockInterpreter1 = mock(RemoteInterpreter.class);
+ List<Interpreter> interpreterList1 = new ArrayList<>();
+ interpreterList1.add(mockInterpreter1);
+ InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup("user1", "note1");
+ interpreterGroup.put(interpreterSetting.getInterpreterSessionKey("user1", "note1"), interpreterList1);
+
+ assertEquals(1, interpreterSetting.getAllInterpreterGroups().size());
+ assertEquals(1, interpreterSetting.getInterpreterGroup("user1", "note1").size());
+
+ // This method will be called when remove note
+ interpreterSetting.closeAndRemoveInterpreterGroup("note1","");
+ assertEquals(1, interpreterSetting.getAllInterpreterGroups().size());
+ // Be careful that getInterpreterGroup makes interpreterGroup if it doesn't exist
+ assertEquals(1, interpreterSetting.getInterpreterGroup("user1","note1").size());
+ }
+
+ @Test
+ public void perUserIsolatedModeNeverRemoveInterpreterGroupWhenNoteIsRemoved() {
+ InterpreterOption interpreterOption = new InterpreterOption();
+ interpreterOption.setPerUser(InterpreterOption.ISOLATED);
+ InterpreterSetting interpreterSetting = new InterpreterSetting("", "", "", new ArrayList<InterpreterInfo>(), new Properties(), new ArrayList<Dependency>(), interpreterOption, "", null);
+
+ interpreterSetting.setInterpreterGroupFactory(new InterpreterGroupFactory() {
+ @Override
+ public InterpreterGroup createInterpreterGroup(String interpreterGroupId,
+ InterpreterOption option) {
+ return new InterpreterGroup(interpreterGroupId);
+ }
+ });
+
+ Interpreter mockInterpreter1 = mock(RemoteInterpreter.class);
+ List<Interpreter> interpreterList1 = new ArrayList<>();
+ interpreterList1.add(mockInterpreter1);
+ InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup("user1", "note1");
+ interpreterGroup.put(interpreterSetting.getInterpreterSessionKey("user1", "note1"), interpreterList1);
+
+ assertEquals(1, interpreterSetting.getAllInterpreterGroups().size());
+ assertEquals(1, interpreterSetting.getInterpreterGroup("user1", "note1").size());
+
+ // This method will be called when remove note
+ interpreterSetting.closeAndRemoveInterpreterGroup("note1","");
+ assertEquals(1, interpreterSetting.getAllInterpreterGroups().size());
+ // Be careful that getInterpreterGroup makes interpreterGroup if it doesn't exist
+ assertEquals(1, interpreterSetting.getInterpreterGroup("user1","note1").size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/install/InstallInterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/install/InstallInterpreterTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/install/InstallInterpreterTest.java
new file mode 100644
index 0000000..e934f1a
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/install/InstallInterpreterTest.java
@@ -0,0 +1,86 @@
+package org.apache.zeppelin.interpreter.install;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class InstallInterpreterTest {
+ private File tmpDir;
+ private InstallInterpreter installer;
+ private File interpreterBaseDir;
+
+ @Before
+ public void setUp() throws IOException {
+ tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis());
+ new File(tmpDir, "conf").mkdirs();
+ interpreterBaseDir = new File(tmpDir, "interpreter");
+ File localRepoDir = new File(tmpDir, "local-repo");
+ interpreterBaseDir.mkdir();
+ localRepoDir.mkdir();
+
+ File interpreterListFile = new File(tmpDir, "conf/interpreter-list");
+
+
+ // create interpreter list file
+ System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath());
+
+ String interpreterList = "";
+ interpreterList += "intp1 org.apache.commons:commons-csv:1.1 test interpreter 1\n";
+ interpreterList += "intp2 org.apache.commons:commons-math3:3.6.1 test interpreter 2\n";
+
+ FileUtils.writeStringToFile(new File(tmpDir, "conf/interpreter-list"), interpreterList);
+
+ installer = new InstallInterpreter(interpreterListFile, interpreterBaseDir, localRepoDir
+ .getAbsolutePath());
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ FileUtils.deleteDirectory(tmpDir);
+ }
+
+
+ @Test
+ public void testList() {
+ assertEquals(2, installer.list().size());
+ }
+
+ @Test
+ public void install() {
+ assertEquals(0, interpreterBaseDir.listFiles().length);
+
+ installer.install("intp1");
+ assertTrue(new File(interpreterBaseDir, "intp1").isDirectory());
+ }
+
+ @Test
+ public void installAll() {
+ installer.installAll();
+ assertTrue(new File(interpreterBaseDir, "intp1").isDirectory());
+ assertTrue(new File(interpreterBaseDir, "intp2").isDirectory());
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
new file mode 100644
index 0000000..b16e937
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
@@ -0,0 +1,105 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.zeppelin.interpreter.mock;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+
+public class MockInterpreter1 extends Interpreter{
+Map<String, Object> vars = new HashMap<>();
+
+ public MockInterpreter1(Properties property) {
+ super(property);
+ }
+ boolean open;
+
+
+ @Override
+ public void open() {
+ open = true;
+ }
+
+ @Override
+ public void close() {
+ open = false;
+ }
+
+
+ public boolean isOpen() {
+ return open;
+ }
+
+ @Override
+ public InterpreterResult interpret(String st, InterpreterContext context) {
+ InterpreterResult result;
+
+ if ("getId".equals(st)) {
+ // get unique id of this interpreter instance
+ result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode());
+ } else if (st.startsWith("sleep")) {
+ try {
+ Thread.sleep(Integer.parseInt(st.split(" ")[1]));
+ } catch (InterruptedException e) {
+ // nothing to do
+ }
+ result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: " + st);
+ } else {
+ result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: " + st);
+ }
+
+ if (context.getResourcePool() != null) {
+ context.getResourcePool().put(context.getNoteId(), context.getParagraphId(), "result", result);
+ }
+
+ return result;
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) {
+ }
+
+ @Override
+ public FormType getFormType() {
+ return FormType.SIMPLE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ return 0;
+ }
+
+ @Override
+ public Scheduler getScheduler() {
+ return SchedulerFactory.singleton().createOrGetFIFOScheduler("test_"+this.hashCode());
+ }
+
+ @Override
+ public List<InterpreterCompletion> completion(String buf, int cursor,
+ InterpreterContext interpreterContext) {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter11.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter11.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter11.java
new file mode 100644
index 0000000..5b9e802
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter11.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.mock;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+
+public class MockInterpreter11 extends Interpreter{
+ Map<String, Object> vars = new HashMap<>();
+
+ public MockInterpreter11(Properties property) {
+ super(property);
+ }
+ boolean open;
+
+ @Override
+ public void open() {
+ open = true;
+ }
+
+ @Override
+ public void close() {
+ open = false;
+ }
+
+ public boolean isOpen() {
+ return open;
+ }
+
+ @Override
+ public InterpreterResult interpret(String st, InterpreterContext context) {
+ return new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl11: "+st);
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) {
+ }
+
+ @Override
+ public FormType getFormType() {
+ return FormType.SIMPLE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ return 0;
+ }
+
+ @Override
+ public Scheduler getScheduler() {
+ return SchedulerFactory.singleton().createOrGetFIFOScheduler("test_"+this.hashCode());
+ }
+
+ @Override
+ public List<InterpreterCompletion> completion(String buf, int cursor,
+ InterpreterContext interpreterContext) {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java
new file mode 100644
index 0000000..7a52f7d
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.mock;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+
+public class MockInterpreter2 extends Interpreter{
+ Map<String, Object> vars = new HashMap<>();
+
+ public MockInterpreter2(Properties property) {
+ super(property);
+ }
+
+ boolean open;
+
+ @Override
+ public void open() {
+ open = true;
+ }
+
+ @Override
+ public void close() {
+ open = false;
+ }
+
+ public boolean isOpen() {
+ return open;
+ }
+
+
+ @Override
+ public InterpreterResult interpret(String st, InterpreterContext context) {
+ InterpreterResult result;
+
+ if ("getId".equals(st)) {
+ // get unique id of this interpreter instance
+ result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode());
+ } else if (st.startsWith("sleep")) {
+ try {
+ Thread.sleep(Integer.parseInt(st.split(" ")[1]));
+ } catch (InterruptedException e) {
+ // nothing to do
+ }
+ result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl2: " + st);
+ } else {
+ result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl2: " + st);
+ }
+
+ if (context.getResourcePool() != null) {
+ context.getResourcePool().put(context.getNoteId(), context.getParagraphId(), "result", result);
+ }
+ return result;
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) {
+ }
+
+ @Override
+ public FormType getFormType() {
+ return FormType.SIMPLE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ return 0;
+ }
+
+ @Override
+ public Scheduler getScheduler() {
+ return SchedulerFactory.singleton().createOrGetFIFOScheduler("test_"+this.hashCode());
+ }
+
+ @Override
+ public List<InterpreterCompletion> completion(String buf, int cursor,
+ InterpreterContext interpreterContext) {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
new file mode 100644
index 0000000..c8c64ea
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.After;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class AppendOutputRunnerTest {
+
+ private static final int NUM_EVENTS = 10000;
+ private static final int NUM_CLUBBED_EVENTS = 100;
+ private static final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
+ private static ScheduledFuture<?> future = null;
+ /* It is being accessed by multiple threads.
+ * While loop for 'loopForBufferCompletion' could
+ * run for-ever.
+ */
+ private volatile static int numInvocations = 0;
+
+ @After
+ public void afterEach() {
+ if (future != null) {
+ future.cancel(true);
+ }
+ }
+
+ @Test
+ public void testSingleEvent() throws InterruptedException {
+ RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+ String[][] buffer = {{"note", "para", "data\n"}};
+
+ loopForCompletingEvents(listener, 1, buffer);
+ verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
+ verify(listener, times(1)).onOutputAppend("note", "para", 0, "data\n");
+ }
+
+ @Test
+ public void testMultipleEventsOfSameParagraph() throws InterruptedException {
+ RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+ String note1 = "note1";
+ String para1 = "para1";
+ String[][] buffer = {
+ {note1, para1, "data1\n"},
+ {note1, para1, "data2\n"},
+ {note1, para1, "data3\n"}
+ };
+
+ loopForCompletingEvents(listener, 1, buffer);
+ verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
+ verify(listener, times(1)).onOutputAppend(note1, para1, 0, "data1\ndata2\ndata3\n");
+ }
+
+ @Test
+ public void testMultipleEventsOfDifferentParagraphs() throws InterruptedException {
+ RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+ String note1 = "note1";
+ String note2 = "note2";
+ String para1 = "para1";
+ String para2 = "para2";
+ String[][] buffer = {
+ {note1, para1, "data1\n"},
+ {note1, para2, "data2\n"},
+ {note2, para1, "data3\n"},
+ {note2, para2, "data4\n"}
+ };
+ loopForCompletingEvents(listener, 4, buffer);
+
+ verify(listener, times(4)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
+ verify(listener, times(1)).onOutputAppend(note1, para1, 0, "data1\n");
+ verify(listener, times(1)).onOutputAppend(note1, para2, 0, "data2\n");
+ verify(listener, times(1)).onOutputAppend(note2, para1, 0, "data3\n");
+ verify(listener, times(1)).onOutputAppend(note2, para2, 0, "data4\n");
+ }
+
+ @Test
+ public void testClubbedData() throws InterruptedException {
+ RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+ AppendOutputRunner runner = new AppendOutputRunner(listener);
+ future = service.scheduleWithFixedDelay(runner, 0,
+ AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
+ Thread thread = new Thread(new BombardEvents(runner));
+ thread.start();
+ thread.join();
+ Thread.sleep(1000);
+
+ /* NUM_CLUBBED_EVENTS is a heuristic number.
+ * It has been observed that for 10,000 continuos event
+ * calls, 30-40 Web-socket calls are made. Keeping
+ * the unit-test to a pessimistic 100 web-socket calls.
+ */
+ verify(listener, atMost(NUM_CLUBBED_EVENTS)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
+ }
+
+ @Test
+ public void testWarnLoggerForLargeData() throws InterruptedException {
+ RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+ AppendOutputRunner runner = new AppendOutputRunner(listener);
+ String data = "data\n";
+ int numEvents = 100000;
+
+ for (int i=0; i<numEvents; i++) {
+ runner.appendBuffer("noteId", "paraId", 0, data);
+ }
+
+ TestAppender appender = new TestAppender();
+ Logger logger = Logger.getRootLogger();
+ logger.addAppender(appender);
+ Logger.getLogger(RemoteInterpreterEventPoller.class);
+
+ runner.run();
+ List<LoggingEvent> log;
+
+ int warnLogCounter;
+ LoggingEvent sizeWarnLogEntry = null;
+ do {
+ warnLogCounter = 0;
+ log = appender.getLog();
+ for (LoggingEvent logEntry: log) {
+ if (Level.WARN.equals(logEntry.getLevel())) {
+ sizeWarnLogEntry = logEntry;
+ warnLogCounter += 1;
+ }
+ }
+ } while(warnLogCounter != 2);
+
+ String loggerString = "Processing size for buffered append-output is high: " +
+ (data.length() * numEvents) + " characters.";
+ assertTrue(loggerString.equals(sizeWarnLogEntry.getMessage()));
+ }
+
+ private class BombardEvents implements Runnable {
+
+ private final AppendOutputRunner runner;
+
+ private BombardEvents(AppendOutputRunner runner) {
+ this.runner = runner;
+ }
+
+ @Override
+ public void run() {
+ String noteId = "noteId";
+ String paraId = "paraId";
+ for (int i=0; i<NUM_EVENTS; i++) {
+ runner.appendBuffer(noteId, paraId, 0, "data\n");
+ }
+ }
+ }
+
+ private class TestAppender extends AppenderSkeleton {
+ private final List<LoggingEvent> log = new ArrayList<>();
+
+ @Override
+ public boolean requiresLayout() {
+ return false;
+ }
+
+ @Override
+ protected void append(final LoggingEvent loggingEvent) {
+ log.add(loggingEvent);
+ }
+
+ @Override
+ public void close() {
+ }
+
+ public List<LoggingEvent> getLog() {
+ return new ArrayList<>(log);
+ }
+ }
+
+ private void prepareInvocationCounts(RemoteInterpreterProcessListener listener) {
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ numInvocations += 1;
+ return null;
+ }
+ }).when(listener).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
+ }
+
+ private void loopForCompletingEvents(RemoteInterpreterProcessListener listener,
+ int numTimes, String[][] buffer) {
+ numInvocations = 0;
+ prepareInvocationCounts(listener);
+ AppendOutputRunner runner = new AppendOutputRunner(listener);
+ for (String[] bufferElement: buffer) {
+ runner.appendBuffer(bufferElement[0], bufferElement[1], 0, bufferElement[2]);
+ }
+ future = service.scheduleWithFixedDelay(runner, 0,
+ AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
+ long startTimeMs = System.currentTimeMillis();
+ while(numInvocations != numTimes) {
+ if (System.currentTimeMillis() - startTimeMs > 2000) {
+ fail("Buffered events were not sent for 2 seconds");
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
new file mode 100644
index 0000000..f7404e3
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.zeppelin.display.*;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterAngular;
+import org.apache.zeppelin.resource.LocalResourcePool;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
+ private static final String INTERPRETER_SCRIPT =
+ System.getProperty("os.name").startsWith("Windows") ?
+ "../bin/interpreter.cmd" :
+ "../bin/interpreter.sh";
+
+ private InterpreterGroup intpGroup;
+ private HashMap<String, String> env;
+ private RemoteInterpreter intp;
+ private InterpreterContext context;
+ private RemoteAngularObjectRegistry localRegistry;
+
+ private AtomicInteger onAdd;
+ private AtomicInteger onUpdate;
+ private AtomicInteger onRemove;
+
+ @Before
+ public void setUp() throws Exception {
+ onAdd = new AtomicInteger(0);
+ onUpdate = new AtomicInteger(0);
+ onRemove = new AtomicInteger(0);
+
+ intpGroup = new InterpreterGroup("intpId");
+ localRegistry = new RemoteAngularObjectRegistry("intpId", this, intpGroup);
+ intpGroup.setAngularObjectRegistry(localRegistry);
+ env = new HashMap<>();
+ env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
+
+ Properties p = new Properties();
+
+ intp = new RemoteInterpreter(
+ p,
+ "note",
+ MockInterpreterAngular.class.getName(),
+ new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+ "fake",
+ "fakeRepo",
+ env,
+ 10 * 1000,
+ null,
+ null,
+ "anonymous",
+ false
+ );
+
+ intpGroup.put("note", new LinkedList<Interpreter>());
+ intpGroup.get("note").add(intp);
+ intp.setInterpreterGroup(intpGroup);
+
+ context = new InterpreterContext(
+ "note",
+ "id",
+ null,
+ "title",
+ "text",
+ new AuthenticationInfo(),
+ new HashMap<String, Object>(),
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LocalResourcePool("pool1"),
+ new LinkedList<InterpreterContextRunner>(), null);
+
+ intp.open();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ intp.close();
+ intpGroup.close();
+ }
+
+ @Test
+ public void testAngularObjectInterpreterSideCRUD() throws InterruptedException {
+ InterpreterResult ret = intp.interpret("get", context);
+ Thread.sleep(500); // waitFor eventpoller pool event
+ String[] result = ret.message().get(0).getData().split(" ");
+ assertEquals("0", result[0]); // size of registry
+ assertEquals("0", result[1]); // num watcher called
+
+ // create object
+ ret = intp.interpret("add n1 v1", context);
+ Thread.sleep(500);
+ result = ret.message().get(0).getData().split(" ");
+ assertEquals("1", result[0]); // size of registry
+ assertEquals("0", result[1]); // num watcher called
+ assertEquals("v1", localRegistry.get("n1", "note", null).get());
+
+ // update object
+ ret = intp.interpret("update n1 v11", context);
+ result = ret.message().get(0).getData().split(" ");
+ Thread.sleep(500);
+ assertEquals("1", result[0]); // size of registry
+ assertEquals("1", result[1]); // num watcher called
+ assertEquals("v11", localRegistry.get("n1", "note", null).get());
+
+ // remove object
+ ret = intp.interpret("remove n1", context);
+ result = ret.message().get(0).getData().split(" ");
+ Thread.sleep(500);
+ assertEquals("0", result[0]); // size of registry
+ assertEquals("1", result[1]); // num watcher called
+ assertEquals(null, localRegistry.get("n1", "note", null));
+ }
+
+ @Test
+ public void testAngularObjectRemovalOnZeppelinServerSide() throws InterruptedException {
+ // test if angularobject removal from server side propagate to interpreter process's registry.
+ // will happen when notebook is removed.
+
+ InterpreterResult ret = intp.interpret("get", context);
+ Thread.sleep(500); // waitFor eventpoller pool event
+ String[] result = ret.message().get(0).getData().split(" ");
+ assertEquals("0", result[0]); // size of registry
+
+ // create object
+ ret = intp.interpret("add n1 v1", context);
+ Thread.sleep(500);
+ result = ret.message().get(0).getData().split(" ");
+ assertEquals("1", result[0]); // size of registry
+ assertEquals("v1", localRegistry.get("n1", "note", null).get());
+
+ // remove object in local registry.
+ localRegistry.removeAndNotifyRemoteProcess("n1", "note", null);
+ ret = intp.interpret("get", context);
+ Thread.sleep(500); // waitFor eventpoller pool event
+ result = ret.message().get(0).getData().split(" ");
+ assertEquals("0", result[0]); // size of registry
+ }
+
+ @Test
+ public void testAngularObjectAddOnZeppelinServerSide() throws InterruptedException {
+ // test if angularobject add from server side propagate to interpreter process's registry.
+ // will happen when zeppelin server loads notebook and restore the object into registry
+
+ InterpreterResult ret = intp.interpret("get", context);
+ Thread.sleep(500); // waitFor eventpoller pool event
+ String[] result = ret.message().get(0).getData().split(" ");
+ assertEquals("0", result[0]); // size of registry
+
+ // create object
+ localRegistry.addAndNotifyRemoteProcess("n1", "v1", "note", null);
+
+ // get from remote registry
+ ret = intp.interpret("get", context);
+ Thread.sleep(500); // waitFor eventpoller pool event
+ result = ret.message().get(0).getData().split(" ");
+ assertEquals("1", result[0]); // size of registry
+ }
+
+ @Override
+ public void onAdd(String interpreterGroupId, AngularObject object) {
+ onAdd.incrementAndGet();
+ }
+
+ @Override
+ public void onUpdate(String interpreterGroupId, AngularObject object) {
+ onUpdate.incrementAndGet();
+ }
+
+ @Override
+ public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) {
+ onRemove.incrementAndGet();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
new file mode 100644
index 0000000..49aa7aa
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
+import org.junit.Test;
+
+import static org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType.NO_OP;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class RemoteInterpreterEventPollerTest {
+
+ @Test
+ public void shouldClearUnreadEventsOnShutdown() throws Exception {
+ RemoteInterpreterProcess interpreterProc = getMockEventsInterpreterProcess();
+ RemoteInterpreterEventPoller eventPoller = new RemoteInterpreterEventPoller(null, null);
+
+ eventPoller.setInterpreterProcess(interpreterProc);
+ eventPoller.shutdown();
+ eventPoller.start();
+ eventPoller.join();
+
+ assertEquals(NO_OP, interpreterProc.getClient().getEvent().getType());
+ }
+
+ private RemoteInterpreterProcess getMockEventsInterpreterProcess() throws Exception {
+ RemoteInterpreterEvent fakeEvent = new RemoteInterpreterEvent();
+ RemoteInterpreterEvent noMoreEvents = new RemoteInterpreterEvent(NO_OP, "");
+ RemoteInterpreterService.Client client = mock(RemoteInterpreterService.Client.class);
+ RemoteInterpreterProcess intProc = mock(RemoteInterpreterProcess.class);
+
+ when(client.getEvent()).thenReturn(fakeEvent, fakeEvent, noMoreEvents);
+ when(intProc.getClient()).thenReturn(client);
+
+ return intProc;
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
new file mode 100644
index 0000000..3f865cb
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterOutputStream;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Test for remote interpreter output stream
+ */
+public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProcessListener {
+ private static final String INTERPRETER_SCRIPT =
+ System.getProperty("os.name").startsWith("Windows") ?
+ "../bin/interpreter.cmd" :
+ "../bin/interpreter.sh";
+ private InterpreterGroup intpGroup;
+ private HashMap<String, String> env;
+
+ @Before
+ public void setUp() throws Exception {
+ intpGroup = new InterpreterGroup();
+ intpGroup.put("note", new LinkedList<Interpreter>());
+
+ env = new HashMap<>();
+ env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ intpGroup.close();
+ }
+
+ private RemoteInterpreter createMockInterpreter() {
+ RemoteInterpreter intp = new RemoteInterpreter(
+ new Properties(),
+ "note",
+ MockInterpreterOutputStream.class.getName(),
+ new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+ "fake",
+ "fakeRepo",
+ env,
+ 10 * 1000,
+ this,
+ null,
+ "anonymous",
+ false);
+
+ intpGroup.get("note").add(intp);
+ intp.setInterpreterGroup(intpGroup);
+ return intp;
+ }
+
+ private InterpreterContext createInterpreterContext() {
+ return new InterpreterContext(
+ "noteId",
+ "id",
+ null,
+ "title",
+ "text",
+ new AuthenticationInfo(),
+ new HashMap<String, Object>(),
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ null,
+ new LinkedList<InterpreterContextRunner>(), null);
+ }
+
+ @Test
+ public void testInterpreterResultOnly() {
+ RemoteInterpreter intp = createMockInterpreter();
+ InterpreterResult ret = intp.interpret("SUCCESS::staticresult", createInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+ assertEquals("staticresult", ret.message().get(0).getData());
+
+ ret = intp.interpret("SUCCESS::staticresult2", createInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+ assertEquals("staticresult2", ret.message().get(0).getData());
+
+ ret = intp.interpret("ERROR::staticresult3", createInterpreterContext());
+ assertEquals(InterpreterResult.Code.ERROR, ret.code());
+ assertEquals("staticresult3", ret.message().get(0).getData());
+ }
+
+ @Test
+ public void testInterpreterOutputStreamOnly() {
+ RemoteInterpreter intp = createMockInterpreter();
+ InterpreterResult ret = intp.interpret("SUCCESS:streamresult:", createInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+ assertEquals("streamresult", ret.message().get(0).getData());
+
+ ret = intp.interpret("ERROR:streamresult2:", createInterpreterContext());
+ assertEquals(InterpreterResult.Code.ERROR, ret.code());
+ assertEquals("streamresult2", ret.message().get(0).getData());
+ }
+
+ @Test
+ public void testInterpreterResultOutputStreamMixed() {
+ RemoteInterpreter intp = createMockInterpreter();
+ InterpreterResult ret = intp.interpret("SUCCESS:stream:static", createInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+ assertEquals("stream", ret.message().get(0).getData());
+ assertEquals("static", ret.message().get(1).getData());
+ }
+
+ @Test
+ public void testOutputType() {
+ RemoteInterpreter intp = createMockInterpreter();
+
+ InterpreterResult ret = intp.interpret("SUCCESS:%html hello:", createInterpreterContext());
+ assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType());
+ assertEquals("hello", ret.message().get(0).getData());
+
+ ret = intp.interpret("SUCCESS:%html\nhello:", createInterpreterContext());
+ assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType());
+ assertEquals("hello", ret.message().get(0).getData());
+
+ ret = intp.interpret("SUCCESS:%html hello:%angular world", createInterpreterContext());
+ assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType());
+ assertEquals("hello", ret.message().get(0).getData());
+ assertEquals(InterpreterResult.Type.ANGULAR, ret.message().get(1).getType());
+ assertEquals("world", ret.message().get(1).getData());
+ }
+
+ @Override
+ public void onOutputAppend(String noteId, String paragraphId, int index, String output) {
+
+ }
+
+ @Override
+ public void onOutputUpdated(String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) {
+
+ }
+
+ @Override
+ public void onOutputClear(String noteId, String paragraphId) {
+
+ }
+
+ @Override
+ public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
+
+ }
+
+ @Override
+ public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) {
+ if (callback != null) {
+ callback.onFinished(new LinkedList<>());
+ }
+ }
+
+ @Override
+ public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception {
+
+ }
+
+ @Override
+ public void onParaInfosReceived(String noteId, String paragraphId,
+ String interpreterSettingId, Map<String, String> metaInfos) {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
new file mode 100644
index 0000000..b85d7ef
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.*;
+
+import java.util.HashMap;
+import java.util.Properties;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.zeppelin.interpreter.Constants;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.junit.Test;
+
+public class RemoteInterpreterProcessTest {
+ private static final String INTERPRETER_SCRIPT =
+ System.getProperty("os.name").startsWith("Windows") ?
+ "../bin/interpreter.cmd" :
+ "../bin/interpreter.sh";
+ private static final int DUMMY_PORT=3678;
+
+ @Test
+ public void testStartStop() {
+ InterpreterGroup intpGroup = new InterpreterGroup();
+ RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
+ INTERPRETER_SCRIPT, "nonexists", "fakeRepo", new HashMap<String, String>(),
+ 10 * 1000, null, null,"fakeName");
+ assertFalse(rip.isRunning());
+ assertEquals(0, rip.referenceCount());
+ assertEquals(1, rip.reference(intpGroup, "anonymous", false));
+ assertEquals(2, rip.reference(intpGroup, "anonymous", false));
+ assertEquals(true, rip.isRunning());
+ assertEquals(1, rip.dereference());
+ assertEquals(true, rip.isRunning());
+ assertEquals(0, rip.dereference());
+ assertEquals(false, rip.isRunning());
+ }
+
+ @Test
+ public void testClientFactory() throws Exception {
+ InterpreterGroup intpGroup = new InterpreterGroup();
+ RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
+ INTERPRETER_SCRIPT, "nonexists", "fakeRepo", new HashMap<String, String>(),
+ mock(RemoteInterpreterEventPoller.class), 10 * 1000, "fakeName");
+ rip.reference(intpGroup, "anonymous", false);
+ assertEquals(0, rip.getNumActiveClient());
+ assertEquals(0, rip.getNumIdleClient());
+
+ Client client = rip.getClient();
+ assertEquals(1, rip.getNumActiveClient());
+ assertEquals(0, rip.getNumIdleClient());
+
+ rip.releaseClient(client);
+ assertEquals(0, rip.getNumActiveClient());
+ assertEquals(1, rip.getNumIdleClient());
+
+ rip.dereference();
+ }
+
+ @Test
+ public void testStartStopRemoteInterpreter() throws TException, InterruptedException {
+ RemoteInterpreterServer server = new RemoteInterpreterServer(3678);
+ server.start();
+ boolean running = false;
+ long startTime = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTime < 10 * 1000) {
+ if (server.isRunning()) {
+ running = true;
+ break;
+ } else {
+ Thread.sleep(200);
+ }
+ }
+ Properties properties = new Properties();
+ properties.setProperty(Constants.ZEPPELIN_INTERPRETER_PORT, "3678");
+ properties.setProperty(Constants.ZEPPELIN_INTERPRETER_HOST, "localhost");
+ InterpreterGroup intpGroup = mock(InterpreterGroup.class);
+ when(intpGroup.getProperty()).thenReturn(properties);
+ when(intpGroup.containsKey(Constants.EXISTING_PROCESS)).thenReturn(true);
+
+ RemoteInterpreterProcess rip = new RemoteInterpreterManagedProcess(
+ INTERPRETER_SCRIPT,
+ "nonexists",
+ "fakeRepo",
+ new HashMap<String, String>(),
+ mock(RemoteInterpreterEventPoller.class)
+ , 10 * 1000,
+ "fakeName");
+ assertFalse(rip.isRunning());
+ assertEquals(0, rip.referenceCount());
+ assertEquals(1, rip.reference(intpGroup, "anonymous", false));
+ assertEquals(true, rip.isRunning());
+ }
+
+
+ @Test
+ public void testPropagateError() throws TException, InterruptedException {
+ InterpreterGroup intpGroup = new InterpreterGroup();
+ RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
+ "echo hello_world", "nonexists", "fakeRepo", new HashMap<String, String>(),
+ 10 * 1000, null, null, "fakeName");
+ assertFalse(rip.isRunning());
+ assertEquals(0, rip.referenceCount());
+ try {
+ assertEquals(1, rip.reference(intpGroup, "anonymous", false));
+ } catch (InterpreterException e) {
+ e.getMessage().contains("hello_world");
+ }
+ assertEquals(0, rip.referenceCount());
+ }
+}