You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/08/28 06:50:00 UTC
[07/11] zeppelin git commit: Revert "[ZEPPELIN-2627] Interpreter
refactor"
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
deleted file mode 100644
index 61e4ef0..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import org.apache.zeppelin.display.AngularObject;
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.AngularObjectRegistryListener;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterAngular;
-import org.apache.zeppelin.resource.LocalResourcePool;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-
-public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
- private static final String INTERPRETER_SCRIPT =
- System.getProperty("os.name").startsWith("Windows") ?
- "../bin/interpreter.cmd" :
- "../bin/interpreter.sh";
-
- private RemoteInterpreter intp;
- private InterpreterContext context;
- private RemoteAngularObjectRegistry localRegistry;
- private InterpreterSetting interpreterSetting;
-
- private AtomicInteger onAdd;
- private AtomicInteger onUpdate;
- private AtomicInteger onRemove;
-
- @Before
- public void setUp() throws Exception {
- onAdd = new AtomicInteger(0);
- onUpdate = new AtomicInteger(0);
- onRemove = new AtomicInteger(0);
-
- InterpreterOption interpreterOption = new InterpreterOption();
- interpreterOption.setRemote(true);
- InterpreterInfo interpreterInfo1 = new InterpreterInfo(MockInterpreterAngular.class.getName(), "mock", true, new HashMap<String, Object>());
- List<InterpreterInfo> interpreterInfos = new ArrayList<>();
- interpreterInfos.add(interpreterInfo1);
- InterpreterRunner runner = new InterpreterRunner(INTERPRETER_SCRIPT, INTERPRETER_SCRIPT);
- interpreterSetting = new InterpreterSetting.Builder()
- .setId("test")
- .setName("test")
- .setGroup("test")
- .setInterpreterInfos(interpreterInfos)
- .setOption(interpreterOption)
- .setRunner(runner)
- .setInterpreterDir("../interpeters/test")
- .create();
-
- intp = (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1");
- localRegistry = (RemoteAngularObjectRegistry) intp.getInterpreterGroup().getAngularObjectRegistry();
-
- context = new InterpreterContext(
- "note",
- "id",
- null,
- "title",
- "text",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- new AngularObjectRegistry(intp.getInterpreterGroup().getId(), null),
- new LocalResourcePool("pool1"),
- new LinkedList<InterpreterContextRunner>(), null);
-
- intp.open();
-
- }
-
- @After
- public void tearDown() throws Exception {
- interpreterSetting.close();
- }
-
- @Test
- public void testAngularObjectInterpreterSideCRUD() throws InterruptedException {
- InterpreterResult ret = intp.interpret("get", context);
- Thread.sleep(500); // waitFor eventpoller pool event
- String[] result = ret.message().get(0).getData().split(" ");
- assertEquals("0", result[0]); // size of registry
- assertEquals("0", result[1]); // num watcher called
-
- // create object
- ret = intp.interpret("add n1 v1", context);
- Thread.sleep(500);
- result = ret.message().get(0).getData().split(" ");
- assertEquals("1", result[0]); // size of registry
- assertEquals("0", result[1]); // num watcher called
- assertEquals("v1", localRegistry.get("n1", "note", null).get());
-
- // update object
- ret = intp.interpret("update n1 v11", context);
- result = ret.message().get(0).getData().split(" ");
- Thread.sleep(500);
- assertEquals("1", result[0]); // size of registry
- assertEquals("1", result[1]); // num watcher called
- assertEquals("v11", localRegistry.get("n1", "note", null).get());
-
- // remove object
- ret = intp.interpret("remove n1", context);
- result = ret.message().get(0).getData().split(" ");
- Thread.sleep(500);
- assertEquals("0", result[0]); // size of registry
- assertEquals("1", result[1]); // num watcher called
- assertEquals(null, localRegistry.get("n1", "note", null));
- }
-
- @Test
- public void testAngularObjectRemovalOnZeppelinServerSide() throws InterruptedException {
- // test if angularobject removal from server side propagate to interpreter process's registry.
- // will happen when notebook is removed.
-
- InterpreterResult ret = intp.interpret("get", context);
- Thread.sleep(500); // waitFor eventpoller pool event
- String[] result = ret.message().get(0).getData().split(" ");
- assertEquals("0", result[0]); // size of registry
-
- // create object
- ret = intp.interpret("add n1 v1", context);
- Thread.sleep(500);
- result = ret.message().get(0).getData().split(" ");
- assertEquals("1", result[0]); // size of registry
- assertEquals("v1", localRegistry.get("n1", "note", null).get());
-
- // remove object in local registry.
- localRegistry.removeAndNotifyRemoteProcess("n1", "note", null);
- ret = intp.interpret("get", context);
- Thread.sleep(500); // waitFor eventpoller pool event
- result = ret.message().get(0).getData().split(" ");
- assertEquals("0", result[0]); // size of registry
- }
-
- @Test
- public void testAngularObjectAddOnZeppelinServerSide() throws InterruptedException {
- // test if angularobject add from server side propagate to interpreter process's registry.
- // will happen when zeppelin server loads notebook and restore the object into registry
-
- InterpreterResult ret = intp.interpret("get", context);
- Thread.sleep(500); // waitFor eventpoller pool event
- String[] result = ret.message().get(0).getData().split(" ");
- assertEquals("0", result[0]); // size of registry
-
- // create object
- localRegistry.addAndNotifyRemoteProcess("n1", "v1", "note", null);
-
- // get from remote registry
- ret = intp.interpret("get", context);
- Thread.sleep(500); // waitFor eventpoller pool event
- result = ret.message().get(0).getData().split(" ");
- assertEquals("1", result[0]); // size of registry
- }
-
- @Override
- public void onAdd(String interpreterGroupId, AngularObject object) {
- onAdd.incrementAndGet();
- }
-
- @Override
- public void onUpdate(String interpreterGroupId, AngularObject object) {
- onUpdate.incrementAndGet();
- }
-
- @Override
- public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) {
- onRemove.incrementAndGet();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
deleted file mode 100644
index 49aa7aa..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
-import org.junit.Test;
-
-import static org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType.NO_OP;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class RemoteInterpreterEventPollerTest {
-
- @Test
- public void shouldClearUnreadEventsOnShutdown() throws Exception {
- RemoteInterpreterProcess interpreterProc = getMockEventsInterpreterProcess();
- RemoteInterpreterEventPoller eventPoller = new RemoteInterpreterEventPoller(null, null);
-
- eventPoller.setInterpreterProcess(interpreterProc);
- eventPoller.shutdown();
- eventPoller.start();
- eventPoller.join();
-
- assertEquals(NO_OP, interpreterProc.getClient().getEvent().getType());
- }
-
- private RemoteInterpreterProcess getMockEventsInterpreterProcess() throws Exception {
- RemoteInterpreterEvent fakeEvent = new RemoteInterpreterEvent();
- RemoteInterpreterEvent noMoreEvents = new RemoteInterpreterEvent(NO_OP, "");
- RemoteInterpreterService.Client client = mock(RemoteInterpreterService.Client.class);
- RemoteInterpreterProcess intProc = mock(RemoteInterpreterProcess.class);
-
- when(client.getEvent()).thenReturn(fakeEvent, fakeEvent, noMoreEvents);
- when(intProc.getClient()).thenReturn(client);
-
- return intProc;
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
deleted file mode 100644
index 1687060..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterOutputStream;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.*;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-
-
-/**
- * Test for remote interpreter output stream
- */
-public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProcessListener {
- private static final String INTERPRETER_SCRIPT =
- System.getProperty("os.name").startsWith("Windows") ?
- "../bin/interpreter.cmd" :
- "../bin/interpreter.sh";
-
- private InterpreterSetting interpreterSetting;
-
- @Before
- public void setUp() throws Exception {
- InterpreterOption interpreterOption = new InterpreterOption();
-
- interpreterOption.setRemote(true);
- InterpreterInfo interpreterInfo1 = new InterpreterInfo(MockInterpreterOutputStream.class.getName(), "mock", true, new HashMap<String, Object>());
- List<InterpreterInfo> interpreterInfos = new ArrayList<>();
- interpreterInfos.add(interpreterInfo1);
- InterpreterRunner runner = new InterpreterRunner(INTERPRETER_SCRIPT, INTERPRETER_SCRIPT);
- interpreterSetting = new InterpreterSetting.Builder()
- .setId("test")
- .setName("test")
- .setGroup("test")
- .setInterpreterInfos(interpreterInfos)
- .setOption(interpreterOption)
- .setRunner(runner)
- .setInterpreterDir("../interpeters/test")
- .create();
- }
-
- @After
- public void tearDown() throws Exception {
- interpreterSetting.close();
- }
-
- private InterpreterContext createInterpreterContext() {
- return new InterpreterContext(
- "noteId",
- "id",
- null,
- "title",
- "text",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- null,
- null,
- new LinkedList<InterpreterContextRunner>(), null);
- }
-
- @Test
- public void testInterpreterResultOnly() {
- RemoteInterpreter intp = (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1");
- InterpreterResult ret = intp.interpret("SUCCESS::staticresult", createInterpreterContext());
- assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
- assertEquals("staticresult", ret.message().get(0).getData());
-
- ret = intp.interpret("SUCCESS::staticresult2", createInterpreterContext());
- assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
- assertEquals("staticresult2", ret.message().get(0).getData());
-
- ret = intp.interpret("ERROR::staticresult3", createInterpreterContext());
- assertEquals(InterpreterResult.Code.ERROR, ret.code());
- assertEquals("staticresult3", ret.message().get(0).getData());
- }
-
- @Test
- public void testInterpreterOutputStreamOnly() {
- RemoteInterpreter intp = (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1");
- InterpreterResult ret = intp.interpret("SUCCESS:streamresult:", createInterpreterContext());
- assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
- assertEquals("streamresult", ret.message().get(0).getData());
-
- ret = intp.interpret("ERROR:streamresult2:", createInterpreterContext());
- assertEquals(InterpreterResult.Code.ERROR, ret.code());
- assertEquals("streamresult2", ret.message().get(0).getData());
- }
-
- @Test
- public void testInterpreterResultOutputStreamMixed() {
- RemoteInterpreter intp = (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1");
- InterpreterResult ret = intp.interpret("SUCCESS:stream:static", createInterpreterContext());
- assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
- assertEquals("stream", ret.message().get(0).getData());
- assertEquals("static", ret.message().get(1).getData());
- }
-
- @Test
- public void testOutputType() {
- RemoteInterpreter intp = (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1");
-
- InterpreterResult ret = intp.interpret("SUCCESS:%html hello:", createInterpreterContext());
- assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType());
- assertEquals("hello", ret.message().get(0).getData());
-
- ret = intp.interpret("SUCCESS:%html\nhello:", createInterpreterContext());
- assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType());
- assertEquals("hello", ret.message().get(0).getData());
-
- ret = intp.interpret("SUCCESS:%html hello:%angular world", createInterpreterContext());
- assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType());
- assertEquals("hello", ret.message().get(0).getData());
- assertEquals(InterpreterResult.Type.ANGULAR, ret.message().get(1).getType());
- assertEquals("world", ret.message().get(1).getData());
- }
-
- @Override
- public void onOutputAppend(String noteId, String paragraphId, int index, String output) {
-
- }
-
- @Override
- public void onOutputUpdated(String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) {
-
- }
-
- @Override
- public void onOutputClear(String noteId, String paragraphId) {
-
- }
-
- @Override
- public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
-
- }
-
- @Override
- public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) {
- if (callback != null) {
- callback.onFinished(new LinkedList<>());
- }
- }
-
- @Override
- public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception {
-
- }
-
- @Override
- public void onParaInfosReceived(String noteId, String paragraphId,
- String interpreterSettingId, Map<String, String> metaInfos) {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
deleted file mode 100644
index ae98dc3..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
+++ /dev/null
@@ -1,520 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import org.apache.thrift.transport.TTransportException;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.remote.mock.GetEnvPropertyInterpreter;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-
-public class RemoteInterpreterTest {
-
-
- private static final String INTERPRETER_SCRIPT =
- System.getProperty("os.name").startsWith("Windows") ?
- "../bin/interpreter.cmd" :
- "../bin/interpreter.sh";
-
- private InterpreterSetting interpreterSetting;
-
- @Before
- public void setUp() throws Exception {
- InterpreterOption interpreterOption = new InterpreterOption();
-
- interpreterOption.setRemote(true);
- InterpreterInfo interpreterInfo1 = new InterpreterInfo(EchoInterpreter.class.getName(), "echo", true, new HashMap<String, Object>());
- InterpreterInfo interpreterInfo2 = new InterpreterInfo(DoubleEchoInterpreter.class.getName(), "double_echo", false, new HashMap<String, Object>());
- InterpreterInfo interpreterInfo3 = new InterpreterInfo(SleepInterpreter.class.getName(), "sleep", false, new HashMap<String, Object>());
- InterpreterInfo interpreterInfo4 = new InterpreterInfo(GetEnvPropertyInterpreter.class.getName(), "get", false, new HashMap<String, Object>());
- List<InterpreterInfo> interpreterInfos = new ArrayList<>();
- interpreterInfos.add(interpreterInfo1);
- interpreterInfos.add(interpreterInfo2);
- interpreterInfos.add(interpreterInfo3);
- interpreterInfos.add(interpreterInfo4);
- InterpreterRunner runner = new InterpreterRunner(INTERPRETER_SCRIPT, INTERPRETER_SCRIPT);
- interpreterSetting = new InterpreterSetting.Builder()
- .setId("test")
- .setName("test")
- .setGroup("test")
- .setInterpreterInfos(interpreterInfos)
- .setOption(interpreterOption)
- .setRunner(runner)
- .setInterpreterDir("../interpeters/test")
- .create();
- }
-
- @After
- public void tearDown() throws Exception {
- interpreterSetting.close();
- }
-
- @Test
- public void testSharedMode() {
- interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
-
- Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1");
- Interpreter interpreter2 = interpreterSetting.getDefaultInterpreter("user2", "note1");
- assertTrue(interpreter1 instanceof RemoteInterpreter);
- RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1;
- assertTrue(interpreter2 instanceof RemoteInterpreter);
- RemoteInterpreter remoteInterpreter2 = (RemoteInterpreter) interpreter2;
-
- InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl",
- "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
- null, null, new ArrayList<InterpreterContextRunner>(), null);
- assertEquals("hello", remoteInterpreter1.interpret("hello", context1).message().get(0).getData());
- assertEquals(Interpreter.FormType.NATIVE, interpreter1.getFormType());
- assertEquals(0, remoteInterpreter1.getProgress(context1));
- assertNotNull(remoteInterpreter1.getOrCreateInterpreterProcess());
- assertTrue(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess().isRunning());
-
- assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData());
- assertEquals(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess(),
- remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess());
-
- // Call InterpreterGroup.close instead of Interpreter.close, otherwise we will have the
- // RemoteInterpreterProcess leakage.
- remoteInterpreter1.getInterpreterGroup().close(remoteInterpreter1.getSessionId());
- assertNull(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess());
- try {
- assertEquals("hello", remoteInterpreter1.interpret("hello", context1).message().get(0).getData());
- fail("Should not be able to call interpret after interpreter is closed");
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- try {
- assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData());
- fail("Should not be able to call getProgress after RemoterInterpreterProcess is stoped");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- @Test
- public void testScopedMode() {
- interpreterSetting.getOption().setPerUser(InterpreterOption.SCOPED);
-
- Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1");
- Interpreter interpreter2 = interpreterSetting.getDefaultInterpreter("user2", "note1");
- assertTrue(interpreter1 instanceof RemoteInterpreter);
- RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1;
- assertTrue(interpreter2 instanceof RemoteInterpreter);
- RemoteInterpreter remoteInterpreter2 = (RemoteInterpreter) interpreter2;
-
- InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl",
- "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
- null, null, new ArrayList<InterpreterContextRunner>(), null);
- assertEquals("hello", remoteInterpreter1.interpret("hello", context1).message().get(0).getData());
- assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData());
- assertEquals(Interpreter.FormType.NATIVE, interpreter1.getFormType());
- assertEquals(0, remoteInterpreter1.getProgress(context1));
-
- assertNotNull(remoteInterpreter1.getOrCreateInterpreterProcess());
- assertTrue(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess().isRunning());
-
- assertEquals(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess(),
- remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess());
- // Call InterpreterGroup.close instead of Interpreter.close, otherwise we will have the
- // RemoteInterpreterProcess leakage.
- remoteInterpreter1.getInterpreterGroup().close(remoteInterpreter1.getSessionId());
- try {
- assertEquals("hello", remoteInterpreter1.interpret("hello", context1).message().get(0).getData());
- fail("Should not be able to call interpret after interpreter is closed");
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- assertTrue(remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess().isRunning());
- assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData());
- remoteInterpreter2.getInterpreterGroup().close(remoteInterpreter2.getSessionId());
- try {
- assertEquals("hello", remoteInterpreter2.interpret("hello", context1));
- fail("Should not be able to call interpret after interpreter is closed");
- } catch (Exception e) {
- e.printStackTrace();
- }
- assertNull(remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess());
- }
-
- @Test
- public void testIsolatedMode() {
- interpreterSetting.getOption().setPerUser(InterpreterOption.ISOLATED);
-
- Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1");
- Interpreter interpreter2 = interpreterSetting.getDefaultInterpreter("user2", "note1");
- assertTrue(interpreter1 instanceof RemoteInterpreter);
- RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1;
- assertTrue(interpreter2 instanceof RemoteInterpreter);
- RemoteInterpreter remoteInterpreter2 = (RemoteInterpreter) interpreter2;
-
- InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl",
- "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
- null, null, new ArrayList<InterpreterContextRunner>(), null);
- assertEquals("hello", remoteInterpreter1.interpret("hello", context1).message().get(0).getData());
- assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData());
- assertEquals(Interpreter.FormType.NATIVE, interpreter1.getFormType());
- assertEquals(0, remoteInterpreter1.getProgress(context1));
- assertNotNull(remoteInterpreter1.getOrCreateInterpreterProcess());
- assertTrue(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess().isRunning());
-
- assertNotEquals(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess(),
- remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess());
- // Call InterpreterGroup.close instead of Interpreter.close, otherwise we will have the
- // RemoteInterpreterProcess leakage.
- remoteInterpreter1.getInterpreterGroup().close(remoteInterpreter1.getSessionId());
- assertNull(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess());
- assertTrue(remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess().isRunning());
- try {
- remoteInterpreter1.interpret("hello", context1);
- fail("Should not be able to call getProgress after interpreter is closed");
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData());
- remoteInterpreter2.getInterpreterGroup().close(remoteInterpreter2.getSessionId());
- try {
- assertEquals("hello", remoteInterpreter2.interpret("hello", context1).message().get(0).getData());
- fail("Should not be able to call interpret after interpreter is closed");
- } catch (Exception e) {
- e.printStackTrace();
- }
- assertNull(remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess());
-
- }
-
-// @Test
-// public void testExecuteIncorrectPrecode() throws TTransportException, IOException {
-// interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
-// interpreterSetting.getProperties().setProperty("zeppelin.SleepInterpreter.precode", "fail test");
-//
-// Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "sleep");
-// InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl",
-// "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
-// null, null, new ArrayList<InterpreterContextRunner>(), null);
-// assertEquals(Code.ERROR, interpreter1.interpret("10", context1).code());
-// }
-//
-// @Test
-// public void testExecuteCorrectPrecode() throws TTransportException, IOException {
-// interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
-// interpreterSetting.getProperties().setProperty("zeppelin.SleepInterpreter.precode", "1");
-//
-// Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "sleep");
-// InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl",
-// "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
-// null, null, new ArrayList<InterpreterContextRunner>(), null);
-// assertEquals(Code.SUCCESS, interpreter1.interpret("10", context1).code());
-// }
-
- @Test
- public void testRemoteInterperterErrorStatus() throws TTransportException, IOException {
- interpreterSetting.setProperty("zeppelin.interpreter.echo.fail", "true");
- interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
-
- Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1");
- assertTrue(interpreter1 instanceof RemoteInterpreter);
- RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1;
-
- InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl",
- "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
- null, null, new ArrayList<InterpreterContextRunner>(), null);
- assertEquals(Code.ERROR, remoteInterpreter1.interpret("hello", context1).code());
- }
-
- @Test
- public void testFIFOScheduler() throws InterruptedException {
- interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
- // by default SleepInterpreter would use FIFOScheduler
-
- final Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "sleep");
- final InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl",
- "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
- null, null, new ArrayList<InterpreterContextRunner>(), null);
- // run this dummy interpret method first to launch the RemoteInterpreterProcess to avoid the
- // time overhead of launching the process.
- interpreter1.interpret("1", context1);
- Thread thread1 = new Thread() {
- @Override
- public void run() {
- assertEquals(Code.SUCCESS, interpreter1.interpret("100", context1).code());
- }
- };
- Thread thread2 = new Thread() {
- @Override
- public void run() {
- assertEquals(Code.SUCCESS, interpreter1.interpret("100", context1).code());
- }
- };
- long start = System.currentTimeMillis();
- thread1.start();
- thread2.start();
- thread1.join();
- thread2.join();
- long end = System.currentTimeMillis();
- assertTrue((end - start) >= 200);
- }
-
- @Test
- public void testParallelScheduler() throws InterruptedException {
- interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
- interpreterSetting.setProperty("zeppelin.SleepInterpreter.parallel", "true");
-
- final Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "sleep");
- final InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl",
- "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
- null, null, new ArrayList<InterpreterContextRunner>(), null);
-
- // run this dummy interpret method first to launch the RemoteInterpreterProcess to avoid the
- // time overhead of launching the process.
- interpreter1.interpret("1", context1);
- Thread thread1 = new Thread() {
- @Override
- public void run() {
- assertEquals(Code.SUCCESS, interpreter1.interpret("100", context1).code());
- }
- };
- Thread thread2 = new Thread() {
- @Override
- public void run() {
- assertEquals(Code.SUCCESS, interpreter1.interpret("100", context1).code());
- }
- };
- long start = System.currentTimeMillis();
- thread1.start();
- thread2.start();
- thread1.join();
- thread2.join();
- long end = System.currentTimeMillis();
- assertTrue((end - start) <= 200);
- }
-
-// @Test
-// public void testRunOrderPreserved() throws InterruptedException {
-// Properties p = new Properties();
-// intpGroup.put("note", new LinkedList<Interpreter>());
-//
-// final RemoteInterpreter intpA = createMockInterpreterA(p);
-//
-// intpGroup.get("note").add(intpA);
-// intpA.setInterpreterGroup(intpGroup);
-//
-// intpA.open();
-//
-// int concurrency = 3;
-// final List<InterpreterResultMessage> results = new LinkedList<>();
-//
-// Scheduler scheduler = intpA.getScheduler();
-// for (int i = 0; i < concurrency; i++) {
-// final String jobId = Integer.toString(i);
-// scheduler.submit(new Job(jobId, Integer.toString(i), null, 200) {
-// private Object r;
-//
-// @Override
-// public Object getReturn() {
-// return r;
-// }
-//
-// @Override
-// public void setResult(Object results) {
-// this.r = results;
-// }
-//
-// @Override
-// public int progress() {
-// return 0;
-// }
-//
-// @Override
-// public Map<String, Object> info() {
-// return null;
-// }
-//
-// @Override
-// protected Object jobRun() throws Throwable {
-// InterpreterResult ret = intpA.interpret(getJobName(), new InterpreterContext(
-// "note",
-// jobId,
-// null,
-// "title",
-// "text",
-// new AuthenticationInfo(),
-// new HashMap<String, Object>(),
-// new GUI(),
-// new AngularObjectRegistry(intpGroup.getId(), null),
-// new LocalResourcePool("pool1"),
-// new LinkedList<InterpreterContextRunner>(), null));
-//
-// synchronized (results) {
-// results.addAll(ret.message());
-// results.notify();
-// }
-// return null;
-// }
-//
-// @Override
-// protected boolean jobAbort() {
-// return false;
-// }
-//
-// });
-// }
-//
-// // wait for job finished
-// synchronized (results) {
-// while (results.size() != concurrency) {
-// results.wait(300);
-// }
-// }
-//
-// int i = 0;
-// for (InterpreterResultMessage result : results) {
-// assertEquals(Integer.toString(i++), result.getData());
-// }
-// assertEquals(concurrency, i);
-//
-// intpA.close();
-// }
-
-
-// @Test
-// public void testRemoteInterpreterSharesTheSameSchedulerInstanceInTheSameGroup() {
-// Properties p = new Properties();
-// intpGroup.put("note", new LinkedList<Interpreter>());
-//
-// RemoteInterpreter intpA = createMockInterpreterA(p);
-//
-// intpGroup.get("note").add(intpA);
-// intpA.setInterpreterGroup(intpGroup);
-//
-// RemoteInterpreter intpB = createMockInterpreterB(p);
-//
-// intpGroup.get("note").add(intpB);
-// intpB.setInterpreterGroup(intpGroup);
-//
-// intpA.open();
-// intpB.open();
-//
-// assertEquals(intpA.getScheduler(), intpB.getScheduler());
-// }
-
-// @Test
-// public void testMultiInterpreterSession() {
-// Properties p = new Properties();
-// intpGroup.put("sessionA", new LinkedList<Interpreter>());
-// intpGroup.put("sessionB", new LinkedList<Interpreter>());
-//
-// RemoteInterpreter intpAsessionA = createMockInterpreterA(p, "sessionA");
-// intpGroup.get("sessionA").add(intpAsessionA);
-// intpAsessionA.setInterpreterGroup(intpGroup);
-//
-// RemoteInterpreter intpBsessionA = createMockInterpreterB(p, "sessionA");
-// intpGroup.get("sessionA").add(intpBsessionA);
-// intpBsessionA.setInterpreterGroup(intpGroup);
-//
-// intpAsessionA.open();
-// intpBsessionA.open();
-//
-// assertEquals(intpAsessionA.getScheduler(), intpBsessionA.getScheduler());
-//
-// RemoteInterpreter intpAsessionB = createMockInterpreterA(p, "sessionB");
-// intpGroup.get("sessionB").add(intpAsessionB);
-// intpAsessionB.setInterpreterGroup(intpGroup);
-//
-// RemoteInterpreter intpBsessionB = createMockInterpreterB(p, "sessionB");
-// intpGroup.get("sessionB").add(intpBsessionB);
-// intpBsessionB.setInterpreterGroup(intpGroup);
-//
-// intpAsessionB.open();
-// intpBsessionB.open();
-//
-// assertEquals(intpAsessionB.getScheduler(), intpBsessionB.getScheduler());
-// assertNotEquals(intpAsessionA.getScheduler(), intpAsessionB.getScheduler());
-// }
-
-// @Test
-// public void should_push_local_angular_repo_to_remote() throws Exception {
-// //Given
-// final Client client = mock(Client.class);
-// final RemoteInterpreter intr = null;
-//// new RemoteInterpreter(new Properties(), "noteId",
-//// MockInterpreterA.class.getName(), "runner", "path", "localRepo", env, 10 * 1000, null,
-//// null, "anonymous", false);
-// final AngularObjectRegistry registry = new AngularObjectRegistry("spark", null);
-// registry.add("name", "DuyHai DOAN", "nodeId", "paragraphId");
-// final InterpreterGroup interpreterGroup = new InterpreterGroup("groupId");
-// interpreterGroup.setAngularObjectRegistry(registry);
-// intr.setInterpreterGroup(interpreterGroup);
-//
-// final java.lang.reflect.Type registryType = new TypeToken<Map<String,
-// Map<String, AngularObject>>>() {
-// }.getType();
-// final Gson gson = new Gson();
-// final String expected = gson.toJson(registry.getRegistry(), registryType);
-//
-// //When
-//// intr.pushAngularObjectRegistryToRemote(client);
-//
-// //Then
-// Mockito.verify(client).angularRegistryPush(expected);
-// }
-
- @Test
- public void testEnvStringPattern() {
- assertFalse(RemoteInterpreterUtils.isEnvString(null));
- assertFalse(RemoteInterpreterUtils.isEnvString(""));
- assertFalse(RemoteInterpreterUtils.isEnvString("abcDEF"));
- assertFalse(RemoteInterpreterUtils.isEnvString("ABC-DEF"));
- assertTrue(RemoteInterpreterUtils.isEnvString("ABCDEF"));
- assertTrue(RemoteInterpreterUtils.isEnvString("ABC_DEF"));
- assertTrue(RemoteInterpreterUtils.isEnvString("ABC_DEF123"));
- }
-
- @Test
- public void testEnvironmentAndProperty() {
- interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
- interpreterSetting.setProperty("ENV_1", "VALUE_1");
- interpreterSetting.setProperty("property_1", "value_1");
-
- final Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "get");
- final InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl",
- "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
- null, null, new ArrayList<InterpreterContextRunner>(), null);
-
- assertEquals("VALUE_1", interpreter1.interpret("getEnv ENV_1", context1).message().get(0).getData());
- assertEquals("null", interpreter1.interpret("getEnv ENV_2", context1).message().get(0).getData());
-
- assertEquals("value_1", interpreter1.interpret("getProperty property_1", context1).message().get(0).getData());
- assertEquals("null", interpreter1.interpret("getProperty property_2", context1).message().get(0).getData());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
deleted file mode 100644
index 5f7426a..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertTrue;
-
-public class RemoteInterpreterUtilsTest {
-
- @Test
- public void testFindRandomAvailablePortOnAllLocalInterfaces() throws IOException {
- assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() > 0);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/GetEnvPropertyInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/GetEnvPropertyInterpreter.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/GetEnvPropertyInterpreter.java
deleted file mode 100644
index a039a59..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/GetEnvPropertyInterpreter.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.interpreter.remote.mock;
-
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-
-import java.util.List;
-import java.util.Properties;
-
-
-public class GetEnvPropertyInterpreter extends Interpreter {
-
- public GetEnvPropertyInterpreter(Properties property) {
- super(property);
- }
-
- @Override
- public void open() {
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public InterpreterResult interpret(String st, InterpreterContext context) {
- String[] cmd = st.split(" ");
- if (cmd[0].equals("getEnv")) {
- return new InterpreterResult(InterpreterResult.Code.SUCCESS, System.getenv(cmd[1]) == null ? "null" : System.getenv(cmd[1]));
- } else if (cmd[0].equals("getProperty")){
- return new InterpreterResult(InterpreterResult.Code.SUCCESS, System.getProperty(cmd[1]) == null ? "null" : System.getProperty(cmd[1]));
- } else {
- return new InterpreterResult(InterpreterResult.Code.ERROR, cmd[0]);
- }
- }
-
- @Override
- public void cancel(InterpreterContext context) {
-
- }
-
- @Override
- public FormType getFormType() {
- return FormType.NATIVE;
- }
-
- @Override
- public int getProgress(InterpreterContext context) {
- return 0;
- }
-
- @Override
- public List<InterpreterCompletion> completion(String buf, int cursor,
- InterpreterContext interpreterContext) {
- return null;
- }
-
- @Override
- public Scheduler getScheduler() {
- return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode());
- }
-}
-
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java
deleted file mode 100644
index 5a3e57c..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote.mock;
-
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-
-import java.util.List;
-import java.util.Properties;
-
-public class MockInterpreterA extends Interpreter {
-
- private String lastSt;
-
- public MockInterpreterA(Properties property) {
- super(property);
- }
-
- @Override
- public void open() {
- //new RuntimeException().printStackTrace();
- }
-
- @Override
- public void close() {
- }
-
- public String getLastStatement() {
- return lastSt;
- }
-
- @Override
- public InterpreterResult interpret(String st, InterpreterContext context) {
- if (property.containsKey("progress")) {
- context.setProgress(Integer.parseInt(getProperty("progress")));
- }
- try {
- Thread.sleep(Long.parseLong(st));
- this.lastSt = st;
- } catch (NumberFormatException | InterruptedException e) {
- throw new InterpreterException(e);
- }
- return new InterpreterResult(Code.SUCCESS, st);
- }
-
- @Override
- public void cancel(InterpreterContext context) {
-
- }
-
- @Override
- public FormType getFormType() {
- return FormType.NATIVE;
- }
-
- @Override
- public int getProgress(InterpreterContext context) {
- return 0;
- }
-
- @Override
- public List<InterpreterCompletion> completion(String buf, int cursor,
- InterpreterContext interpreterContext) {
- return null;
- }
-
- @Override
- public Scheduler getScheduler() {
- if (getProperty("parallel") != null && getProperty("parallel").equals("true")) {
- return SchedulerFactory.singleton().createOrGetParallelScheduler("interpreter_" + this.hashCode(), 10);
- } else {
- return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
deleted file mode 100644
index ec89241..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote.mock;
-
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.AngularObjectWatcher;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class MockInterpreterAngular extends Interpreter {
-
- AtomicInteger numWatch = new AtomicInteger(0);
-
- public MockInterpreterAngular(Properties property) {
- super(property);
- }
-
- @Override
- public void open() {
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public InterpreterResult interpret(String st, InterpreterContext context) {
- String[] stmt = st.split(" ");
- String cmd = stmt[0];
- String name = null;
- if (stmt.length >= 2) {
- name = stmt[1];
- }
- String value = null;
- if (stmt.length == 3) {
- value = stmt[2];
- }
-
- AngularObjectRegistry registry = context.getAngularObjectRegistry();
-
- if (cmd.equals("add")) {
- registry.add(name, value, context.getNoteId(), null);
- registry.get(name, context.getNoteId(), null).addWatcher(new AngularObjectWatcher
- (null) {
-
- @Override
- public void watch(Object oldObject, Object newObject,
- InterpreterContext context) {
- numWatch.incrementAndGet();
- }
-
- });
- } else if (cmd.equalsIgnoreCase("update")) {
- registry.get(name, context.getNoteId(), null).set(value);
- } else if (cmd.equals("remove")) {
- registry.remove(name, context.getNoteId(), null);
- }
-
- try {
- Thread.sleep(500); // wait for watcher executed
- } catch (InterruptedException e) {
- logger.error("Exception in MockInterpreterAngular while interpret Thread.sleep", e);
- }
-
- String msg = registry.getAll(context.getNoteId(), null).size() + " " + Integer.toString(numWatch
- .get());
- return new InterpreterResult(Code.SUCCESS, msg);
- }
-
- @Override
- public void cancel(InterpreterContext context) {
- }
-
- @Override
- public FormType getFormType() {
- return FormType.NATIVE;
- }
-
- @Override
- public int getProgress(InterpreterContext context) {
- return 0;
- }
-
- @Override
- public List<InterpreterCompletion> completion(String buf, int cursor,
- InterpreterContext interpreterContext) {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
deleted file mode 100644
index ff3ff9f..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote.mock;
-
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-
-import java.util.List;
-import java.util.Properties;
-
-public class MockInterpreterB extends Interpreter {
-
- public MockInterpreterB(Properties property) {
- super(property);
- }
-
- @Override
- public void open() {
- //new RuntimeException().printStackTrace();
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public InterpreterResult interpret(String st, InterpreterContext context) {
- MockInterpreterA intpA = getInterpreterA();
- String intpASt = intpA.getLastStatement();
- long timeToSleep = Long.parseLong(st);
- if (intpASt != null) {
- timeToSleep += Long.parseLong(intpASt);
- }
- try {
- Thread.sleep(timeToSleep);
- } catch (NumberFormatException | InterruptedException e) {
- throw new InterpreterException(e);
- }
- return new InterpreterResult(Code.SUCCESS, Long.toString(timeToSleep));
- }
-
- @Override
- public void cancel(InterpreterContext context) {
-
- }
-
- @Override
- public FormType getFormType() {
- return FormType.NATIVE;
- }
-
- @Override
- public int getProgress(InterpreterContext context) {
- return 0;
- }
-
- @Override
- public List<InterpreterCompletion> completion(String buf, int cursor,
- InterpreterContext interpreterContext) {
- return null;
- }
-
- public MockInterpreterA getInterpreterA() {
- InterpreterGroup interpreterGroup = getInterpreterGroup();
- synchronized (interpreterGroup) {
- for (List<Interpreter> interpreters : interpreterGroup.values()) {
- boolean belongsToSameNoteGroup = false;
- MockInterpreterA a = null;
- for (Interpreter intp : interpreters) {
- if (intp.getClassName().equals(MockInterpreterA.class.getName())) {
- Interpreter p = intp;
- while (p instanceof WrappedInterpreter) {
- p = ((WrappedInterpreter) p).getInnerInterpreter();
- }
- a = (MockInterpreterA) p;
- }
-
- Interpreter p = intp;
- while (p instanceof WrappedInterpreter) {
- p = ((WrappedInterpreter) p).getInnerInterpreter();
- }
- if (this == p) {
- belongsToSameNoteGroup = true;
- }
- }
- if (belongsToSameNoteGroup) {
- return a;
- }
- }
- }
- return null;
- }
-
- @Override
- public Scheduler getScheduler() {
- MockInterpreterA intpA = getInterpreterA();
- if (intpA != null) {
- return intpA.getScheduler();
- }
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
deleted file mode 100644
index 1890cbc..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.interpreter.remote.mock;
-
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * MockInterpreter to test outputstream
- */
-public class MockInterpreterOutputStream extends Interpreter {
- private String lastSt;
-
- public MockInterpreterOutputStream(Properties property) {
- super(property);
- }
-
- @Override
- public void open() {
- //new RuntimeException().printStackTrace();
- }
-
- @Override
- public void close() {
- }
-
- public String getLastStatement() {
- return lastSt;
- }
-
- @Override
- public InterpreterResult interpret(String st, InterpreterContext context) {
- String[] ret = st.split(":");
- try {
- if (ret[1] != null) {
- context.out.write(ret[1]);
- }
- } catch (IOException e) {
- throw new InterpreterException(e);
- }
- return new InterpreterResult(InterpreterResult.Code.valueOf(ret[0]), (ret.length > 2) ?
- ret[2] : "");
- }
-
- @Override
- public void cancel(InterpreterContext context) {
-
- }
-
- @Override
- public FormType getFormType() {
- return FormType.NATIVE;
- }
-
- @Override
- public int getProgress(InterpreterContext context) {
- return 0;
- }
-
- @Override
- public List<InterpreterCompletion> completion(String buf, int cursor,
- InterpreterContext interpreterContext) {
- return null;
- }
-
- @Override
- public Scheduler getScheduler() {
- return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode());
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
deleted file mode 100644
index ee9f15c..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote.mock;
-
-import com.google.gson.Gson;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.resource.Resource;
-import org.apache.zeppelin.resource.ResourcePool;
-
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class MockInterpreterResourcePool extends Interpreter {
-
- AtomicInteger numWatch = new AtomicInteger(0);
-
- public MockInterpreterResourcePool(Properties property) {
- super(property);
- }
-
- @Override
- public void open() {
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public InterpreterResult interpret(String st, InterpreterContext context) {
- String[] stmt = st.split(" ");
- String cmd = stmt[0];
- String noteId = null;
- String paragraphId = null;
- String name = null;
- if (stmt.length >= 2) {
- String[] npn = stmt[1].split(":");
- if (npn.length >= 3) {
- noteId = npn[0];
- paragraphId = npn[1];
- name = npn[2];
- } else {
- name = stmt[1];
- }
- }
- String value = null;
- if (stmt.length >= 3) {
- value = stmt[2];
- }
-
- ResourcePool resourcePool = context.getResourcePool();
- Object ret = null;
- if (cmd.equals("put")) {
- resourcePool.put(noteId, paragraphId, name, value);
- } else if (cmd.equalsIgnoreCase("get")) {
- Resource resource = resourcePool.get(noteId, paragraphId, name);
- if (resource != null) {
- ret = resourcePool.get(noteId, paragraphId, name).get();
- } else {
- ret = "";
- }
- } else if (cmd.equals("remove")) {
- ret = resourcePool.remove(noteId, paragraphId, name);
- } else if (cmd.equals("getAll")) {
- ret = resourcePool.getAll();
- } else if (cmd.equals("invoke")) {
- Resource resource = resourcePool.get(noteId, paragraphId, name);
- if (stmt.length >=4) {
- Resource res = resource.invokeMethod(value, null, null, stmt[3]);
- ret = res.get();
- } else {
- ret = resource.invokeMethod(value, null, null);
- }
- }
-
- try {
- Thread.sleep(500); // wait for watcher executed
- } catch (InterruptedException e) {
- }
-
- Gson gson = new Gson();
- return new InterpreterResult(Code.SUCCESS, gson.toJson(ret));
- }
-
- @Override
- public void cancel(InterpreterContext context) {
- }
-
- @Override
- public FormType getFormType() {
- return FormType.NATIVE;
- }
-
- @Override
- public int getProgress(InterpreterContext context) {
- return 0;
- }
-
- @Override
- public List<InterpreterCompletion> completion(String buf, int cursor,
- InterpreterContext interpreterContext) {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
deleted file mode 100644
index a1afe0e..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.scheduler;
-
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
-import org.apache.zeppelin.interpreter.InterpreterInfo;
-import org.apache.zeppelin.interpreter.InterpreterOption;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterRunner;
-import org.apache.zeppelin.interpreter.InterpreterSetting;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
-import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
-import org.apache.zeppelin.resource.LocalResourcePool;
-import org.apache.zeppelin.scheduler.Job.Status;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
-
- private static final String INTERPRETER_SCRIPT =
- System.getProperty("os.name").startsWith("Windows") ?
- "../bin/interpreter.cmd" :
- "../bin/interpreter.sh";
-
- private InterpreterSetting interpreterSetting;
- private SchedulerFactory schedulerSvc;
- private static final int TICK_WAIT = 100;
- private static final int MAX_WAIT_CYCLES = 100;
-
- @Before
- public void setUp() throws Exception {
- schedulerSvc = new SchedulerFactory();
-
- InterpreterOption interpreterOption = new InterpreterOption();
- interpreterOption.setRemote(true);
- InterpreterInfo interpreterInfo1 = new InterpreterInfo(MockInterpreterA.class.getName(), "mock", true, new HashMap<String, Object>());
- List<InterpreterInfo> interpreterInfos = new ArrayList<>();
- interpreterInfos.add(interpreterInfo1);
- InterpreterRunner runner = new InterpreterRunner(INTERPRETER_SCRIPT, INTERPRETER_SCRIPT);
- interpreterSetting = new InterpreterSetting.Builder()
- .setId("test")
- .setName("test")
- .setGroup("test")
- .setInterpreterInfos(interpreterInfos)
- .setOption(interpreterOption)
- .setRunner(runner)
- .setInterpreterDir("../interpeters/test")
- .create();
- }
-
- @After
- public void tearDown() {
- interpreterSetting.close();
- }
-
- @Test
- public void test() throws Exception {
- final RemoteInterpreter intpA = (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1");
-
- intpA.open();
-
- Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note",
- intpA,
- 10);
-
- Job job = new Job("jobId", "jobName", null, 200) {
- Object results;
-
- @Override
- public Object getReturn() {
- return results;
- }
-
- @Override
- public int progress() {
- return 0;
- }
-
- @Override
- public Map<String, Object> info() {
- return null;
- }
-
- @Override
- protected Object jobRun() throws Throwable {
- intpA.interpret("1000", new InterpreterContext(
- "note",
- "jobId",
- null,
- "title",
- "text",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- null,
- new LocalResourcePool("pool1"),
- new LinkedList<InterpreterContextRunner>(), null));
- return "1000";
- }
-
- @Override
- protected boolean jobAbort() {
- return false;
- }
-
- @Override
- public void setResult(Object results) {
- this.results = results;
- }
- };
- scheduler.submit(job);
-
- int cycles = 0;
- while (!job.isRunning() && cycles < MAX_WAIT_CYCLES) {
- Thread.sleep(TICK_WAIT);
- cycles++;
- }
- assertTrue(job.isRunning());
-
- Thread.sleep(5 * TICK_WAIT);
- assertEquals(0, scheduler.getJobsWaiting().size());
- assertEquals(1, scheduler.getJobsRunning().size());
-
- cycles = 0;
- while (!job.isTerminated() && cycles < MAX_WAIT_CYCLES) {
- Thread.sleep(TICK_WAIT);
- cycles++;
- }
-
- assertTrue(job.isTerminated());
- assertEquals(0, scheduler.getJobsWaiting().size());
- assertEquals(0, scheduler.getJobsRunning().size());
-
- intpA.close();
- schedulerSvc.removeScheduler("test");
- }
-
- @Test
- public void testAbortOnPending() throws Exception {
- final RemoteInterpreter intpA = (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1");
- intpA.open();
-
- Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note", intpA, 10);
-
- Job job1 = new Job("jobId1", "jobName1", null, 200) {
- Object results;
- InterpreterContext context = new InterpreterContext(
- "note",
- "jobId1",
- null,
- "title",
- "text",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- null,
- new LocalResourcePool("pool1"),
- new LinkedList<InterpreterContextRunner>(), null);
-
- @Override
- public Object getReturn() {
- return results;
- }
-
- @Override
- public int progress() {
- return 0;
- }
-
- @Override
- public Map<String, Object> info() {
- return null;
- }
-
- @Override
- protected Object jobRun() throws Throwable {
- intpA.interpret("1000", context);
- return "1000";
- }
-
- @Override
- protected boolean jobAbort() {
- if (isRunning()) {
- intpA.cancel(context);
- }
- return true;
- }
-
- @Override
- public void setResult(Object results) {
- this.results = results;
- }
- };
-
- Job job2 = new Job("jobId2", "jobName2", null, 200) {
- public Object results;
- InterpreterContext context = new InterpreterContext(
- "note",
- "jobId2",
- null,
- "title",
- "text",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- null,
- new LocalResourcePool("pool1"),
- new LinkedList<InterpreterContextRunner>(), null);
-
- @Override
- public Object getReturn() {
- return results;
- }
-
- @Override
- public int progress() {
- return 0;
- }
-
- @Override
- public Map<String, Object> info() {
- return null;
- }
-
- @Override
- protected Object jobRun() throws Throwable {
- intpA.interpret("1000", context);
- return "1000";
- }
-
- @Override
- protected boolean jobAbort() {
- if (isRunning()) {
- intpA.cancel(context);
- }
- return true;
- }
-
- @Override
- public void setResult(Object results) {
- this.results = results;
- }
- };
-
- job2.setResult("result2");
-
- scheduler.submit(job1);
- scheduler.submit(job2);
-
-
- int cycles = 0;
- while (!job1.isRunning() && cycles < MAX_WAIT_CYCLES) {
- Thread.sleep(TICK_WAIT);
- cycles++;
- }
- assertTrue(job1.isRunning());
- assertTrue(job2.getStatus() == Status.PENDING);
-
- job2.abort();
-
- cycles = 0;
- while (!job1.isTerminated() && cycles < MAX_WAIT_CYCLES) {
- Thread.sleep(TICK_WAIT);
- cycles++;
- }
-
- assertNotNull(job1.getDateFinished());
- assertTrue(job1.isTerminated());
- assertNull(job2.getDateFinished());
- assertTrue(job2.isTerminated());
- assertEquals("result2", job2.getReturn());
-
- intpA.close();
- schedulerSvc.removeScheduler("test");
- }
-
- @Override
- public void onOutputAppend(String noteId, String paragraphId, int index, String output) {
-
- }
-
- @Override
- public void onOutputUpdated(String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) {
-
- }
-
- @Override
- public void onOutputClear(String noteId, String paragraphId) {
-
- }
-
- @Override
- public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
-
- }
-
- @Override
- public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) {
- if (callback != null) {
- callback.onFinished(new LinkedList<>());
- }
- }
-
- @Override
- public void onRemoteRunParagraph(String noteId, String PsaragraphID) throws Exception {
- }
-
- @Override
- public void onParaInfosReceived(String noteId, String paragraphId,
- String interpreterSettingId, Map<String, String> metaInfos) {
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/resources/conf/interpreter.json
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/resources/conf/interpreter.json b/zeppelin-interpreter/src/test/resources/conf/interpreter.json
deleted file mode 100644
index 45e1d60..0000000
--- a/zeppelin-interpreter/src/test/resources/conf/interpreter.json
+++ /dev/null
@@ -1,115 +0,0 @@
-{
- "interpreterSettings": {
- "2C3RWCVAG": {
- "id": "2C3RWCVAG",
- "name": "test",
- "group": "test",
- "properties": {
- "property_1": "value_1",
- "property_2": "new_value_2",
- "property_3": "value_3"
- },
- "status": "READY",
- "interpreterGroup": [
- {
- "name": "echo",
- "class": "org.apache.zeppelin.interpreter.EchoInterpreter",
- "defaultInterpreter": true,
- "editor": {
- "language": "java",
- "editOnDblClick": false
- }
- }
- ],
- "dependencies": [],
- "option": {
- "remote": true,
- "port": -1,
- "perNote": "shared",
- "perUser": "shared",
- "isExistingProcess": false,
- "setPermission": false,
- "users": [],
- "isUserImpersonate": false
- }
- },
-
- "2CKWE7B19": {
- "id": "2CKWE7B19",
- "name": "test2",
- "group": "test",
- "properties": {
- "property_1": "value_1",
- "property_2": "new_value_2",
- "property_3": "value_3"
- },
- "status": "READY",
- "interpreterGroup": [
- {
- "name": "echo",
- "class": "org.apache.zeppelin.interpreter.EchoInterpreter",
- "defaultInterpreter": true,
- "editor": {
- "language": "java",
- "editOnDblClick": false
- }
- }
- ],
- "dependencies": [],
- "option": {
- "remote": true,
- "port": -1,
- "perNote": "shared",
- "perUser": "shared",
- "isExistingProcess": false,
- "setPermission": false,
- "users": [],
- "isUserImpersonate": false
- }
- }
- },
- "interpreterBindings": {
- "2C6793KRV": [
- "2C48Y7FSJ",
- "2C63XW4XE",
- "2C66GE1VB",
- "2C5VH924X",
- "2C4BJDRRZ",
- "2C3SQSB7V",
- "2C4HKDCQW",
- "2C3DR183X",
- "2C66Z9XPQ",
- "2C3PTPMUH",
- "2C69WE69N",
- "2C5SRRXHM",
- "2C4ZD49PF",
- "2C6V3D44K",
- "2C4UB1UZA",
- "2C5S1R21W",
- "2C5DCRVGM",
- "2C686X8ZH",
- "2C3RWCVAG",
- "2C3JKFMJU",
- "2C3VECEG2"
- ]
- },
- "interpreterRepositories": [
- {
- "id": "central",
- "type": "default",
- "url": "http://repo1.maven.org/maven2/",
- "releasePolicy": {
- "enabled": true,
- "updatePolicy": "daily",
- "checksumPolicy": "warn"
- },
- "snapshotPolicy": {
- "enabled": true,
- "updatePolicy": "daily",
- "checksumPolicy": "warn"
- },
- "mirroredRepositories": [],
- "repositoryManager": false
- }
- ]
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json b/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json
deleted file mode 100644
index 1ba1b94..0000000
--- a/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json
+++ /dev/null
@@ -1,42 +0,0 @@
-[
- {
- "group": "test",
- "name": "double_echo",
- "className": "org.apache.zeppelin.interpreter.DoubleEchoInterpreter",
- "properties": {
- "property_1": {
- "envName": "PROPERTY_1",
- "propertyName": "property_1",
- "defaultValue": "value_1",
- "description": "desc_1"
- },
- "property_2": {
- "envName": "PROPERTY_2",
- "propertyName": "property_2",
- "defaultValue": "value_2",
- "description": "desc_2"
- }
- }
- },
-
- {
- "group": "test",
- "name": "echo",
- "defaultInterpreter": true,
- "className": "org.apache.zeppelin.interpreter.EchoInterpreter",
- "properties": {
- "property_1": {
- "envName": "PROPERTY_1",
- "propertyName": "property_1",
- "defaultValue": "value_1",
- "description": "desc_1"
- },
- "property_2": {
- "envName": "PROPERTY_2",
- "propertyName": "property_2",
- "defaultValue": "value_2",
- "description": "desc_2"
- }
- }
- }
-]
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/resources/log4j.properties b/zeppelin-interpreter/src/test/resources/log4j.properties
index 6f34691..d8a7839 100644
--- a/zeppelin-interpreter/src/test/resources/log4j.properties
+++ b/zeppelin-interpreter/src/test/resources/log4j.properties
@@ -26,6 +26,4 @@ log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n
#
# Root logger option
-log4j.rootLogger=INFO, stdout
-log4j.logger.org.apache.zeppelin.interpreter=DEBUG
-log4j.logger.org.apache.zeppelin.scheduler=DEBUG
\ No newline at end of file
+log4j.rootLogger=INFO, stdout
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
index c1dba5c..cd0210e 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
@@ -185,7 +185,7 @@ public class InterpreterRestApi {
String noteId = request == null ? null : request.getNoteId();
if (null == noteId) {
- interpreterSettingManager.close(settingId);
+ interpreterSettingManager.close(setting);
} else {
interpreterSettingManager.restart(settingId, noteId, SecurityUtils.getPrincipal());
}
@@ -208,7 +208,7 @@ public class InterpreterRestApi {
@GET
@ZeppelinApi
public Response listInterpreter(String message) {
- Map<String, InterpreterSetting> m = interpreterSettingManager.getInterpreterSettingTemplates();
+ Map<String, InterpreterSetting> m = interpreterSettingManager.getAvailableInterpreterSettings();
return new JsonResponse<>(Status.OK, "", m).build();
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index 53ee114..7453470 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -93,11 +93,13 @@ public class ZeppelinServer extends Application {
private NotebookRepoSync notebookRepo;
private NotebookAuthorization notebookAuthorization;
private Credentials credentials;
+ private DependencyResolver depResolver;
public ZeppelinServer() throws Exception {
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
-
+ this.depResolver = new DependencyResolver(
+ conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO));
InterpreterOutput.limit = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT);
@@ -127,26 +129,13 @@ public class ZeppelinServer extends Application {
new File(conf.getRelativeDir("zeppelin-web/src/app/spell")));
}
- this.schedulerFactory = SchedulerFactory.singleton();
- this.interpreterSettingManager = new InterpreterSettingManager(conf, notebookWsServer,
- notebookWsServer, notebookWsServer);
- this.replFactory = new InterpreterFactory(interpreterSettingManager);
- this.notebookRepo = new NotebookRepoSync(conf);
- this.noteSearchService = new LuceneSearch();
- this.notebookAuthorization = NotebookAuthorization.init(conf);
- this.credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath());
- notebook = new Notebook(conf,
- notebookRepo, schedulerFactory, replFactory, interpreterSettingManager, notebookWsServer,
- noteSearchService, notebookAuthorization, credentials);
-
ZeppelinServer.helium = new Helium(
conf.getHeliumConfPath(),
conf.getHeliumRegistry(),
new File(conf.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO),
"helium-registry-cache"),
heliumBundleFactory,
- heliumApplicationFactory,
- interpreterSettingManager);
+ heliumApplicationFactory);
// create bundle
try {
@@ -155,6 +144,20 @@ public class ZeppelinServer extends Application {
LOG.error(e.getMessage(), e);
}
+ this.schedulerFactory = new SchedulerFactory();
+ this.interpreterSettingManager = new InterpreterSettingManager(conf, depResolver,
+ new InterpreterOption(true));
+ this.replFactory = new InterpreterFactory(conf, notebookWsServer,
+ notebookWsServer, heliumApplicationFactory, depResolver, SecurityUtils.isAuthenticated(),
+ interpreterSettingManager);
+ this.notebookRepo = new NotebookRepoSync(conf);
+ this.noteSearchService = new LuceneSearch();
+ this.notebookAuthorization = NotebookAuthorization.init(conf);
+ this.credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath());
+ notebook = new Notebook(conf,
+ notebookRepo, schedulerFactory, replFactory, interpreterSettingManager, notebookWsServer,
+ noteSearchService, notebookAuthorization, credentials);
+
// to update notebook from application event from remote process.
heliumApplicationFactory.setNotebook(notebook);
// to update fire websocket event on application event.
@@ -203,7 +206,7 @@ public class ZeppelinServer extends Application {
LOG.info("Shutting down Zeppelin Server ... ");
try {
jettyWebServer.stop();
- notebook.getInterpreterSettingManager().close();
+ notebook.getInterpreterSettingManager().shutdown();
notebook.close();
Thread.sleep(3000);
} catch (Exception e) {