You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/08/28 01:14:24 UTC
[07/11] zeppelin git commit: [ZEPPELIN-2627] Interpreter refactor
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
new file mode 100644
index 0000000..61e4ef0
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectRegistryListener;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterAngular;
+import org.apache.zeppelin.resource.LocalResourcePool;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Checks that angular objects stay in sync between the local
+ * {@link RemoteAngularObjectRegistry} of the interpreter group and the registry used by
+ * {@link MockInterpreterAngular} running as a remote interpreter.
+ *
+ * <p>Each command sent to the mock interpreter answers with a space separated string; the
+ * tests read field 0 as the registry size and field 1 as the number of watcher calls.
+ */
+public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
+  private static final String INTERPRETER_SCRIPT =
+      System.getProperty("os.name").startsWith("Windows")
+          ? "../bin/interpreter.cmd"
+          : "../bin/interpreter.sh";
+
+  private RemoteInterpreter intp;
+  private InterpreterContext context;
+  private RemoteAngularObjectRegistry localRegistry;
+  private InterpreterSetting interpreterSetting;
+
+  // Incremented by the listener callbacks at the bottom of this class.
+  private AtomicInteger addCount;
+  private AtomicInteger updateCount;
+  private AtomicInteger removeCount;
+
+  /**
+   * Creates a remote interpreter setting around {@link MockInterpreterAngular}, fetches its
+   * default interpreter together with the group's angular object registry, and opens the
+   * interpreter.
+   */
+  @Before
+  public void setUp() throws Exception {
+    addCount = new AtomicInteger(0);
+    updateCount = new AtomicInteger(0);
+    removeCount = new AtomicInteger(0);
+
+    InterpreterOption option = new InterpreterOption();
+    option.setRemote(true);
+
+    InterpreterInfo angularInfo = new InterpreterInfo(
+        MockInterpreterAngular.class.getName(), "mock", true, new HashMap<String, Object>());
+    List<InterpreterInfo> infos = new ArrayList<>();
+    infos.add(angularInfo);
+
+    InterpreterRunner scriptRunner = new InterpreterRunner(INTERPRETER_SCRIPT, INTERPRETER_SCRIPT);
+    interpreterSetting = new InterpreterSetting.Builder()
+        .setId("test")
+        .setName("test")
+        .setGroup("test")
+        .setInterpreterInfos(infos)
+        .setOption(option)
+        .setRunner(scriptRunner)
+        .setInterpreterDir("../interpeters/test")
+        .create();
+
+    intp = (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1");
+    localRegistry =
+        (RemoteAngularObjectRegistry) intp.getInterpreterGroup().getAngularObjectRegistry();
+
+    context = new InterpreterContext(
+        "note",
+        "id",
+        null,
+        "title",
+        "text",
+        new AuthenticationInfo(),
+        new HashMap<String, Object>(),
+        new GUI(),
+        new AngularObjectRegistry(intp.getInterpreterGroup().getId(), null),
+        new LocalResourcePool("pool1"),
+        new LinkedList<InterpreterContextRunner>(),
+        null);
+
+    intp.open();
+  }
+
+  /** Closes the interpreter setting created in {@link #setUp()}. */
+  @After
+  public void tearDown() throws Exception {
+    interpreterSetting.close();
+  }
+
+  /** Splits the data of the first result message into its space separated fields. */
+  private static String[] fieldsOf(InterpreterResult result) {
+    return result.message().get(0).getData().split(" ");
+  }
+
+  /** Pauses for 500 ms to wait for the event poller to pick up pending events. */
+  private static void awaitEventPoller() throws InterruptedException {
+    Thread.sleep(500);
+  }
+
+  /**
+   * Adds, updates and removes an object from the interpreter side and verifies the local
+   * registry after every step.
+   */
+  @Test
+  public void testAngularObjectInterpreterSideCRUD() throws InterruptedException {
+    InterpreterResult ret = intp.interpret("get", context);
+    awaitEventPoller();
+    String[] fields = fieldsOf(ret);
+    assertEquals("0", fields[0]); // size of registry
+    assertEquals("0", fields[1]); // num watcher called
+
+    // create object
+    ret = intp.interpret("add n1 v1", context);
+    awaitEventPoller();
+    fields = fieldsOf(ret);
+    assertEquals("1", fields[0]); // size of registry
+    assertEquals("0", fields[1]); // num watcher called
+    assertEquals("v1", localRegistry.get("n1", "note", null).get());
+
+    // update object
+    ret = intp.interpret("update n1 v11", context);
+    fields = fieldsOf(ret);
+    awaitEventPoller();
+    assertEquals("1", fields[0]); // size of registry
+    assertEquals("1", fields[1]); // num watcher called
+    assertEquals("v11", localRegistry.get("n1", "note", null).get());
+
+    // remove object
+    ret = intp.interpret("remove n1", context);
+    fields = fieldsOf(ret);
+    awaitEventPoller();
+    assertEquals("0", fields[0]); // size of registry
+    assertEquals("1", fields[1]); // num watcher called
+    assertEquals(null, localRegistry.get("n1", "note", null));
+  }
+
+  /**
+   * Verifies that removing an object from the server-side registry propagates to the
+   * interpreter process's registry (as happens when a notebook is removed).
+   */
+  @Test
+  public void testAngularObjectRemovalOnZeppelinServerSide() throws InterruptedException {
+    InterpreterResult ret = intp.interpret("get", context);
+    awaitEventPoller();
+    String[] fields = fieldsOf(ret);
+    assertEquals("0", fields[0]); // size of registry
+
+    // create object from the interpreter side
+    ret = intp.interpret("add n1 v1", context);
+    awaitEventPoller();
+    fields = fieldsOf(ret);
+    assertEquals("1", fields[0]); // size of registry
+    assertEquals("v1", localRegistry.get("n1", "note", null).get());
+
+    // remove it through the local registry, then ask the interpreter again
+    localRegistry.removeAndNotifyRemoteProcess("n1", "note", null);
+    ret = intp.interpret("get", context);
+    awaitEventPoller();
+    fields = fieldsOf(ret);
+    assertEquals("0", fields[0]); // size of registry
+  }
+
+  /**
+   * Verifies that adding an object to the server-side registry propagates to the interpreter
+   * process's registry (as happens when the server loads a notebook and restores objects).
+   */
+  @Test
+  public void testAngularObjectAddOnZeppelinServerSide() throws InterruptedException {
+    InterpreterResult ret = intp.interpret("get", context);
+    awaitEventPoller();
+    String[] fields = fieldsOf(ret);
+    assertEquals("0", fields[0]); // size of registry
+
+    // create object through the local registry
+    localRegistry.addAndNotifyRemoteProcess("n1", "v1", "note", null);
+
+    // read the size back from the remote registry
+    ret = intp.interpret("get", context);
+    awaitEventPoller();
+    fields = fieldsOf(ret);
+    assertEquals("1", fields[0]); // size of registry
+  }
+
+  @Override
+  public void onAdd(String interpreterGroupId, AngularObject object) {
+    addCount.incrementAndGet();
+  }
+
+  @Override
+  public void onUpdate(String interpreterGroupId, AngularObject object) {
+    updateCount.incrementAndGet();
+  }
+
+  @Override
+  public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) {
+    removeCount.incrementAndGet();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
new file mode 100644
index 0000000..49aa7aa
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
+import org.junit.Test;
+
+import static org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType.NO_OP;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Verifies that {@link RemoteInterpreterEventPoller} drains events that are still pending
+ * when it is shut down.
+ */
+public class RemoteInterpreterEventPollerTest {
+
+  /**
+   * Requests shutdown before the poller thread is started, waits for the thread to end and
+   * then expects the next event offered by the client to be {@code NO_OP}. Given the stub
+   * below, that can only hold if the two earlier events were already consumed.
+   */
+  @Test
+  public void shouldClearUnreadEventsOnShutdown() throws Exception {
+    RemoteInterpreterProcess process = getMockEventsInterpreterProcess();
+    RemoteInterpreterEventPoller poller = new RemoteInterpreterEventPoller(null, null);
+    poller.setInterpreterProcess(process);
+
+    poller.shutdown();
+    poller.start();
+    poller.join();
+
+    RemoteInterpreterEvent nextEvent = process.getClient().getEvent();
+    assertEquals(NO_OP, nextEvent.getType());
+  }
+
+  /**
+   * Builds a mocked interpreter process whose client hands out the same default-constructed
+   * event twice and a {@code NO_OP} event from then on.
+   */
+  private RemoteInterpreterProcess getMockEventsInterpreterProcess() throws Exception {
+    RemoteInterpreterEvent pendingEvent = new RemoteInterpreterEvent();
+    RemoteInterpreterEvent noOpEvent = new RemoteInterpreterEvent(NO_OP, "");
+
+    RemoteInterpreterService.Client stubClient = mock(RemoteInterpreterService.Client.class);
+    RemoteInterpreterProcess stubProcess = mock(RemoteInterpreterProcess.class);
+
+    when(stubClient.getEvent()).thenReturn(pendingEvent, pendingEvent, noOpEvent);
+    when(stubProcess.getClient()).thenReturn(stubClient);
+
+    return stubProcess;
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
new file mode 100644
index 0000000..1687060
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterOutputStream;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+
+/**
+ * Tests how output produced by {@link MockInterpreterOutputStream}, run as a remote
+ * interpreter, shows up in the {@link InterpreterResult} handed back to the caller.
+ *
+ * <p>NOTE(review): the scripts passed to {@code interpret} look like
+ * {@code <code>:<streamed output>:<static result>}; that reading is inferred from the
+ * assertions in this class only, confirm against MockInterpreterOutputStream.
+ */
+public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProcessListener {
+  private static final String INTERPRETER_SCRIPT =
+      System.getProperty("os.name").startsWith("Windows")
+          ? "../bin/interpreter.cmd"
+          : "../bin/interpreter.sh";
+
+  private InterpreterSetting interpreterSetting;
+
+  /** Creates a remote interpreter setting around {@link MockInterpreterOutputStream}. */
+  @Before
+  public void setUp() throws Exception {
+    InterpreterOption option = new InterpreterOption();
+    option.setRemote(true);
+
+    InterpreterInfo streamInfo = new InterpreterInfo(
+        MockInterpreterOutputStream.class.getName(), "mock", true, new HashMap<String, Object>());
+    List<InterpreterInfo> infos = new ArrayList<>();
+    infos.add(streamInfo);
+
+    InterpreterRunner scriptRunner = new InterpreterRunner(INTERPRETER_SCRIPT, INTERPRETER_SCRIPT);
+    interpreterSetting = new InterpreterSetting.Builder()
+        .setId("test")
+        .setName("test")
+        .setGroup("test")
+        .setInterpreterInfos(infos)
+        .setOption(option)
+        .setRunner(scriptRunner)
+        .setInterpreterDir("../interpeters/test")
+        .create();
+  }
+
+  /** Closes the interpreter setting created in {@link #setUp()}. */
+  @After
+  public void tearDown() throws Exception {
+    interpreterSetting.close();
+  }
+
+  /** Builds a fresh context for note "noteId", paragraph "id". */
+  private InterpreterContext createInterpreterContext() {
+    InterpreterContext freshContext = new InterpreterContext(
+        "noteId",
+        "id",
+        null,
+        "title",
+        "text",
+        new AuthenticationInfo(),
+        new HashMap<String, Object>(),
+        new GUI(),
+        null,
+        null,
+        new LinkedList<InterpreterContextRunner>(),
+        null);
+    return freshContext;
+  }
+
+  /** Looks up the default interpreter of the setting for user "user1" and note "note1". */
+  private RemoteInterpreter defaultInterpreter() {
+    return (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1");
+  }
+
+  /** Returns the data of the result message at the given position. */
+  private static String dataAt(InterpreterResult result, int position) {
+    return result.message().get(position).getData();
+  }
+
+  /** Returns the type of the result message at the given position. */
+  private static InterpreterResult.Type typeAt(InterpreterResult result, int position) {
+    return result.message().get(position).getType();
+  }
+
+  @Test
+  public void testInterpreterResultOnly() {
+    RemoteInterpreter intp = defaultInterpreter();
+
+    InterpreterResult ret = intp.interpret("SUCCESS::staticresult", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals("staticresult", dataAt(ret, 0));
+
+    ret = intp.interpret("SUCCESS::staticresult2", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals("staticresult2", dataAt(ret, 0));
+
+    ret = intp.interpret("ERROR::staticresult3", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.ERROR, ret.code());
+    assertEquals("staticresult3", dataAt(ret, 0));
+  }
+
+  @Test
+  public void testInterpreterOutputStreamOnly() {
+    RemoteInterpreter intp = defaultInterpreter();
+
+    InterpreterResult ret = intp.interpret("SUCCESS:streamresult:", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals("streamresult", dataAt(ret, 0));
+
+    ret = intp.interpret("ERROR:streamresult2:", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.ERROR, ret.code());
+    assertEquals("streamresult2", dataAt(ret, 0));
+  }
+
+  @Test
+  public void testInterpreterResultOutputStreamMixed() {
+    RemoteInterpreter intp = defaultInterpreter();
+
+    InterpreterResult ret = intp.interpret("SUCCESS:stream:static", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals("stream", dataAt(ret, 0));
+    assertEquals("static", dataAt(ret, 1));
+  }
+
+  @Test
+  public void testOutputType() {
+    RemoteInterpreter intp = defaultInterpreter();
+
+    // type directive and text separated by a space
+    InterpreterResult ret = intp.interpret("SUCCESS:%html hello:", createInterpreterContext());
+    assertEquals(InterpreterResult.Type.HTML, typeAt(ret, 0));
+    assertEquals("hello", dataAt(ret, 0));
+
+    // type directive and text separated by a newline
+    ret = intp.interpret("SUCCESS:%html\nhello:", createInterpreterContext());
+    assertEquals(InterpreterResult.Type.HTML, typeAt(ret, 0));
+    assertEquals("hello", dataAt(ret, 0));
+
+    // two messages carrying different types
+    ret = intp.interpret("SUCCESS:%html hello:%angular world", createInterpreterContext());
+    assertEquals(InterpreterResult.Type.HTML, typeAt(ret, 0));
+    assertEquals("hello", dataAt(ret, 0));
+    assertEquals(InterpreterResult.Type.ANGULAR, typeAt(ret, 1));
+    assertEquals("world", dataAt(ret, 1));
+  }
+
+  // The listener callbacks below are intentionally no-ops, except onGetParagraphRunners.
+
+  @Override
+  public void onOutputAppend(String noteId, String paragraphId, int index, String output) {
+  }
+
+  @Override
+  public void onOutputUpdated(String noteId, String paragraphId, int index,
+      InterpreterResult.Type type, String output) {
+  }
+
+  @Override
+  public void onOutputClear(String noteId, String paragraphId) {
+  }
+
+  @Override
+  public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
+  }
+
+  /** Reports an empty runner list to the callback, when one is supplied. */
+  @Override
+  public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) {
+    if (callback == null) {
+      return;
+    }
+    callback.onFinished(new LinkedList<>());
+  }
+
+  @Override
+  public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception {
+  }
+
+  @Override
+  public void onParaInfosReceived(String noteId, String paragraphId,
+      String interpreterSettingId, Map<String, String> metaInfos) {
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
new file mode 100644
index 0000000..ae98dc3
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.thrift.transport.TTransportException;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.remote.mock.GetEnvPropertyInterpreter;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+public class RemoteInterpreterTest {
+
+
+ private static final String INTERPRETER_SCRIPT = // handed to InterpreterRunner in setUp()
+ System.getProperty("os.name").startsWith("Windows") ?
+ "../bin/interpreter.cmd" : // used when the os.name property starts with "Windows"
+ "../bin/interpreter.sh"; // used for every other os.name value
+
+ private InterpreterSetting interpreterSetting; // built in setUp(), closed in tearDown()
+
+ /**
+  * Builds the remote interpreter setting "test" with four interpreters registered under
+  * the names echo, double_echo, sleep and get.
+  */
+ @Before
+ public void setUp() throws Exception {
+   InterpreterOption option = new InterpreterOption();
+   option.setRemote(true);
+
+   List<InterpreterInfo> infos = new ArrayList<>();
+   infos.add(new InterpreterInfo(
+       EchoInterpreter.class.getName(), "echo", true, new HashMap<String, Object>()));
+   infos.add(new InterpreterInfo(
+       DoubleEchoInterpreter.class.getName(), "double_echo", false, new HashMap<String, Object>()));
+   infos.add(new InterpreterInfo(
+       SleepInterpreter.class.getName(), "sleep", false, new HashMap<String, Object>()));
+   infos.add(new InterpreterInfo(
+       GetEnvPropertyInterpreter.class.getName(), "get", false, new HashMap<String, Object>()));
+
+   InterpreterRunner scriptRunner = new InterpreterRunner(INTERPRETER_SCRIPT, INTERPRETER_SCRIPT);
+   interpreterSetting = new InterpreterSetting.Builder()
+       .setId("test")
+       .setName("test")
+       .setGroup("test")
+       .setInterpreterInfos(infos)
+       .setOption(option)
+       .setRunner(scriptRunner)
+       .setInterpreterDir("../interpeters/test")
+       .create();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ interpreterSetting.close(); // closes the setting that setUp() created for this test
+ }
+
+ /**
+  * In SHARED per-user mode the interpreters handed to two different users are expected to
+  * use the same RemoteInterpreterProcess. Once the session is closed through the
+  * interpreter group, the process is gone and neither interpreter may interpret any more.
+  */
+ @Test
+ public void testSharedMode() {
+   interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
+
+   Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1");
+   Interpreter interpreter2 = interpreterSetting.getDefaultInterpreter("user2", "note1");
+   assertTrue(interpreter1 instanceof RemoteInterpreter);
+   RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1;
+   assertTrue(interpreter2 instanceof RemoteInterpreter);
+   RemoteInterpreter remoteInterpreter2 = (RemoteInterpreter) interpreter2;
+
+   InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl",
+       "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
+       null, null, new ArrayList<InterpreterContextRunner>(), null);
+   assertEquals("hello", remoteInterpreter1.interpret("hello", context1).message().get(0).getData());
+   assertEquals(Interpreter.FormType.NATIVE, interpreter1.getFormType());
+   assertEquals(0, remoteInterpreter1.getProgress(context1));
+   assertNotNull(remoteInterpreter1.getOrCreateInterpreterProcess());
+   assertTrue(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess().isRunning());
+
+   // both users must end up on the very same process
+   assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData());
+   assertEquals(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess(),
+       remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess());
+
+   // Call InterpreterGroup.close instead of Interpreter.close, otherwise we will have the
+   // RemoteInterpreterProcess leakage.
+   remoteInterpreter1.getInterpreterGroup().close(remoteInterpreter1.getSessionId());
+   assertNull(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess());
+
+   // fail() raises AssertionError, which is not an Exception, so the catch blocks below
+   // only absorb the failure that interpret() is expected to produce.
+   try {
+     assertEquals("hello", remoteInterpreter1.interpret("hello", context1).message().get(0).getData());
+     fail("Should not be able to call interpret after interpreter is closed");
+   } catch (Exception e) {
+     e.printStackTrace();
+   }
+
+   try {
+     assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData());
+     // message fixed: the guarded call is interpret(), not getProgress()
+     fail("Should not be able to call interpret after RemoteInterpreterProcess is stopped");
+   } catch (Exception e) {
+     e.printStackTrace();
+   }
+ }
+
+ /**
+  * In SCOPED per-user mode two users are expected to share one RemoteInterpreterProcess
+  * while owning separate sessions: closing the first session leaves the process running
+  * for the second user, and closing the second session removes the process.
+  */
+ @Test
+ public void testScopedMode() {
+   interpreterSetting.getOption().setPerUser(InterpreterOption.SCOPED);
+
+   Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1");
+   Interpreter interpreter2 = interpreterSetting.getDefaultInterpreter("user2", "note1");
+   assertTrue(interpreter1 instanceof RemoteInterpreter);
+   RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1;
+   assertTrue(interpreter2 instanceof RemoteInterpreter);
+   RemoteInterpreter remoteInterpreter2 = (RemoteInterpreter) interpreter2;
+
+   InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl",
+       "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
+       null, null, new ArrayList<InterpreterContextRunner>(), null);
+   assertEquals("hello", remoteInterpreter1.interpret("hello", context1).message().get(0).getData());
+   assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData());
+   assertEquals(Interpreter.FormType.NATIVE, interpreter1.getFormType());
+   assertEquals(0, remoteInterpreter1.getProgress(context1));
+
+   assertNotNull(remoteInterpreter1.getOrCreateInterpreterProcess());
+   assertTrue(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess().isRunning());
+
+   assertEquals(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess(),
+       remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess());
+   // Call InterpreterGroup.close instead of Interpreter.close, otherwise we will have the
+   // RemoteInterpreterProcess leakage.
+   remoteInterpreter1.getInterpreterGroup().close(remoteInterpreter1.getSessionId());
+   // fail() raises AssertionError, which is not an Exception, so the catch blocks below
+   // only absorb the failure that interpret() is expected to produce.
+   try {
+     assertEquals("hello", remoteInterpreter1.interpret("hello", context1).message().get(0).getData());
+     fail("Should not be able to call interpret after interpreter is closed");
+   } catch (Exception e) {
+     e.printStackTrace();
+   }
+
+   // the second session is still alive and usable
+   assertTrue(remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess().isRunning());
+   assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData());
+   remoteInterpreter2.getInterpreterGroup().close(remoteInterpreter2.getSessionId());
+   try {
+     // compare the message data, not the InterpreterResult object itself, so that a call
+     // which unexpectedly succeeds is reported through the fail() message below
+     assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData());
+     fail("Should not be able to call interpret after interpreter is closed");
+   } catch (Exception e) {
+     e.printStackTrace();
+   }
+   assertNull(remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess());
+ }
+
+ /**
+  * In ISOLATED per-user mode each user is expected to get an own RemoteInterpreterProcess:
+  * closing the first user's session removes only that user's process, the second user's
+  * process keeps running until its own session is closed.
+  */
+ @Test
+ public void testIsolatedMode() {
+   interpreterSetting.getOption().setPerUser(InterpreterOption.ISOLATED);
+
+   Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1");
+   Interpreter interpreter2 = interpreterSetting.getDefaultInterpreter("user2", "note1");
+   assertTrue(interpreter1 instanceof RemoteInterpreter);
+   RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1;
+   assertTrue(interpreter2 instanceof RemoteInterpreter);
+   RemoteInterpreter remoteInterpreter2 = (RemoteInterpreter) interpreter2;
+
+   InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl",
+       "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
+       null, null, new ArrayList<InterpreterContextRunner>(), null);
+   assertEquals("hello", remoteInterpreter1.interpret("hello", context1).message().get(0).getData());
+   assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData());
+   assertEquals(Interpreter.FormType.NATIVE, interpreter1.getFormType());
+   assertEquals(0, remoteInterpreter1.getProgress(context1));
+   assertNotNull(remoteInterpreter1.getOrCreateInterpreterProcess());
+   assertTrue(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess().isRunning());
+
+   // the two users must not share a process
+   assertNotEquals(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess(),
+       remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess());
+   // Call InterpreterGroup.close instead of Interpreter.close, otherwise we will have the
+   // RemoteInterpreterProcess leakage.
+   remoteInterpreter1.getInterpreterGroup().close(remoteInterpreter1.getSessionId());
+   assertNull(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess());
+   assertTrue(remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess().isRunning());
+   // fail() raises AssertionError, which is not an Exception, so the catch blocks below
+   // only absorb the failure that interpret() is expected to produce.
+   try {
+     remoteInterpreter1.interpret("hello", context1);
+     // message fixed: the guarded call is interpret(), not getProgress()
+     fail("Should not be able to call interpret after interpreter is closed");
+   } catch (Exception e) {
+     e.printStackTrace();
+   }
+
+   assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData());
+   remoteInterpreter2.getInterpreterGroup().close(remoteInterpreter2.getSessionId());
+   try {
+     assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData());
+     fail("Should not be able to call interpret after interpreter is closed");
+   } catch (Exception e) {
+     e.printStackTrace();
+   }
+   assertNull(remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess());
+
+ }
+
+// @Test
+// public void testExecuteIncorrectPrecode() throws TTransportException, IOException {
+// interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
+// interpreterSetting.getProperties().setProperty("zeppelin.SleepInterpreter.precode", "fail test");
+//
+// Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "sleep");
+// InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl",
+// "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
+// null, null, new ArrayList<InterpreterContextRunner>(), null);
+// assertEquals(Code.ERROR, interpreter1.interpret("10", context1).code());
+// }
+//
+// @Test
+// public void testExecuteCorrectPrecode() throws TTransportException, IOException {
+// interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
+// interpreterSetting.getProperties().setProperty("zeppelin.SleepInterpreter.precode", "1");
+//
+// Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "sleep");
+// InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl",
+// "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
+// null, null, new ArrayList<InterpreterContextRunner>(), null);
+// assertEquals(Code.SUCCESS, interpreter1.interpret("10", context1).code());
+// }
+
+ @Test
+ public void testRemoteInterperterErrorStatus() throws TTransportException, IOException {
+ interpreterSetting.setProperty("zeppelin.interpreter.echo.fail", "true");
+ interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
+
+ Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1");
+ assertTrue(interpreter1 instanceof RemoteInterpreter);
+ RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1;
+
+ InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl",
+ "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
+ null, null, new ArrayList<InterpreterContextRunner>(), null);
+ assertEquals(Code.ERROR, remoteInterpreter1.interpret("hello", context1).code());
+ }
+
+ /**
+  * Submits two "100" jobs to the sleep interpreter from two threads and expects them to
+  * take at least 200 ms in total, i.e. to run one after the other.
+  *
+  * <p>NOTE(review): presumably the job text is the sleep time in milliseconds; confirm
+  * against SleepInterpreter.
+  *
+  * <p>The result codes are recorded by the worker threads and asserted on the test thread:
+  * an AssertionError thrown inside a worker thread would never reach JUnit, so the test
+  * would pass even if a job failed.
+  */
+ @Test
+ public void testFIFOScheduler() throws InterruptedException {
+   interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
+   // by default SleepInterpreter would use FIFOScheduler
+
+   final Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "sleep");
+   final InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl",
+       "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
+       null, null, new ArrayList<InterpreterContextRunner>(), null);
+   // run this dummy interpret method first to launch the RemoteInterpreterProcess to avoid the
+   // time overhead of launching the process.
+   interpreter1.interpret("1", context1);
+
+   // One slot per worker; a slot stays null when its worker dies before storing a code.
+   // Thread.join() makes the workers' writes visible to the test thread.
+   final Code[] codes = new Code[2];
+   Thread thread1 = new Thread() {
+     @Override
+     public void run() {
+       codes[0] = interpreter1.interpret("100", context1).code();
+     }
+   };
+   Thread thread2 = new Thread() {
+     @Override
+     public void run() {
+       codes[1] = interpreter1.interpret("100", context1).code();
+     }
+   };
+   long start = System.currentTimeMillis();
+   thread1.start();
+   thread2.start();
+   thread1.join();
+   thread2.join();
+   long end = System.currentTimeMillis();
+
+   assertEquals(Code.SUCCESS, codes[0]);
+   assertEquals(Code.SUCCESS, codes[1]);
+   assertTrue((end - start) >= 200);
+ }
+
+ /**
+  * Same scenario as the FIFO case, but with "zeppelin.SleepInterpreter.parallel" set to
+  * "true": the two "100" statements sent from two threads must finish within 200 ms in total,
+  * i.e. they are expected to overlap.
+  *
+  * <p>The result codes are recorded by the worker threads and asserted on the test thread
+  * after {@code join()}. An {@code AssertionError} raised inside a worker thread would only
+  * terminate that thread and would never fail the test.
+  */
+ @Test
+ public void testParallelScheduler() throws InterruptedException {
+   interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
+   interpreterSetting.setProperty("zeppelin.SleepInterpreter.parallel", "true");
+
+   final Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "sleep");
+   final InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl",
+       "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
+       null, null, new ArrayList<InterpreterContextRunner>(), null);
+
+   // run this dummy interpret method first to launch the RemoteInterpreterProcess to avoid the
+   // time overhead of launching the process.
+   interpreter1.interpret("1", context1);
+
+   // One slot per worker; join() below makes the writes visible to the test thread.
+   final Code[] codes = new Code[2];
+   Thread thread1 = new Thread() {
+     @Override
+     public void run() {
+       codes[0] = interpreter1.interpret("100", context1).code();
+     }
+   };
+   Thread thread2 = new Thread() {
+     @Override
+     public void run() {
+       codes[1] = interpreter1.interpret("100", context1).code();
+     }
+   };
+   long start = System.currentTimeMillis();
+   thread1.start();
+   thread2.start();
+   thread1.join();
+   thread2.join();
+   long end = System.currentTimeMillis();
+
+   // A slot left null means the worker died before interpret() returned.
+   assertEquals(Code.SUCCESS, codes[0]);
+   assertEquals(Code.SUCCESS, codes[1]);
+   assertTrue((end - start) <= 200);
+ }
+
+// @Test
+// public void testRunOrderPreserved() throws InterruptedException {
+// Properties p = new Properties();
+// intpGroup.put("note", new LinkedList<Interpreter>());
+//
+// final RemoteInterpreter intpA = createMockInterpreterA(p);
+//
+// intpGroup.get("note").add(intpA);
+// intpA.setInterpreterGroup(intpGroup);
+//
+// intpA.open();
+//
+// int concurrency = 3;
+// final List<InterpreterResultMessage> results = new LinkedList<>();
+//
+// Scheduler scheduler = intpA.getScheduler();
+// for (int i = 0; i < concurrency; i++) {
+// final String jobId = Integer.toString(i);
+// scheduler.submit(new Job(jobId, Integer.toString(i), null, 200) {
+// private Object r;
+//
+// @Override
+// public Object getReturn() {
+// return r;
+// }
+//
+// @Override
+// public void setResult(Object results) {
+// this.r = results;
+// }
+//
+// @Override
+// public int progress() {
+// return 0;
+// }
+//
+// @Override
+// public Map<String, Object> info() {
+// return null;
+// }
+//
+// @Override
+// protected Object jobRun() throws Throwable {
+// InterpreterResult ret = intpA.interpret(getJobName(), new InterpreterContext(
+// "note",
+// jobId,
+// null,
+// "title",
+// "text",
+// new AuthenticationInfo(),
+// new HashMap<String, Object>(),
+// new GUI(),
+// new AngularObjectRegistry(intpGroup.getId(), null),
+// new LocalResourcePool("pool1"),
+// new LinkedList<InterpreterContextRunner>(), null));
+//
+// synchronized (results) {
+// results.addAll(ret.message());
+// results.notify();
+// }
+// return null;
+// }
+//
+// @Override
+// protected boolean jobAbort() {
+// return false;
+// }
+//
+// });
+// }
+//
+// // wait for job finished
+// synchronized (results) {
+// while (results.size() != concurrency) {
+// results.wait(300);
+// }
+// }
+//
+// int i = 0;
+// for (InterpreterResultMessage result : results) {
+// assertEquals(Integer.toString(i++), result.getData());
+// }
+// assertEquals(concurrency, i);
+//
+// intpA.close();
+// }
+
+
+// @Test
+// public void testRemoteInterpreterSharesTheSameSchedulerInstanceInTheSameGroup() {
+// Properties p = new Properties();
+// intpGroup.put("note", new LinkedList<Interpreter>());
+//
+// RemoteInterpreter intpA = createMockInterpreterA(p);
+//
+// intpGroup.get("note").add(intpA);
+// intpA.setInterpreterGroup(intpGroup);
+//
+// RemoteInterpreter intpB = createMockInterpreterB(p);
+//
+// intpGroup.get("note").add(intpB);
+// intpB.setInterpreterGroup(intpGroup);
+//
+// intpA.open();
+// intpB.open();
+//
+// assertEquals(intpA.getScheduler(), intpB.getScheduler());
+// }
+
+// @Test
+// public void testMultiInterpreterSession() {
+// Properties p = new Properties();
+// intpGroup.put("sessionA", new LinkedList<Interpreter>());
+// intpGroup.put("sessionB", new LinkedList<Interpreter>());
+//
+// RemoteInterpreter intpAsessionA = createMockInterpreterA(p, "sessionA");
+// intpGroup.get("sessionA").add(intpAsessionA);
+// intpAsessionA.setInterpreterGroup(intpGroup);
+//
+// RemoteInterpreter intpBsessionA = createMockInterpreterB(p, "sessionA");
+// intpGroup.get("sessionA").add(intpBsessionA);
+// intpBsessionA.setInterpreterGroup(intpGroup);
+//
+// intpAsessionA.open();
+// intpBsessionA.open();
+//
+// assertEquals(intpAsessionA.getScheduler(), intpBsessionA.getScheduler());
+//
+// RemoteInterpreter intpAsessionB = createMockInterpreterA(p, "sessionB");
+// intpGroup.get("sessionB").add(intpAsessionB);
+// intpAsessionB.setInterpreterGroup(intpGroup);
+//
+// RemoteInterpreter intpBsessionB = createMockInterpreterB(p, "sessionB");
+// intpGroup.get("sessionB").add(intpBsessionB);
+// intpBsessionB.setInterpreterGroup(intpGroup);
+//
+// intpAsessionB.open();
+// intpBsessionB.open();
+//
+// assertEquals(intpAsessionB.getScheduler(), intpBsessionB.getScheduler());
+// assertNotEquals(intpAsessionA.getScheduler(), intpAsessionB.getScheduler());
+// }
+
+// @Test
+// public void should_push_local_angular_repo_to_remote() throws Exception {
+// //Given
+// final Client client = mock(Client.class);
+// final RemoteInterpreter intr = null;
+//// new RemoteInterpreter(new Properties(), "noteId",
+//// MockInterpreterA.class.getName(), "runner", "path", "localRepo", env, 10 * 1000, null,
+//// null, "anonymous", false);
+// final AngularObjectRegistry registry = new AngularObjectRegistry("spark", null);
+// registry.add("name", "DuyHai DOAN", "nodeId", "paragraphId");
+// final InterpreterGroup interpreterGroup = new InterpreterGroup("groupId");
+// interpreterGroup.setAngularObjectRegistry(registry);
+// intr.setInterpreterGroup(interpreterGroup);
+//
+// final java.lang.reflect.Type registryType = new TypeToken<Map<String,
+// Map<String, AngularObject>>>() {
+// }.getType();
+// final Gson gson = new Gson();
+// final String expected = gson.toJson(registry.getRegistry(), registryType);
+//
+// //When
+//// intr.pushAngularObjectRegistryToRemote(client);
+//
+// //Then
+// Mockito.verify(client).angularRegistryPush(expected);
+// }
+
+ @Test
+ public void testEnvStringPattern() {
+ assertFalse(RemoteInterpreterUtils.isEnvString(null));
+ assertFalse(RemoteInterpreterUtils.isEnvString(""));
+ assertFalse(RemoteInterpreterUtils.isEnvString("abcDEF"));
+ assertFalse(RemoteInterpreterUtils.isEnvString("ABC-DEF"));
+ assertTrue(RemoteInterpreterUtils.isEnvString("ABCDEF"));
+ assertTrue(RemoteInterpreterUtils.isEnvString("ABC_DEF"));
+ assertTrue(RemoteInterpreterUtils.isEnvString("ABC_DEF123"));
+ }
+
+ @Test
+ public void testEnvironmentAndProperty() {
+ interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
+ interpreterSetting.setProperty("ENV_1", "VALUE_1");
+ interpreterSetting.setProperty("property_1", "value_1");
+
+ final Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "get");
+ final InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl",
+ "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
+ null, null, new ArrayList<InterpreterContextRunner>(), null);
+
+ assertEquals("VALUE_1", interpreter1.interpret("getEnv ENV_1", context1).message().get(0).getData());
+ assertEquals("null", interpreter1.interpret("getEnv ENV_2", context1).message().get(0).getData());
+
+ assertEquals("value_1", interpreter1.interpret("getProperty property_1", context1).message().get(0).getData());
+ assertEquals("null", interpreter1.interpret("getProperty property_2", context1).message().get(0).getData());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
new file mode 100644
index 0000000..5f7426a
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for {@link RemoteInterpreterUtils}.
+ */
+public class RemoteInterpreterUtilsTest {
+
+  /**
+   * The value returned by {@code findRandomAvailablePortOnAllLocalInterfaces()} must be
+   * greater than zero.
+   */
+  @Test
+  public void testFindRandomAvailablePortOnAllLocalInterfaces() throws IOException {
+    final boolean positive =
+        RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() > 0;
+    assertTrue(positive);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/GetEnvPropertyInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/GetEnvPropertyInterpreter.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/GetEnvPropertyInterpreter.java
new file mode 100644
index 0000000..a039a59
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/GetEnvPropertyInterpreter.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+
+import java.util.List;
+import java.util.Properties;
+
+
+public class GetEnvPropertyInterpreter extends Interpreter {
+
+ public GetEnvPropertyInterpreter(Properties property) {
+ super(property);
+ }
+
+ @Override
+ public void open() {
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public InterpreterResult interpret(String st, InterpreterContext context) {
+ String[] cmd = st.split(" ");
+ if (cmd[0].equals("getEnv")) {
+ return new InterpreterResult(InterpreterResult.Code.SUCCESS, System.getenv(cmd[1]) == null ? "null" : System.getenv(cmd[1]));
+ } else if (cmd[0].equals("getProperty")){
+ return new InterpreterResult(InterpreterResult.Code.SUCCESS, System.getProperty(cmd[1]) == null ? "null" : System.getProperty(cmd[1]));
+ } else {
+ return new InterpreterResult(InterpreterResult.Code.ERROR, cmd[0]);
+ }
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) {
+
+ }
+
+ @Override
+ public FormType getFormType() {
+ return FormType.NATIVE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ return 0;
+ }
+
+ @Override
+ public List<InterpreterCompletion> completion(String buf, int cursor,
+ InterpreterContext interpreterContext) {
+ return null;
+ }
+
+ @Override
+ public Scheduler getScheduler() {
+ return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode());
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java
new file mode 100644
index 0000000..5a3e57c
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+
+import java.util.List;
+import java.util.Properties;
+
+public class MockInterpreterA extends Interpreter {
+
+ private String lastSt;
+
+ public MockInterpreterA(Properties property) {
+ super(property);
+ }
+
+ @Override
+ public void open() {
+ //new RuntimeException().printStackTrace();
+ }
+
+ @Override
+ public void close() {
+ }
+
+ public String getLastStatement() {
+ return lastSt;
+ }
+
+ @Override
+ public InterpreterResult interpret(String st, InterpreterContext context) {
+ if (property.containsKey("progress")) {
+ context.setProgress(Integer.parseInt(getProperty("progress")));
+ }
+ try {
+ Thread.sleep(Long.parseLong(st));
+ this.lastSt = st;
+ } catch (NumberFormatException | InterruptedException e) {
+ throw new InterpreterException(e);
+ }
+ return new InterpreterResult(Code.SUCCESS, st);
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) {
+
+ }
+
+ @Override
+ public FormType getFormType() {
+ return FormType.NATIVE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ return 0;
+ }
+
+ @Override
+ public List<InterpreterCompletion> completion(String buf, int cursor,
+ InterpreterContext interpreterContext) {
+ return null;
+ }
+
+ @Override
+ public Scheduler getScheduler() {
+ if (getProperty("parallel") != null && getProperty("parallel").equals("true")) {
+ return SchedulerFactory.singleton().createOrGetParallelScheduler("interpreter_" + this.hashCode(), 10);
+ } else {
+ return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
new file mode 100644
index 0000000..ec89241
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectWatcher;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class MockInterpreterAngular extends Interpreter {
+
+ AtomicInteger numWatch = new AtomicInteger(0);
+
+ public MockInterpreterAngular(Properties property) {
+ super(property);
+ }
+
+ @Override
+ public void open() {
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public InterpreterResult interpret(String st, InterpreterContext context) {
+ String[] stmt = st.split(" ");
+ String cmd = stmt[0];
+ String name = null;
+ if (stmt.length >= 2) {
+ name = stmt[1];
+ }
+ String value = null;
+ if (stmt.length == 3) {
+ value = stmt[2];
+ }
+
+ AngularObjectRegistry registry = context.getAngularObjectRegistry();
+
+ if (cmd.equals("add")) {
+ registry.add(name, value, context.getNoteId(), null);
+ registry.get(name, context.getNoteId(), null).addWatcher(new AngularObjectWatcher
+ (null) {
+
+ @Override
+ public void watch(Object oldObject, Object newObject,
+ InterpreterContext context) {
+ numWatch.incrementAndGet();
+ }
+
+ });
+ } else if (cmd.equalsIgnoreCase("update")) {
+ registry.get(name, context.getNoteId(), null).set(value);
+ } else if (cmd.equals("remove")) {
+ registry.remove(name, context.getNoteId(), null);
+ }
+
+ try {
+ Thread.sleep(500); // wait for watcher executed
+ } catch (InterruptedException e) {
+ logger.error("Exception in MockInterpreterAngular while interpret Thread.sleep", e);
+ }
+
+ String msg = registry.getAll(context.getNoteId(), null).size() + " " + Integer.toString(numWatch
+ .get());
+ return new InterpreterResult(Code.SUCCESS, msg);
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) {
+ }
+
+ @Override
+ public FormType getFormType() {
+ return FormType.NATIVE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ return 0;
+ }
+
+ @Override
+ public List<InterpreterCompletion> completion(String buf, int cursor,
+ InterpreterContext interpreterContext) {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
new file mode 100644
index 0000000..ff3ff9f
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.scheduler.Scheduler;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Test interpreter that sleeps for the number of milliseconds given in the statement plus the
+ * last statement of the {@link MockInterpreterA} found in the same session of its interpreter
+ * group, and that shares that interpreter's scheduler.
+ */
+public class MockInterpreterB extends Interpreter {
+
+  public MockInterpreterB(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+    //new RuntimeException().printStackTrace();
+  }
+
+  @Override
+  public void close() {
+  }
+
+  /**
+   * Sleeps for {@code st} milliseconds, extended by the last statement of the sibling
+   * {@link MockInterpreterA} when one exists and has already run.
+   *
+   * @param st number of milliseconds to sleep, as a decimal string
+   * @param context not used
+   * @return SUCCESS with the total time slept as message
+   * @throws InterpreterException if a statement is not a valid long or the sleep is interrupted
+   */
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    MockInterpreterA intpA = getInterpreterA();
+    // getInterpreterA() may return null (getScheduler() already accounts for that).
+    String intpASt = (intpA == null) ? null : intpA.getLastStatement();
+
+    long timeToSleep;
+    try {
+      // Parsing happens inside the try so that a malformed statement is wrapped as well.
+      timeToSleep = Long.parseLong(st);
+      if (intpASt != null) {
+        timeToSleep += Long.parseLong(intpASt);
+      }
+      Thread.sleep(timeToSleep);
+    } catch (InterruptedException e) {
+      // Restore the flag so code further up the stack can still observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new InterpreterException(e);
+    } catch (NumberFormatException e) {
+      throw new InterpreterException(e);
+    }
+    return new InterpreterResult(Code.SUCCESS, Long.toString(timeToSleep));
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public List<InterpreterCompletion> completion(String buf, int cursor,
+      InterpreterContext interpreterContext) {
+    return null;
+  }
+
+  /**
+   * Searches the interpreter group for the session list that contains this instance and
+   * returns the {@link MockInterpreterA} registered in that same list.
+   *
+   * @return the sibling interpreter, or null if this instance is in no list or its list holds
+   *         no {@link MockInterpreterA}
+   */
+  public MockInterpreterA getInterpreterA() {
+    InterpreterGroup interpreterGroup = getInterpreterGroup();
+    synchronized (interpreterGroup) {
+      for (List<Interpreter> interpreters : interpreterGroup.values()) {
+        boolean belongsToSameNoteGroup = false;
+        MockInterpreterA a = null;
+        for (Interpreter intp : interpreters) {
+          Interpreter inner = unwrap(intp);
+          if (intp.getClassName().equals(MockInterpreterA.class.getName())) {
+            a = (MockInterpreterA) inner;
+          }
+          if (this == inner) {
+            belongsToSameNoteGroup = true;
+          }
+        }
+        if (belongsToSameNoteGroup) {
+          return a;
+        }
+      }
+    }
+    return null;
+  }
+
+  /** Strips every {@link WrappedInterpreter} layer and returns the innermost interpreter. */
+  private static Interpreter unwrap(Interpreter intp) {
+    Interpreter p = intp;
+    while (p instanceof WrappedInterpreter) {
+      p = ((WrappedInterpreter) p).getInnerInterpreter();
+    }
+    return p;
+  }
+
+  /** Returns the scheduler of the sibling {@link MockInterpreterA}, or null without one. */
+  @Override
+  public Scheduler getScheduler() {
+    MockInterpreterA intpA = getInterpreterA();
+    if (intpA != null) {
+      return intpA.getScheduler();
+    }
+    return null;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
new file mode 100644
index 0000000..1890cbc
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * MockInterpreter to test outputstream
+ */
+public class MockInterpreterOutputStream extends Interpreter {
+ private String lastSt;
+
+ public MockInterpreterOutputStream(Properties property) {
+ super(property);
+ }
+
+ @Override
+ public void open() {
+ //new RuntimeException().printStackTrace();
+ }
+
+ @Override
+ public void close() {
+ }
+
+ public String getLastStatement() {
+ return lastSt;
+ }
+
+ @Override
+ public InterpreterResult interpret(String st, InterpreterContext context) {
+ String[] ret = st.split(":");
+ try {
+ if (ret[1] != null) {
+ context.out.write(ret[1]);
+ }
+ } catch (IOException e) {
+ throw new InterpreterException(e);
+ }
+ return new InterpreterResult(InterpreterResult.Code.valueOf(ret[0]), (ret.length > 2) ?
+ ret[2] : "");
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) {
+
+ }
+
+ @Override
+ public FormType getFormType() {
+ return FormType.NATIVE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ return 0;
+ }
+
+ @Override
+ public List<InterpreterCompletion> completion(String buf, int cursor,
+ InterpreterContext interpreterContext) {
+ return null;
+ }
+
+ @Override
+ public Scheduler getScheduler() {
+ return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode());
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
new file mode 100644
index 0000000..ee9f15c
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import com.google.gson.Gson;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.resource.Resource;
+import org.apache.zeppelin.resource.ResourcePool;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Test interpreter that drives the {@link ResourcePool} of the interpreter context.
+ *
+ * <p>Statements have the form {@code "<cmd> [target] [value] [arg]"}. {@code target} is either
+ * a plain resource name or {@code noteId:paragraphId:name}. Supported commands are "put",
+ * "get" (matched case-insensitively), "remove", "getAll" and "invoke". The outcome of the
+ * command is returned as JSON in the result message.
+ */
+public class MockInterpreterResourcePool extends Interpreter {
+
+  // Not read or written anywhere in this class.
+  AtomicInteger numWatch = new AtomicInteger(0);
+
+  public MockInterpreterResourcePool(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  /**
+   * Executes one resource pool command and then waits 500 ms before building the result.
+   *
+   * @param st space-separated statement as described on the class; for "invoke" the third
+   *           token is the method name and an optional fourth token is passed on to
+   *           {@code invokeMethod} as its last argument
+   * @param context supplies the resource pool
+   * @return SUCCESS with the command outcome serialized as JSON ("get" on a missing resource
+   *         yields an empty string; commands without an outcome yield JSON null)
+   */
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    String[] stmt = st.split(" ");
+    String cmd = stmt[0];
+    String noteId = null;
+    String paragraphId = null;
+    String name = null;
+    if (stmt.length >= 2) {
+      String[] npn = stmt[1].split(":");
+      if (npn.length >= 3) {
+        noteId = npn[0];
+        paragraphId = npn[1];
+        name = npn[2];
+      } else {
+        name = stmt[1];
+      }
+    }
+    String value = null;
+    if (stmt.length >= 3) {
+      value = stmt[2];
+    }
+
+    ResourcePool resourcePool = context.getResourcePool();
+    Object ret = null;
+    if (cmd.equals("put")) {
+      resourcePool.put(noteId, paragraphId, name, value);
+    } else if (cmd.equalsIgnoreCase("get")) {
+      Resource resource = resourcePool.get(noteId, paragraphId, name);
+      // Use the resource that was null-checked; a second lookup could come back null.
+      ret = (resource != null) ? resource.get() : "";
+    } else if (cmd.equals("remove")) {
+      ret = resourcePool.remove(noteId, paragraphId, name);
+    } else if (cmd.equals("getAll")) {
+      ret = resourcePool.getAll();
+    } else if (cmd.equals("invoke")) {
+      Resource resource = resourcePool.get(noteId, paragraphId, name);
+      if (stmt.length >= 4) {
+        Resource res = resource.invokeMethod(value, null, null, stmt[3]);
+        ret = res.get();
+      } else {
+        ret = resource.invokeMethod(value, null, null);
+      }
+    }
+
+    try {
+      Thread.sleep(500); // wait for watcher executed
+    } catch (InterruptedException e) {
+      // Still return the result, but keep the interruption visible to the caller.
+      Thread.currentThread().interrupt();
+    }
+
+    Gson gson = new Gson();
+    return new InterpreterResult(Code.SUCCESS, gson.toJson(ret));
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public List<InterpreterCompletion> completion(String buf, int cursor,
+      InterpreterContext interpreterContext) {
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
new file mode 100644
index 0000000..a1afe0e
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scheduler;
+
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterInfo;
+import org.apache.zeppelin.interpreter.InterpreterOption;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterRunner;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
+import org.apache.zeppelin.resource.LocalResourcePool;
+import org.apache.zeppelin.scheduler.Job.Status;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests jobs submitted to a remote scheduler that is backed by a {@link RemoteInterpreter}
+ * created from an {@link InterpreterSetting} whose only interpreter is
+ * {@link MockInterpreterA}.
+ *
+ * <p>The class also implements {@link RemoteInterpreterProcessListener}. All callbacks are
+ * no-ops except {@link #onGetParagraphRunners}, which reports an empty runner list.
+ */
+public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
+
+  /** Launcher script for the remote interpreter process, chosen by operating system. */
+  private static final String INTERPRETER_SCRIPT =
+      System.getProperty("os.name").startsWith("Windows") ?
+          "../bin/interpreter.cmd" :
+          "../bin/interpreter.sh";
+
+  /** Polling interval, in milliseconds, used while waiting for a job state change. */
+  private static final int TICK_WAIT = 100;
+
+  /** Maximum number of polling rounds before a wait gives up. */
+  private static final int MAX_WAIT_CYCLES = 100;
+
+  private InterpreterSetting interpreterSetting;
+  private SchedulerFactory schedulerSvc;
+
+  /**
+   * Creates a fresh scheduler factory and a remote interpreter setting named "test" that
+   * contains a single {@link MockInterpreterA} entry.
+   */
+  @Before
+  public void setUp() throws Exception {
+    schedulerSvc = new SchedulerFactory();
+
+    InterpreterOption interpreterOption = new InterpreterOption();
+    interpreterOption.setRemote(true);
+    InterpreterInfo interpreterInfo1 = new InterpreterInfo(
+        MockInterpreterA.class.getName(), "mock", true, new HashMap<String, Object>());
+    List<InterpreterInfo> interpreterInfos = new ArrayList<>();
+    interpreterInfos.add(interpreterInfo1);
+    InterpreterRunner runner = new InterpreterRunner(INTERPRETER_SCRIPT, INTERPRETER_SCRIPT);
+    interpreterSetting = new InterpreterSetting.Builder()
+        .setId("test")
+        .setName("test")
+        .setGroup("test")
+        .setInterpreterInfos(interpreterInfos)
+        .setOption(interpreterOption)
+        .setRunner(runner)
+        // NOTE(review): "interpeters" looks like a misspelling of "interpreters"; the value
+        // is left untouched because it is a runtime path - confirm before changing.
+        .setInterpreterDir("../interpeters/test")
+        .create();
+  }
+
+  /** Closes the interpreter setting created in {@link #setUp()}. */
+  @After
+  public void tearDown() {
+    interpreterSetting.close();
+  }
+
+  /**
+   * Submits a single job and checks that it is reported as running, and later as
+   * terminated, with the scheduler's waiting/running lists updated accordingly.
+   */
+  @Test
+  public void test() throws Exception {
+    final RemoteInterpreter intpA =
+        (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1");
+
+    intpA.open();
+
+    // Clean up in finally so a failed assertion does not leave the interpreter open or the
+    // scheduler registered.
+    try {
+      Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note", intpA, 10);
+
+      Job job = new Job("jobId", "jobName", null, 200) {
+        private Object results;
+
+        @Override
+        public Object getReturn() {
+          return results;
+        }
+
+        @Override
+        public int progress() {
+          return 0;
+        }
+
+        @Override
+        public Map<String, Object> info() {
+          return null;
+        }
+
+        @Override
+        protected Object jobRun() throws Throwable {
+          intpA.interpret("1000", createContext("jobId"));
+          return "1000";
+        }
+
+        @Override
+        protected boolean jobAbort() {
+          return false;
+        }
+
+        @Override
+        public void setResult(Object results) {
+          this.results = results;
+        }
+      };
+      scheduler.submit(job);
+
+      waitUntilRunning(job);
+      assertTrue(job.isRunning());
+
+      Thread.sleep(5 * TICK_WAIT);
+      assertEquals(0, scheduler.getJobsWaiting().size());
+      assertEquals(1, scheduler.getJobsRunning().size());
+
+      waitUntilTerminated(job);
+
+      assertTrue(job.isTerminated());
+      assertEquals(0, scheduler.getJobsWaiting().size());
+      assertEquals(0, scheduler.getJobsRunning().size());
+    } finally {
+      intpA.close();
+      schedulerSvc.removeScheduler("test");
+    }
+  }
+
+  /**
+   * Submits two jobs, aborts the second one while it is still pending, and checks that the
+   * aborted job ends up terminated without a finish date and keeps its pre-set result.
+   */
+  @Test
+  public void testAbortOnPending() throws Exception {
+    final RemoteInterpreter intpA =
+        (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1");
+    intpA.open();
+
+    // Clean up in finally so a failed assertion does not leave the interpreter open or the
+    // scheduler registered.
+    try {
+      Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note", intpA, 10);
+
+      Job job1 = createCancellableJob("jobId1", "jobName1", intpA);
+      Job job2 = createCancellableJob("jobId2", "jobName2", intpA);
+
+      job2.setResult("result2");
+
+      scheduler.submit(job1);
+      scheduler.submit(job2);
+
+      waitUntilRunning(job1);
+      assertTrue(job1.isRunning());
+      // assertEquals reports the actual status when the expectation is not met.
+      assertEquals(Status.PENDING, job2.getStatus());
+
+      job2.abort();
+
+      waitUntilTerminated(job1);
+
+      assertNotNull(job1.getDateFinished());
+      assertTrue(job1.isTerminated());
+      assertNull(job2.getDateFinished());
+      assertTrue(job2.isTerminated());
+      assertEquals("result2", job2.getReturn());
+    } finally {
+      intpA.close();
+      schedulerSvc.removeScheduler("test");
+    }
+  }
+
+  /**
+   * Builds the interpreter context shared by all jobs in this test; only the paragraph id
+   * differs between jobs.
+   *
+   * @param paragraphId value passed as the context's second constructor argument
+   * @return a new context for note "note" backed by a new local resource pool "pool1"
+   */
+  private static InterpreterContext createContext(String paragraphId) {
+    return new InterpreterContext(
+        "note",
+        paragraphId,
+        null,
+        "title",
+        "text",
+        new AuthenticationInfo(),
+        new HashMap<String, Object>(),
+        new GUI(),
+        null,
+        new LocalResourcePool("pool1"),
+        new LinkedList<InterpreterContextRunner>(),
+        null);
+  }
+
+  /**
+   * Creates a job that interprets "1000" on the given interpreter and, when aborted while
+   * running, forwards the cancellation to that interpreter.
+   *
+   * @param jobId job id, also used as the paragraph id of the job's context
+   * @param jobName job name
+   * @param interpreter interpreter that runs (and cancels) the statement
+   * @return the new, not yet submitted job
+   */
+  private static Job createCancellableJob(String jobId, String jobName,
+                                          final RemoteInterpreter interpreter) {
+    final InterpreterContext context = createContext(jobId);
+    return new Job(jobId, jobName, null, 200) {
+      private Object results;
+
+      @Override
+      public Object getReturn() {
+        return results;
+      }
+
+      @Override
+      public int progress() {
+        return 0;
+      }
+
+      @Override
+      public Map<String, Object> info() {
+        return null;
+      }
+
+      @Override
+      protected Object jobRun() throws Throwable {
+        interpreter.interpret("1000", context);
+        return "1000";
+      }
+
+      @Override
+      protected boolean jobAbort() {
+        // Only a running job has something to cancel on the interpreter side.
+        if (isRunning()) {
+          interpreter.cancel(context);
+        }
+        return true;
+      }
+
+      @Override
+      public void setResult(Object results) {
+        this.results = results;
+      }
+    };
+  }
+
+  /**
+   * Polls until the job reports running or {@link #MAX_WAIT_CYCLES} rounds have elapsed.
+   * Does not fail on timeout; callers assert the state afterwards.
+   */
+  private static void waitUntilRunning(Job job) throws InterruptedException {
+    for (int cycles = 0; !job.isRunning() && cycles < MAX_WAIT_CYCLES; cycles++) {
+      Thread.sleep(TICK_WAIT);
+    }
+  }
+
+  /**
+   * Polls until the job reports terminated or {@link #MAX_WAIT_CYCLES} rounds have elapsed.
+   * Does not fail on timeout; callers assert the state afterwards.
+   */
+  private static void waitUntilTerminated(Job job) throws InterruptedException {
+    for (int cycles = 0; !job.isTerminated() && cycles < MAX_WAIT_CYCLES; cycles++) {
+      Thread.sleep(TICK_WAIT);
+    }
+  }
+
+  @Override
+  public void onOutputAppend(String noteId, String paragraphId, int index, String output) {
+    // Not needed by these tests.
+  }
+
+  @Override
+  public void onOutputUpdated(String noteId, String paragraphId, int index,
+                              InterpreterResult.Type type, String output) {
+    // Not needed by these tests.
+  }
+
+  @Override
+  public void onOutputClear(String noteId, String paragraphId) {
+    // Not needed by these tests.
+  }
+
+  @Override
+  public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
+    // Not needed by these tests.
+  }
+
+  /** Immediately reports an empty runner list to the callback, if one is given. */
+  @Override
+  public void onGetParagraphRunners(String noteId, String paragraphId,
+                                    RemoteWorksEventListener callback) {
+    if (callback != null) {
+      callback.onFinished(new LinkedList<>());
+    }
+  }
+
+  @Override
+  public void onRemoteRunParagraph(String noteId, String paragraphId) throws Exception {
+    // Not needed by these tests.
+  }
+
+  @Override
+  public void onParaInfosReceived(String noteId, String paragraphId,
+                                  String interpreterSettingId, Map<String, String> metaInfos) {
+    // Not needed by these tests.
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/resources/conf/interpreter.json
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/resources/conf/interpreter.json b/zeppelin-interpreter/src/test/resources/conf/interpreter.json
new file mode 100644
index 0000000..45e1d60
--- /dev/null
+++ b/zeppelin-interpreter/src/test/resources/conf/interpreter.json
@@ -0,0 +1,115 @@
+{
+ "interpreterSettings": {
+ "2C3RWCVAG": {
+ "id": "2C3RWCVAG",
+ "name": "test",
+ "group": "test",
+ "properties": {
+ "property_1": "value_1",
+ "property_2": "new_value_2",
+ "property_3": "value_3"
+ },
+ "status": "READY",
+ "interpreterGroup": [
+ {
+ "name": "echo",
+ "class": "org.apache.zeppelin.interpreter.EchoInterpreter",
+ "defaultInterpreter": true,
+ "editor": {
+ "language": "java",
+ "editOnDblClick": false
+ }
+ }
+ ],
+ "dependencies": [],
+ "option": {
+ "remote": true,
+ "port": -1,
+ "perNote": "shared",
+ "perUser": "shared",
+ "isExistingProcess": false,
+ "setPermission": false,
+ "users": [],
+ "isUserImpersonate": false
+ }
+ },
+
+ "2CKWE7B19": {
+ "id": "2CKWE7B19",
+ "name": "test2",
+ "group": "test",
+ "properties": {
+ "property_1": "value_1",
+ "property_2": "new_value_2",
+ "property_3": "value_3"
+ },
+ "status": "READY",
+ "interpreterGroup": [
+ {
+ "name": "echo",
+ "class": "org.apache.zeppelin.interpreter.EchoInterpreter",
+ "defaultInterpreter": true,
+ "editor": {
+ "language": "java",
+ "editOnDblClick": false
+ }
+ }
+ ],
+ "dependencies": [],
+ "option": {
+ "remote": true,
+ "port": -1,
+ "perNote": "shared",
+ "perUser": "shared",
+ "isExistingProcess": false,
+ "setPermission": false,
+ "users": [],
+ "isUserImpersonate": false
+ }
+ }
+ },
+ "interpreterBindings": {
+ "2C6793KRV": [
+ "2C48Y7FSJ",
+ "2C63XW4XE",
+ "2C66GE1VB",
+ "2C5VH924X",
+ "2C4BJDRRZ",
+ "2C3SQSB7V",
+ "2C4HKDCQW",
+ "2C3DR183X",
+ "2C66Z9XPQ",
+ "2C3PTPMUH",
+ "2C69WE69N",
+ "2C5SRRXHM",
+ "2C4ZD49PF",
+ "2C6V3D44K",
+ "2C4UB1UZA",
+ "2C5S1R21W",
+ "2C5DCRVGM",
+ "2C686X8ZH",
+ "2C3RWCVAG",
+ "2C3JKFMJU",
+ "2C3VECEG2"
+ ]
+ },
+ "interpreterRepositories": [
+ {
+ "id": "central",
+ "type": "default",
+ "url": "http://repo1.maven.org/maven2/",
+ "releasePolicy": {
+ "enabled": true,
+ "updatePolicy": "daily",
+ "checksumPolicy": "warn"
+ },
+ "snapshotPolicy": {
+ "enabled": true,
+ "updatePolicy": "daily",
+ "checksumPolicy": "warn"
+ },
+ "mirroredRepositories": [],
+ "repositoryManager": false
+ }
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json b/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json
new file mode 100644
index 0000000..1ba1b94
--- /dev/null
+++ b/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json
@@ -0,0 +1,42 @@
+[
+ {
+ "group": "test",
+ "name": "double_echo",
+ "className": "org.apache.zeppelin.interpreter.DoubleEchoInterpreter",
+ "properties": {
+ "property_1": {
+ "envName": "PROPERTY_1",
+ "propertyName": "property_1",
+ "defaultValue": "value_1",
+ "description": "desc_1"
+ },
+ "property_2": {
+ "envName": "PROPERTY_2",
+ "propertyName": "property_2",
+ "defaultValue": "value_2",
+ "description": "desc_2"
+ }
+ }
+ },
+
+ {
+ "group": "test",
+ "name": "echo",
+ "defaultInterpreter": true,
+ "className": "org.apache.zeppelin.interpreter.EchoInterpreter",
+ "properties": {
+ "property_1": {
+ "envName": "PROPERTY_1",
+ "propertyName": "property_1",
+ "defaultValue": "value_1",
+ "description": "desc_1"
+ },
+ "property_2": {
+ "envName": "PROPERTY_2",
+ "propertyName": "property_2",
+ "defaultValue": "value_2",
+ "description": "desc_2"
+ }
+ }
+ }
+]
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/resources/log4j.properties b/zeppelin-interpreter/src/test/resources/log4j.properties
index d8a7839..6f34691 100644
--- a/zeppelin-interpreter/src/test/resources/log4j.properties
+++ b/zeppelin-interpreter/src/test/resources/log4j.properties
@@ -26,4 +26,6 @@ log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n
#
# Root logger option
-log4j.rootLogger=INFO, stdout
\ No newline at end of file
+log4j.rootLogger=INFO, stdout
+log4j.logger.org.apache.zeppelin.interpreter=DEBUG
+log4j.logger.org.apache.zeppelin.scheduler=DEBUG
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
index cd0210e..c1dba5c 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
@@ -185,7 +185,7 @@ public class InterpreterRestApi {
String noteId = request == null ? null : request.getNoteId();
if (null == noteId) {
- interpreterSettingManager.close(setting);
+ interpreterSettingManager.close(settingId);
} else {
interpreterSettingManager.restart(settingId, noteId, SecurityUtils.getPrincipal());
}
@@ -208,7 +208,7 @@ public class InterpreterRestApi {
@GET
@ZeppelinApi
public Response listInterpreter(String message) {
- Map<String, InterpreterSetting> m = interpreterSettingManager.getAvailableInterpreterSettings();
+ Map<String, InterpreterSetting> m = interpreterSettingManager.getInterpreterSettingTemplates();
return new JsonResponse<>(Status.OK, "", m).build();
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index 7453470..53ee114 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -93,13 +93,11 @@ public class ZeppelinServer extends Application {
private NotebookRepoSync notebookRepo;
private NotebookAuthorization notebookAuthorization;
private Credentials credentials;
- private DependencyResolver depResolver;
public ZeppelinServer() throws Exception {
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
- this.depResolver = new DependencyResolver(
- conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO));
+
InterpreterOutput.limit = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT);
@@ -129,13 +127,26 @@ public class ZeppelinServer extends Application {
new File(conf.getRelativeDir("zeppelin-web/src/app/spell")));
}
+ this.schedulerFactory = SchedulerFactory.singleton();
+ this.interpreterSettingManager = new InterpreterSettingManager(conf, notebookWsServer,
+ notebookWsServer, notebookWsServer);
+ this.replFactory = new InterpreterFactory(interpreterSettingManager);
+ this.notebookRepo = new NotebookRepoSync(conf);
+ this.noteSearchService = new LuceneSearch();
+ this.notebookAuthorization = NotebookAuthorization.init(conf);
+ this.credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath());
+ notebook = new Notebook(conf,
+ notebookRepo, schedulerFactory, replFactory, interpreterSettingManager, notebookWsServer,
+ noteSearchService, notebookAuthorization, credentials);
+
ZeppelinServer.helium = new Helium(
conf.getHeliumConfPath(),
conf.getHeliumRegistry(),
new File(conf.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO),
"helium-registry-cache"),
heliumBundleFactory,
- heliumApplicationFactory);
+ heliumApplicationFactory,
+ interpreterSettingManager);
// create bundle
try {
@@ -144,20 +155,6 @@ public class ZeppelinServer extends Application {
LOG.error(e.getMessage(), e);
}
- this.schedulerFactory = new SchedulerFactory();
- this.interpreterSettingManager = new InterpreterSettingManager(conf, depResolver,
- new InterpreterOption(true));
- this.replFactory = new InterpreterFactory(conf, notebookWsServer,
- notebookWsServer, heliumApplicationFactory, depResolver, SecurityUtils.isAuthenticated(),
- interpreterSettingManager);
- this.notebookRepo = new NotebookRepoSync(conf);
- this.noteSearchService = new LuceneSearch();
- this.notebookAuthorization = NotebookAuthorization.init(conf);
- this.credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath());
- notebook = new Notebook(conf,
- notebookRepo, schedulerFactory, replFactory, interpreterSettingManager, notebookWsServer,
- noteSearchService, notebookAuthorization, credentials);
-
// to update notebook from application event from remote process.
heliumApplicationFactory.setNotebook(notebook);
// to update fire websocket event on application event.
@@ -206,7 +203,7 @@ public class ZeppelinServer extends Application {
LOG.info("Shutting down Zeppelin Server ... ");
try {
jettyWebServer.stop();
- notebook.getInterpreterSettingManager().shutdown();
+ notebook.getInterpreterSettingManager().close();
notebook.close();
Thread.sleep(3000);
} catch (Exception e) {