You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by jo...@apache.org on 2017/05/06 13:46:41 UTC
[1/4] zeppelin git commit: [MINOR] Move remoteinterpreter into zengine
Repository: zeppelin
Updated Branches:
refs/heads/master 8e96d8bd7 -> d9c4a5f0b
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
new file mode 100644
index 0000000..2ba7a76
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
@@ -0,0 +1,914 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.thrift.transport.TTransportException;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterEnv;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterB;
+import org.apache.zeppelin.resource.LocalResourcePool;
+import org.apache.zeppelin.scheduler.Job;
+import org.apache.zeppelin.scheduler.Job.Status;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+public class RemoteInterpreterTest {
+
+
+ private static final String INTERPRETER_SCRIPT =
+ System.getProperty("os.name").startsWith("Windows") ?
+ "../bin/interpreter.cmd" :
+ "../bin/interpreter.sh";
+
+ private InterpreterGroup intpGroup;
+ private HashMap<String, String> env;
+
+ /**
+ * Prepares a fresh {@link InterpreterGroup} and an environment map whose
+ * ZEPPELIN_CLASSPATH entry is the absolute path of ./target/test-classes.
+ */
+ @Before
+ public void setUp() throws Exception {
+ final File testClasses = new File("./target/test-classes");
+ env = new HashMap<>();
+ env.put("ZEPPELIN_CLASSPATH", testClasses.getAbsolutePath());
+ intpGroup = new InterpreterGroup();
+ }
+
+ /** Closes the interpreter group held in {@code intpGroup} after each test. */
+ @After
+ public void tearDown() throws Exception {
+ intpGroup.close();
+ }
+
+ /** Builds a remote {@link MockInterpreterA} for the default "note" id. */
+ private RemoteInterpreter createMockInterpreterA(Properties p) {
+ return createMockInterpreterA(p, "note");
+ }
+
+ /** Builds a remote {@link MockInterpreterA} for the given note id. */
+ private RemoteInterpreter createMockInterpreterA(Properties p, String noteId) {
+ return createMockInterpreter(p, noteId, MockInterpreterA.class.getName());
+ }
+
+ /** Builds a remote {@link MockInterpreterB} for the default "note" id. */
+ private RemoteInterpreter createMockInterpreterB(Properties p) {
+ return createMockInterpreterB(p, "note");
+ }
+
+ /** Builds a remote {@link MockInterpreterB} for the given note id. */
+ private RemoteInterpreter createMockInterpreterB(Properties p, String noteId) {
+ return createMockInterpreter(p, noteId, MockInterpreterB.class.getName());
+ }
+
+ /**
+ * Shared factory behind the A/B helpers: only the properties, the note id and
+ * the interpreter class name vary; every other constructor argument is the
+ * fixed value these tests have always used.
+ *
+ * @param p interpreter properties
+ * @param noteId note id passed to the remote interpreter
+ * @param className fully qualified name of the mock interpreter class
+ * @return a new, not yet opened remote interpreter
+ */
+ private RemoteInterpreter createMockInterpreter(Properties p, String noteId, String className) {
+ return new RemoteInterpreter(
+ p,
+ noteId,
+ className,
+ new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+ "fake",
+ "fakeRepo",
+ env,
+ 10 * 1000,
+ null,
+ null,
+ "anonymous",
+ false);
+ }
+
+ /**
+ * Opens and closes two interpreters of the same group and checks the
+ * process's running state, idle-client count and reference count after
+ * each step.
+ */
+ @Test
+ public void testRemoteInterperterCall() throws TTransportException, IOException {
+ Properties p = new Properties();
+ intpGroup.put("note", new LinkedList<Interpreter>());
+
+ RemoteInterpreter intpA = createMockInterpreterA(p);
+
+ intpGroup.get("note").add(intpA);
+
+ intpA.setInterpreterGroup(intpGroup);
+
+ RemoteInterpreter intpB = createMockInterpreterB(p);
+
+ intpGroup.get("note").add(intpB);
+ intpB.setInterpreterGroup(intpGroup);
+
+
+ RemoteInterpreterProcess process = intpA.getInterpreterProcess();
+ // The result of this comparison used to be thrown away, so nothing was
+ // verified; both interpreters of the group are expected to report the
+ // same process.
+ assertEquals(process, intpB.getInterpreterProcess());
+
+ assertFalse(process.isRunning());
+ assertEquals(0, process.getNumIdleClient());
+ assertEquals(0, process.referenceCount());
+
+ intpA.open(); // initialize all interpreters in the same group
+ assertTrue(process.isRunning());
+ assertEquals(1, process.getNumIdleClient());
+ assertEquals(1, process.referenceCount());
+
+ intpA.interpret("1",
+ new InterpreterContext(
+ "note",
+ "id",
+ null,
+ "title",
+ "text",
+ new AuthenticationInfo(),
+ new HashMap<String, Object>(),
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LocalResourcePool("pool1"),
+ new LinkedList<InterpreterContextRunner>(), null));
+
+ intpB.open();
+ assertEquals(1, process.referenceCount());
+
+ intpA.close();
+ assertEquals(0, process.referenceCount());
+ intpB.close();
+ assertEquals(0, process.referenceCount());
+
+ assertFalse(process.isRunning());
+
+ }
+
+ /**
+ * Sets the "zeppelin.MockInterpreterA.precode" property to "fail test" and
+ * expects the following interpret call to report {@code Code.ERROR}.
+ */
+ @Test
+ public void testExecuteIncorrectPrecode() throws TTransportException, IOException {
+ Properties p = new Properties();
+ p.put("zeppelin.MockInterpreterA.precode", "fail test");
+ intpGroup.put("note", new LinkedList<Interpreter>());
+
+ RemoteInterpreter intpA = createMockInterpreterA(p);
+
+ intpGroup.get("note").add(intpA);
+
+ intpA.setInterpreterGroup(intpGroup);
+
+ // No process handle is fetched here: it was never used, and other tests in
+ // this class open an interpreter without fetching it first.
+ intpA.open();
+
+ InterpreterResult result = intpA.interpret("1",
+ new InterpreterContext(
+ "note",
+ "id",
+ null,
+ "title",
+ "text",
+ new AuthenticationInfo(),
+ new HashMap<String, Object>(),
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LocalResourcePool("pool1"),
+ new LinkedList<InterpreterContextRunner>(), null));
+
+ intpA.close();
+ assertEquals(Code.ERROR, result.code());
+ }
+
+ /**
+ * Sets the "zeppelin.MockInterpreterA.precode" property to "2" and expects
+ * the following interpret call to succeed with the data "1".
+ */
+ @Test
+ public void testExecuteCorrectPrecode() throws TTransportException, IOException {
+ Properties p = new Properties();
+ p.put("zeppelin.MockInterpreterA.precode", "2");
+ intpGroup.put("note", new LinkedList<Interpreter>());
+
+ RemoteInterpreter intpA = createMockInterpreterA(p);
+
+ intpGroup.get("note").add(intpA);
+
+ intpA.setInterpreterGroup(intpGroup);
+
+ // No process handle is fetched here: it was never used, and other tests in
+ // this class open an interpreter without fetching it first.
+ intpA.open();
+
+ InterpreterResult result = intpA.interpret("1",
+ new InterpreterContext(
+ "note",
+ "id",
+ null,
+ "title",
+ "text",
+ new AuthenticationInfo(),
+ new HashMap<String, Object>(),
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LocalResourcePool("pool1"),
+ new LinkedList<InterpreterContextRunner>(), null));
+
+ intpA.close();
+ assertEquals(Code.SUCCESS, result.code());
+ assertEquals("1", result.message().get(0).getData());
+ }
+
+ /**
+ * Sends a non-numeric statement to a remote MockInterpreterA and expects the
+ * result code to be {@code Code.ERROR}.
+ */
+ @Test
+ public void testRemoteInterperterErrorStatus() throws TTransportException, IOException {
+ final Properties props = new Properties();
+ final RemoteInterpreter interpreter = createMockInterpreterA(props);
+
+ final LinkedList<Interpreter> session = new LinkedList<Interpreter>();
+ intpGroup.put("note", session);
+ intpGroup.get("note").add(interpreter);
+ interpreter.setInterpreterGroup(intpGroup);
+
+ interpreter.open();
+
+ final InterpreterContext context = new InterpreterContext(
+ "noteId",
+ "id",
+ null,
+ "title",
+ "text",
+ new AuthenticationInfo(),
+ new HashMap<String, Object>(),
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LocalResourcePool("pool1"),
+ new LinkedList<InterpreterContextRunner>(), null);
+ final InterpreterResult outcome = interpreter.interpret("non numeric value", context);
+
+ assertEquals(Code.ERROR, outcome.code());
+ }
+
+ /**
+ * Runs "500" on interpreter A and then on interpreter B of the same group.
+ * Asserts that A answers "500", that B answers "1000" and that both calls
+ * together took at least 1000 ms.
+ */
+ @Test
+ public void testRemoteSchedulerSharing() throws TTransportException, IOException {
+ Properties p = new Properties();
+ intpGroup.put("note", new LinkedList<Interpreter>());
+
+ // Use the shared factories instead of repeating their constructor calls.
+ RemoteInterpreter intpA = createMockInterpreterA(p);
+
+ intpGroup.get("note").add(intpA);
+ intpA.setInterpreterGroup(intpGroup);
+
+ RemoteInterpreter intpB = createMockInterpreterB(p);
+
+ intpGroup.get("note").add(intpB);
+ intpB.setInterpreterGroup(intpGroup);
+
+ intpA.open();
+ intpB.open();
+
+ long start = System.currentTimeMillis();
+ InterpreterResult ret = intpA.interpret("500",
+ new InterpreterContext(
+ "note",
+ "id",
+ null,
+ "title",
+ "text",
+ new AuthenticationInfo(),
+ new HashMap<String, Object>(),
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LocalResourcePool("pool1"),
+ new LinkedList<InterpreterContextRunner>(), null));
+ assertEquals("500", ret.message().get(0).getData());
+
+ ret = intpB.interpret("500",
+ new InterpreterContext(
+ "note",
+ "id",
+ null,
+ "title",
+ "text",
+ new AuthenticationInfo(),
+ new HashMap<String, Object>(),
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LocalResourcePool("pool1"),
+ new LinkedList<InterpreterContextRunner>(), null));
+ assertEquals("1000", ret.message().get(0).getData());
+ long end = System.currentTimeMillis();
+ assertTrue(end - start >= 1000);
+
+
+ intpA.close();
+ intpB.close();
+ }
+
+ /**
+ * Submits one job per interpreter to the interpreters' schedulers, polls
+ * until both jobs report FINISHED, then checks the elapsed time and the
+ * result returned by job B.
+ */
+ @Test
+ public void testRemoteSchedulerSharingSubmit() throws TTransportException, IOException, InterruptedException {
+ final Properties props = new Properties();
+ intpGroup.put("note", new LinkedList<Interpreter>());
+
+ final RemoteInterpreter intpA = createMockInterpreterA(props);
+ intpGroup.get("note").add(intpA);
+ intpA.setInterpreterGroup(intpGroup);
+
+ final RemoteInterpreter intpB = createMockInterpreterB(props);
+ intpGroup.get("note").add(intpB);
+ intpB.setInterpreterGroup(intpGroup);
+
+ intpA.open();
+ intpB.open();
+
+ final long startedAt = System.currentTimeMillis();
+
+ final Job first = new Job("jobA", null) {
+ private Object outcome;
+
+ @Override
+ protected Object jobRun() throws Throwable {
+ final InterpreterContext context = new InterpreterContext(
+ "note",
+ "jobA",
+ null,
+ "title",
+ "text",
+ new AuthenticationInfo(),
+ new HashMap<String, Object>(),
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LocalResourcePool("pool1"),
+ new LinkedList<InterpreterContextRunner>(), null);
+ return intpA.interpret("500", context);
+ }
+
+ @Override
+ protected boolean jobAbort() {
+ return false;
+ }
+
+ @Override
+ public void setResult(Object value) {
+ this.outcome = value;
+ }
+
+ @Override
+ public Object getReturn() {
+ return outcome;
+ }
+
+ @Override
+ public int progress() {
+ return 0;
+ }
+
+ @Override
+ public Map<String, Object> info() {
+ return null;
+ }
+ };
+ intpA.getScheduler().submit(first);
+
+ final Job second = new Job("jobB", null) {
+ private Object outcome;
+
+ @Override
+ protected Object jobRun() throws Throwable {
+ final InterpreterContext context = new InterpreterContext(
+ "note",
+ "jobB",
+ null,
+ "title",
+ "text",
+ new AuthenticationInfo(),
+ new HashMap<String, Object>(),
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LocalResourcePool("pool1"),
+ new LinkedList<InterpreterContextRunner>(), null);
+ return intpB.interpret("500", context);
+ }
+
+ @Override
+ protected boolean jobAbort() {
+ return false;
+ }
+
+ @Override
+ public void setResult(Object value) {
+ this.outcome = value;
+ }
+
+ @Override
+ public Object getReturn() {
+ return outcome;
+ }
+
+ @Override
+ public int progress() {
+ return 0;
+ }
+
+ @Override
+ public Map<String, Object> info() {
+ return null;
+ }
+ };
+ intpB.getScheduler().submit(second);
+
+ // Poll every 100 ms until both jobs have reached FINISHED.
+ while (!(first.getStatus() == Status.FINISHED && second.getStatus() == Status.FINISHED)) {
+ Thread.sleep(100);
+ }
+ final long finishedAt = System.currentTimeMillis();
+ assertTrue(finishedAt - startedAt >= 1000);
+
+ final InterpreterResult resultOfB = (InterpreterResult) second.getReturn();
+ assertEquals("1000", resultOfB.message().get(0).getData());
+
+ intpA.close();
+ intpB.close();
+ }
+
+ /**
+ * Submits three jobs named "0", "1" and "2" to the interpreter's scheduler
+ * and checks that the collected result messages arrive in submission order.
+ */
+ @Test
+ public void testRunOrderPreserved() throws InterruptedException {
+ final Properties props = new Properties();
+ intpGroup.put("note", new LinkedList<Interpreter>());
+
+ final RemoteInterpreter intpA = createMockInterpreterA(props);
+ intpGroup.get("note").add(intpA);
+ intpA.setInterpreterGroup(intpGroup);
+
+ intpA.open();
+
+ final int concurrency = 3;
+ final List<InterpreterResultMessage> collected = new LinkedList<>();
+
+ final Scheduler scheduler = intpA.getScheduler();
+ for (int n = 0; n < concurrency; n++) {
+ final String jobId = Integer.toString(n);
+ final Job job = new Job(jobId, Integer.toString(n), null, 200) {
+ private Object outcome;
+
+ @Override
+ protected Object jobRun() throws Throwable {
+ final InterpreterContext context = new InterpreterContext(
+ "note",
+ jobId,
+ null,
+ "title",
+ "text",
+ new AuthenticationInfo(),
+ new HashMap<String, Object>(),
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LocalResourcePool("pool1"),
+ new LinkedList<InterpreterContextRunner>(), null);
+ final InterpreterResult ret = intpA.interpret(getJobName(), context);
+
+ // Publish the messages and wake the waiting test thread.
+ synchronized (collected) {
+ collected.addAll(ret.message());
+ collected.notify();
+ }
+ return null;
+ }
+
+ @Override
+ protected boolean jobAbort() {
+ return false;
+ }
+
+ @Override
+ public void setResult(Object value) {
+ this.outcome = value;
+ }
+
+ @Override
+ public Object getReturn() {
+ return outcome;
+ }
+
+ @Override
+ public int progress() {
+ return 0;
+ }
+
+ @Override
+ public Map<String, Object> info() {
+ return null;
+ }
+ };
+ scheduler.submit(job);
+ }
+
+ // Block until every job has contributed its message.
+ synchronized (collected) {
+ while (collected.size() != concurrency) {
+ collected.wait(300);
+ }
+ }
+
+ int expected = 0;
+ for (InterpreterResultMessage message : collected) {
+ assertEquals(Integer.toString(expected), message.getData());
+ expected++;
+ }
+ assertEquals(concurrency, expected);
+
+ intpA.close();
+ }
+
+
+ /**
+ * With the "parallel" property set to "true", submits four jobs that each
+ * interpret "1000" and asserts that all of them complete in less than
+ * 4 * 1000 ms.
+ */
+ @Test
+ public void testRunParallel() throws InterruptedException {
+ final Properties props = new Properties();
+ props.put("parallel", "true");
+ intpGroup.put("note", new LinkedList<Interpreter>());
+
+ final RemoteInterpreter intpA = createMockInterpreterA(props);
+ intpGroup.get("note").add(intpA);
+ intpA.setInterpreterGroup(intpGroup);
+
+ intpA.open();
+
+ final int concurrency = 4;
+ final int timeToSleep = 1000;
+ final List<InterpreterResultMessage> collected = new LinkedList<>();
+ final long startedAt = System.currentTimeMillis();
+
+ final Scheduler scheduler = intpA.getScheduler();
+ for (int n = 0; n < concurrency; n++) {
+ final String jobId = Integer.toString(n);
+ final Job job = new Job(jobId, Integer.toString(n), null, 300) {
+ private Object outcome;
+
+ @Override
+ protected Object jobRun() throws Throwable {
+ final String stmt = Integer.toString(timeToSleep);
+ final InterpreterContext context = new InterpreterContext(
+ "note",
+ jobId,
+ null,
+ "title",
+ "text",
+ new AuthenticationInfo(),
+ new HashMap<String, Object>(),
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LocalResourcePool("pool1"),
+ new LinkedList<InterpreterContextRunner>(), null);
+ final InterpreterResult ret = intpA.interpret(stmt, context);
+
+ // Publish the messages and wake the waiting test thread.
+ synchronized (collected) {
+ collected.addAll(ret.message());
+ collected.notify();
+ }
+ return stmt;
+ }
+
+ @Override
+ protected boolean jobAbort() {
+ return false;
+ }
+
+ @Override
+ public void setResult(Object value) {
+ this.outcome = value;
+ }
+
+ @Override
+ public Object getReturn() {
+ return outcome;
+ }
+
+ @Override
+ public int progress() {
+ return 0;
+ }
+
+ @Override
+ public Map<String, Object> info() {
+ return null;
+ }
+ };
+ scheduler.submit(job);
+ }
+
+ // Block until every job has contributed its message.
+ synchronized (collected) {
+ while (collected.size() != concurrency) {
+ collected.wait(300);
+ }
+ }
+
+ final long finishedAt = System.currentTimeMillis();
+
+ assertTrue(finishedAt - startedAt < timeToSleep * concurrency);
+
+ intpA.close();
+ }
+
+ /**
+ * Replaces the interpreter group before the interpreter is opened and
+ * expects the interpreter to report a different process object afterwards.
+ */
+ @Test
+ public void testInterpreterGroupResetBeforeProcessStarts() {
+ Properties p = new Properties();
+
+ RemoteInterpreter intpA = createMockInterpreterA(p);
+
+ intpA.setInterpreterGroup(intpGroup);
+ RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
+
+ intpA.setInterpreterGroup(new InterpreterGroup(intpA.getInterpreterGroup().getId()));
+ RemoteInterpreterProcess processB = intpA.getInterpreterProcess();
+
+ // Compare the process objects themselves. Passing hashCode() values to
+ // assertNotSame compared two freshly boxed Integers by reference, which
+ // almost never fails and so could not detect a reused process.
+ assertNotSame(processA, processB);
+ }
+
+ /**
+ * Opens the interpreter, dereferences its process, replaces the interpreter
+ * group and expects the interpreter to report a different process object.
+ */
+ @Test
+ public void testInterpreterGroupResetAfterProcessFinished() {
+ Properties p = new Properties();
+ intpGroup.put("note", new LinkedList<Interpreter>());
+
+ RemoteInterpreter intpA = createMockInterpreterA(p);
+
+ intpA.setInterpreterGroup(intpGroup);
+ RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
+ intpA.open();
+
+ processA.dereference(); // intpA.close();
+
+ intpA.setInterpreterGroup(new InterpreterGroup(intpA.getInterpreterGroup().getId()));
+ RemoteInterpreterProcess processB = intpA.getInterpreterProcess();
+
+ // Compare the process objects themselves. Passing hashCode() values to
+ // assertNotSame compared two freshly boxed Integers by reference, which
+ // almost never fails and so could not detect a reused process.
+ assertNotSame(processA, processB);
+ }
+
+ /**
+ * Starts a job that interprets "2000", waits until the scheduler reports a
+ * running job, then closes the interpreter, gives it a new group, reopens it
+ * and expects a different process object than before.
+ */
+ @Test
+ public void testInterpreterGroupResetDuringProcessRunning() throws InterruptedException {
+ Properties p = new Properties();
+ intpGroup.put("note", new LinkedList<Interpreter>());
+
+ final RemoteInterpreter intpA = createMockInterpreterA(p);
+
+ intpGroup.get("note").add(intpA);
+ intpA.setInterpreterGroup(intpGroup);
+
+ intpA.open();
+
+ Job jobA = new Job("jobA", null) {
+ private Object r;
+
+ @Override
+ public Object getReturn() {
+ return r;
+ }
+
+ @Override
+ public void setResult(Object results) {
+ this.r = results;
+ }
+
+ @Override
+ public int progress() {
+ return 0;
+ }
+
+ @Override
+ public Map<String, Object> info() {
+ return null;
+ }
+
+ @Override
+ protected Object jobRun() throws Throwable {
+ return intpA.interpret("2000",
+ new InterpreterContext(
+ "note",
+ "jobA",
+ null,
+ "title",
+ "text",
+ new AuthenticationInfo(),
+ new HashMap<String, Object>(),
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LocalResourcePool("pool1"),
+ new LinkedList<InterpreterContextRunner>(), null));
+ }
+
+ @Override
+ protected boolean jobAbort() {
+ return false;
+ }
+
+ };
+ intpA.getScheduler().submit(jobA);
+
+ // wait for job started
+ while (intpA.getScheduler().getJobsRunning().size() == 0) {
+ Thread.sleep(100);
+ }
+
+ // restart interpreter
+ RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
+ intpA.close();
+
+ InterpreterGroup newInterpreterGroup =
+ new InterpreterGroup(intpA.getInterpreterGroup().getId());
+ newInterpreterGroup.put("note", new LinkedList<Interpreter>());
+
+ intpA.setInterpreterGroup(newInterpreterGroup);
+ intpA.open();
+ RemoteInterpreterProcess processB = intpA.getInterpreterProcess();
+
+ // Compare the process objects themselves. Passing hashCode() values to
+ // assertNotSame compared two freshly boxed Integers by reference, which
+ // almost never fails and so could not detect a reused process.
+ assertNotSame(processA, processB);
+
+ }
+
+ /**
+ * Opens interpreters A and B under the same "note" key of one group and
+ * asserts that both return equal schedulers.
+ */
+ @Test
+ public void testRemoteInterpreterSharesTheSameSchedulerInstanceInTheSameGroup() {
+ final Properties props = new Properties();
+ intpGroup.put("note", new LinkedList<Interpreter>());
+
+ final RemoteInterpreter first = createMockInterpreterA(props);
+ intpGroup.get("note").add(first);
+ first.setInterpreterGroup(intpGroup);
+
+ final RemoteInterpreter second = createMockInterpreterB(props);
+ intpGroup.get("note").add(second);
+ second.setInterpreterGroup(intpGroup);
+
+ first.open();
+ second.open();
+
+ final Scheduler schedulerOfFirst = first.getScheduler();
+ final Scheduler schedulerOfSecond = second.getScheduler();
+ assertEquals(schedulerOfFirst, schedulerOfSecond);
+ }
+
+ /**
+ * Creates interpreters A and B in each of two sessions of one group and
+ * asserts that schedulers are equal within a session but differ between
+ * "sessionA" and "sessionB".
+ */
+ @Test
+ public void testMultiInterpreterSession() {
+ final Properties props = new Properties();
+ intpGroup.put("sessionA", new LinkedList<Interpreter>());
+ intpGroup.put("sessionB", new LinkedList<Interpreter>());
+
+ // Session A
+ final RemoteInterpreter aInSessionA = createMockInterpreterA(props, "sessionA");
+ intpGroup.get("sessionA").add(aInSessionA);
+ aInSessionA.setInterpreterGroup(intpGroup);
+
+ final RemoteInterpreter bInSessionA = createMockInterpreterB(props, "sessionA");
+ intpGroup.get("sessionA").add(bInSessionA);
+ bInSessionA.setInterpreterGroup(intpGroup);
+
+ aInSessionA.open();
+ bInSessionA.open();
+
+ Scheduler left = aInSessionA.getScheduler();
+ Scheduler right = bInSessionA.getScheduler();
+ assertEquals(left, right);
+
+ // Session B
+ final RemoteInterpreter aInSessionB = createMockInterpreterA(props, "sessionB");
+ intpGroup.get("sessionB").add(aInSessionB);
+ aInSessionB.setInterpreterGroup(intpGroup);
+
+ final RemoteInterpreter bInSessionB = createMockInterpreterB(props, "sessionB");
+ intpGroup.get("sessionB").add(bInSessionB);
+ bInSessionB.setInterpreterGroup(intpGroup);
+
+ aInSessionB.open();
+ bInSessionB.open();
+
+ left = aInSessionB.getScheduler();
+ right = bInSessionB.getScheduler();
+ assertEquals(left, right);
+
+ left = aInSessionA.getScheduler();
+ right = aInSessionB.getScheduler();
+ assertNotEquals(left, right);
+ }
+
+ /**
+ * Verifies that pushAngularObjectRegistryToRemote sends the group's angular
+ * object registry, serialized to JSON with Gson, to the mocked Thrift client.
+ */
+ @Test
+ public void should_push_local_angular_repo_to_remote() throws Exception {
+ // Given
+ final Client remoteClient = Mockito.mock(Client.class);
+ final RemoteInterpreter interpreter = new RemoteInterpreter(
+ new Properties(),
+ "noteId",
+ MockInterpreterA.class.getName(),
+ "runner",
+ "path",
+ "localRepo",
+ env,
+ 10 * 1000,
+ null,
+ null,
+ "anonymous",
+ false);
+ final AngularObjectRegistry localRegistry = new AngularObjectRegistry("spark", null);
+ localRegistry.add("name", "DuyHai DOAN", "nodeId", "paragraphId");
+ final InterpreterGroup group = new InterpreterGroup("groupId");
+ group.setAngularObjectRegistry(localRegistry);
+ interpreter.setInterpreterGroup(group);
+
+ final java.lang.reflect.Type registryType =
+ new TypeToken<Map<String, Map<String, AngularObject>>>() {}.getType();
+ final String expectedJson = new Gson().toJson(localRegistry.getRegistry(), registryType);
+
+ // When
+ interpreter.pushAngularObjectRegistryToRemote(remoteClient);
+
+ // Then
+ Mockito.verify(remoteClient).angularRegistryPush(expectedJson);
+ }
+
+ /**
+ * Checks which strings RemoteInterpreterUtils.isEnvString accepts: null, the
+ * empty string, mixed-case and hyphenated names are rejected, while
+ * upper-case names with optional underscores and digits are accepted.
+ */
+ @Test
+ public void testEnvStringPattern() {
+ final String[] rejected = {null, "", "abcDEF", "ABC-DEF"};
+ for (String candidate : rejected) {
+ assertFalse(RemoteInterpreterUtils.isEnvString(candidate));
+ }
+
+ final String[] accepted = {"ABCDEF", "ABC_DEF", "ABC_DEF123"};
+ for (String candidate : accepted) {
+ assertTrue(RemoteInterpreterUtils.isEnvString(candidate));
+ }
+ }
+
+ @Test
+ public void testEnvronmentAndPropertySet() {
+ Properties p = new Properties();
+ p.setProperty("MY_ENV1", "env value 1");
+ p.setProperty("my.property.1", "property value 1");
+
+ RemoteInterpreter intp = new RemoteInterpreter(
+ p,
+ "note",
+ MockInterpreterEnv.class.getName(),
+ new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+ "fake",
+ "fakeRepo",
+ env,
+ 10 * 1000,
+ null,
+ null,
+ "anonymous",
+ false);
+
+ intpGroup.put("note", new LinkedList<Interpreter>());
+ intpGroup.get("note").add(intp);
+ intp.setInterpreterGroup(intpGroup);
+
+ intp.open();
+
+ InterpreterContext context = new InterpreterContext(
+ "noteId",
+ "id",
+ null,
+ "title",
+ "text",
+ new AuthenticationInfo(),
+ new HashMap<String, Object>(),
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LocalResourcePool("pool1"),
+ new LinkedList<InterpreterContextRunner>(), null);
+
+
+ assertEquals("env value 1", intp.interpret("getEnv MY_ENV1", context).message().get(0).getData());
+ assertEquals(Code.ERROR, intp.interpret("getProperty MY_ENV1", context).code());
+ assertEquals(Code.ERROR, intp.interpret("getEnv my.property.1", context).code());
+ assertEquals("property value 1", intp.interpret("getProperty my.property.1", context).message().get(0).getData());
+
+ intp.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
new file mode 100644
index 0000000..975d6ea
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.junit.Test;
+
+ /** Tests for the static helpers in RemoteInterpreterUtils. */
+ public class RemoteInterpreterUtilsTest {
+
+ /** The port returned by the lookup must be strictly positive. */
+ @Test
+ public void testFindRandomAvailablePortOnAllLocalInterfaces() throws IOException {
+ final boolean positive =
+ RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() > 0;
+ assertTrue(positive);
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java
new file mode 100644
index 0000000..81a9164
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+
+public class MockInterpreterA extends Interpreter {
+
+ private String lastSt;
+
+ public MockInterpreterA(Properties property) {
+ super(property);
+ }
+
+ @Override
+ public void open() {
+ //new RuntimeException().printStackTrace();
+ }
+
+ @Override
+ public void close() {
+ }
+
+ public String getLastStatement() {
+ return lastSt;
+ }
+
+ @Override
+ public InterpreterResult interpret(String st, InterpreterContext context) {
+ try {
+ Thread.sleep(Long.parseLong(st));
+ this.lastSt = st;
+ } catch (NumberFormatException | InterruptedException e) {
+ throw new InterpreterException(e);
+ }
+ return new InterpreterResult(Code.SUCCESS, st);
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) {
+
+ }
+
+ @Override
+ public FormType getFormType() {
+ return FormType.NATIVE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ return 0;
+ }
+
+ @Override
+ public List<InterpreterCompletion> completion(String buf, int cursor,
+ InterpreterContext interpreterContext) {
+ return null;
+ }
+
+ @Override
+ public Scheduler getScheduler() {
+ if (getProperty("parallel") != null && getProperty("parallel").equals("true")) {
+ return SchedulerFactory.singleton().createOrGetParallelScheduler("interpreter_" + this.hashCode(), 10);
+ } else {
+ return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
new file mode 100644
index 0000000..d4b26ad
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectWatcher;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+
+/**
+ * Test interpreter that manipulates the {@link AngularObjectRegistry} of the current context.
+ *
+ * <p>Statements are space separated: {@code add <name> <value>}, {@code update <name> <value>}
+ * or {@code remove <name>}. The result message is
+ * {@code "<size of registry.getAll(noteId, null)> <number of watcher invocations>"}.
+ */
+public class MockInterpreterAngular extends Interpreter {
+
+  /** Counts how many times a watcher registered by {@code add} has been invoked. */
+  AtomicInteger numWatch = new AtomicInteger(0);
+
+  public MockInterpreterAngular(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+    // nothing to initialize
+  }
+
+  @Override
+  public void close() {
+    // nothing to release
+  }
+
+  /**
+   * Executes one {@code add}/{@code update}/{@code remove} command against the registry, waits
+   * 500 ms for watchers to run and reports the registry size and watcher count.
+   *
+   * @param st command line; the value is only picked up when there are exactly three tokens
+   * @param context supplies the registry and the note id the objects are scoped to
+   * @return a {@code SUCCESS} result whose message is {@code "<object count> <watch count>"}
+   */
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    String[] stmt = st.split(" ");
+    String cmd = stmt[0];
+    String name = stmt.length >= 2 ? stmt[1] : null;
+    String value = stmt.length == 3 ? stmt[2] : null;
+
+    AngularObjectRegistry registry = context.getAngularObjectRegistry();
+    String noteId = context.getNoteId();
+
+    if (cmd.equals("add")) {
+      registry.add(name, value, noteId, null);
+      registry.get(name, noteId, null).addWatcher(new AngularObjectWatcher(null) {
+
+        @Override
+        public void watch(Object oldObject, Object newObject, InterpreterContext context) {
+          numWatch.incrementAndGet();
+        }
+
+      });
+    } else if (cmd.equalsIgnoreCase("update")) {
+      registry.get(name, noteId, null).set(value);
+    } else if (cmd.equals("remove")) {
+      registry.remove(name, noteId, null);
+    }
+
+    try {
+      Thread.sleep(500); // wait for watcher executed
+    } catch (InterruptedException e) {
+      logger.error("Exception in MockInterpreterAngular while interpret Thread.sleep", e);
+      // Keep the interrupt visible to the calling thread instead of silently dropping it.
+      Thread.currentThread().interrupt();
+    }
+
+    String msg = registry.getAll(noteId, null).size() + " " + numWatch.get();
+    return new InterpreterResult(Code.SUCCESS, msg);
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+    // cancellation is not supported by this mock
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public List<InterpreterCompletion> completion(String buf, int cursor,
+      InterpreterContext interpreterContext) {
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
new file mode 100644
index 0000000..7103335
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.WrappedInterpreter;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.scheduler.Scheduler;
+
+/**
+ * Test interpreter that cooperates with a {@link MockInterpreterA} living in the same
+ * interpreter-group entry.
+ *
+ * <p>Statements are sleep durations in milliseconds; the last statement executed by the sibling
+ * {@link MockInterpreterA}, if any, is added to the sleep time. The scheduler of that sibling is
+ * reused.
+ */
+public class MockInterpreterB extends Interpreter {
+
+  public MockInterpreterB(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+    // nothing to initialize
+  }
+
+  @Override
+  public void close() {
+    // nothing to release
+  }
+
+  /**
+   * Sleeps for {@code st} milliseconds plus the last statement of the sibling
+   * {@link MockInterpreterA} (when there is one) and returns the total as the result message.
+   *
+   * @param st sleep time in milliseconds, as a decimal string
+   * @param context interpreter context (not used)
+   * @return a {@code SUCCESS} result carrying the total time slept
+   * @throws InterpreterException if a duration is not a valid long or the sleep is interrupted
+   */
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    long timeToSleep;
+    try {
+      // Parsing happens inside the try so that a malformed statement is reported as an
+      // InterpreterException, the same way MockInterpreterA reports it.
+      timeToSleep = Long.parseLong(st);
+      MockInterpreterA intpA = getInterpreterA();
+      // getInterpreterA() returns null when no sibling is registered; treat that as "no extra".
+      String intpASt = (intpA == null) ? null : intpA.getLastStatement();
+      if (intpASt != null) {
+        timeToSleep += Long.parseLong(intpASt);
+      }
+      Thread.sleep(timeToSleep);
+    } catch (NumberFormatException e) {
+      throw new InterpreterException(e);
+    } catch (InterruptedException e) {
+      // Thread.sleep cleared the interrupt flag; restore it before propagating.
+      Thread.currentThread().interrupt();
+      throw new InterpreterException(e);
+    }
+    return new InterpreterResult(Code.SUCCESS, Long.toString(timeToSleep));
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+    // cancellation is not supported by this mock
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public List<InterpreterCompletion> completion(String buf, int cursor,
+      InterpreterContext interpreterContext) {
+    return null;
+  }
+
+  /**
+   * Finds the {@link MockInterpreterA} registered in the same interpreter list as this instance.
+   *
+   * @return the sibling interpreter, or null when this interpreter has no group, is not found in
+   *     any list of the group, or its list contains no {@link MockInterpreterA}
+   */
+  public MockInterpreterA getInterpreterA() {
+    InterpreterGroup interpreterGroup = getInterpreterGroup();
+    if (interpreterGroup == null) {
+      return null;
+    }
+    synchronized (interpreterGroup) {
+      for (List<Interpreter> interpreters : interpreterGroup.values()) {
+        boolean belongsToSameNoteGroup = false;
+        MockInterpreterA a = null;
+        for (Interpreter intp : interpreters) {
+          Interpreter inner = unwrap(intp);
+          if (intp.getClassName().equals(MockInterpreterA.class.getName())) {
+            a = (MockInterpreterA) inner;
+          }
+          if (this == inner) {
+            belongsToSameNoteGroup = true;
+          }
+        }
+        if (belongsToSameNoteGroup) {
+          return a;
+        }
+      }
+    }
+    return null;
+  }
+
+  /** Strips every {@link WrappedInterpreter} layer and returns the innermost interpreter. */
+  private static Interpreter unwrap(Interpreter intp) {
+    Interpreter p = intp;
+    while (p instanceof WrappedInterpreter) {
+      p = ((WrappedInterpreter) p).getInnerInterpreter();
+    }
+    return p;
+  }
+
+  /**
+   * @return the scheduler of the sibling {@link MockInterpreterA}, or null when there is none
+   */
+  @Override
+  public Scheduler getScheduler() {
+    MockInterpreterA intpA = getInterpreterA();
+    if (intpA != null) {
+      return intpA.getScheduler();
+    }
+    return null;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java
new file mode 100644
index 0000000..12e11f7
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * Test interpreter that reports environment variables and system properties of the JVM it runs
+ * in. Supported statements are {@code getEnv <name>} and {@code getProperty <name>}.
+ */
+public class MockInterpreterEnv extends Interpreter {
+
+  public MockInterpreterEnv(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+    // nothing to initialize
+  }
+
+  @Override
+  public void close() {
+    // nothing to release
+  }
+
+  /**
+   * Looks up the environment variable or system property named by the second token.
+   *
+   * @param st {@code "getEnv <name>"} or {@code "getProperty <name>"}
+   * @param context interpreter context (not used)
+   * @return {@code SUCCESS} with the looked-up value, or {@code ERROR} carrying the first token
+   *     when the command is not recognised
+   */
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    final String[] tokens = st.split(" ");
+    final String command = tokens[0];
+
+    if (command.equals("getEnv")) {
+      String envValue = System.getenv(tokens[1]);
+      return new InterpreterResult(InterpreterResult.Code.SUCCESS, envValue);
+    }
+
+    if (command.equals("getProperty")) {
+      String propertyValue = System.getProperty(tokens[1]);
+      return new InterpreterResult(InterpreterResult.Code.SUCCESS, propertyValue);
+    }
+
+    return new InterpreterResult(InterpreterResult.Code.ERROR, command);
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+    // cancellation is not supported by this mock
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public List<InterpreterCompletion> completion(String buf, int cursor,
+      InterpreterContext interpreterContext) {
+    return null;
+  }
+
+  /** Returns a FIFO scheduler whose name is derived from this instance's hash code. */
+  @Override
+  public Scheduler getScheduler() {
+    final String schedulerName = "interpreter_" + this.hashCode();
+    return SchedulerFactory.singleton().createOrGetFIFOScheduler(schedulerName);
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
new file mode 100644
index 0000000..349315c
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Mock interpreter for testing the interpreter output stream.
+ *
+ * <p>Statements have the form {@code <code>[:<streamed output>[:<result message>]]}: the first
+ * field is the name of an {@link InterpreterResult.Code} constant, the second is written to
+ * {@code context.out}, and the third becomes the result message.
+ */
+public class MockInterpreterOutputStream extends Interpreter {
+  // NOTE(review): never assigned in this class, so getLastStatement() always returns null.
+  private String lastSt;
+
+  public MockInterpreterOutputStream(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+    // nothing to initialize
+  }
+
+  @Override
+  public void close() {
+    // nothing to release
+  }
+
+  public String getLastStatement() {
+    return lastSt;
+  }
+
+  /**
+   * Writes the second field of {@code st} to the context output stream and returns a result
+   * built from the first and third fields.
+   *
+   * @param st colon separated statement, see the class description
+   * @param context supplies the output stream that is written to
+   * @return a result with the requested code and the third field (or {@code ""}) as message
+   * @throws InterpreterException if writing to the output stream fails
+   * @throws IllegalArgumentException if the first field is not a valid result code name
+   */
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    String[] ret = st.split(":");
+    try {
+      // split() never yields null elements, so the presence of the output field has to be
+      // checked through the array length; a bare "SUCCESS" must not fail with an index error.
+      if (ret.length > 1) {
+        context.out.write(ret[1]);
+      }
+    } catch (IOException e) {
+      throw new InterpreterException(e);
+    }
+    String message = (ret.length > 2) ? ret[2] : "";
+    return new InterpreterResult(InterpreterResult.Code.valueOf(ret[0]), message);
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+    // cancellation is not supported by this mock
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public List<InterpreterCompletion> completion(String buf, int cursor,
+      InterpreterContext interpreterContext) {
+    return null;
+  }
+
+  /** Returns a FIFO scheduler whose name is derived from this instance's hash code. */
+  @Override
+  public Scheduler getScheduler() {
+    return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode());
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
new file mode 100644
index 0000000..c4ff6ab
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.gson.Gson;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectWatcher;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.resource.Resource;
+import org.apache.zeppelin.resource.ResourcePool;
+
+/**
+ * Test interpreter that exposes the {@link ResourcePool} of the current context.
+ *
+ * <p>Statements are space separated: {@code put <id> <value>}, {@code get <id>},
+ * {@code remove <id>}, {@code getAll} and {@code invoke <id> <method> [<resultName>]}. An id is
+ * either a plain resource name or {@code <noteId>:<paragraphId>:<name>}. The result message is
+ * the JSON serialization of the command's return value.
+ */
+public class MockInterpreterResourcePool extends Interpreter {
+
+  // NOTE(review): not referenced anywhere in this class.
+  AtomicInteger numWatch = new AtomicInteger(0);
+
+  public MockInterpreterResourcePool(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+    // nothing to initialize
+  }
+
+  @Override
+  public void close() {
+    // nothing to release
+  }
+
+  /**
+   * Executes one resource-pool command and returns its outcome as JSON.
+   *
+   * @param st command line, see the class description
+   * @param context supplies the resource pool that is operated on
+   * @return a {@code SUCCESS} result whose message is the JSON form of the command result
+   *     ({@code null} for {@code put} and unknown commands, {@code ""} for a missing resource
+   *     on {@code get})
+   */
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    String[] stmt = st.split(" ");
+    String cmd = stmt[0];
+    String noteId = null;
+    String paragraphId = null;
+    String name = null;
+    if (stmt.length >= 2) {
+      String[] npn = stmt[1].split(":");
+      if (npn.length >= 3) {
+        noteId = npn[0];
+        paragraphId = npn[1];
+        name = npn[2];
+      } else {
+        name = stmt[1];
+      }
+    }
+    String value = null;
+    if (stmt.length >= 3) {
+      value = stmt[2];
+    }
+
+    ResourcePool resourcePool = context.getResourcePool();
+    Object ret = null;
+    if (cmd.equals("put")) {
+      resourcePool.put(noteId, paragraphId, name, value);
+    } else if (cmd.equalsIgnoreCase("get")) {
+      Resource resource = resourcePool.get(noteId, paragraphId, name);
+      // Read from the resource that was just null-checked: a second lookup could return null
+      // if the resource is removed in between, and it costs an extra pool query.
+      ret = (resource != null) ? resource.get() : "";
+    } else if (cmd.equals("remove")) {
+      ret = resourcePool.remove(noteId, paragraphId, name);
+    } else if (cmd.equals("getAll")) {
+      ret = resourcePool.getAll();
+    } else if (cmd.equals("invoke")) {
+      Resource resource = resourcePool.get(noteId, paragraphId, name);
+      if (stmt.length >= 4) {
+        // The fourth token names the resource the invocation result is returned as.
+        Resource res = resource.invokeMethod(value, null, null, stmt[3]);
+        ret = res.get();
+      } else {
+        ret = resource.invokeMethod(value, null, null);
+      }
+    }
+
+    try {
+      Thread.sleep(500); // wait for watcher executed
+    } catch (InterruptedException e) {
+      // Not fatal for the mock, but keep the interrupt visible to the calling thread.
+      Thread.currentThread().interrupt();
+    }
+
+    Gson gson = new Gson();
+    return new InterpreterResult(Code.SUCCESS, gson.toJson(ret));
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+    // cancellation is not supported by this mock
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public List<InterpreterCompletion> completion(String buf, int cursor,
+      InterpreterContext interpreterContext) {
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
new file mode 100644
index 0000000..363ccf6
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+import com.google.gson.Gson;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterResourcePool;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for DistributedResourcePool.
+ *
+ * Most tests drive two RemoteInterpreter instances backed by MockInterpreterResourcePool
+ * (one per InterpreterGroup) and check that resources put through one are visible through
+ * the other. testDistributedResourcePool exercises DistributedResourcePool directly with an
+ * in-memory ResourcePoolConnector instead.
+ */
+public class DistributedResourcePoolTest {
+ // Launcher script handed to RemoteInterpreter; the .cmd variant is chosen on Windows.
+ private static final String INTERPRETER_SCRIPT =
+ System.getProperty("os.name").startsWith("Windows") ?
+ "../bin/interpreter.cmd" :
+ "../bin/interpreter.sh";
+ private InterpreterGroup intpGroup1;
+ private InterpreterGroup intpGroup2;
+ private HashMap<String, String> env;
+ private RemoteInterpreter intp1;
+ private RemoteInterpreter intp2;
+ private InterpreterContext context;
+ private RemoteInterpreterEventPoller eventPoller1;
+ private RemoteInterpreterEventPoller eventPoller2;
+
+
+ /**
+ * Creates two remote interpreters of MockInterpreterResourcePool, each registered under the
+ * "note" key of its own InterpreterGroup, opens both and starts one event poller per group.
+ */
+ @Before
+ public void setUp() throws Exception {
+ env = new HashMap<>();
+ // Makes the compiled test classes (and thus the mock interpreter) visible to the launcher.
+ env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
+
+ Properties p = new Properties();
+
+ // NOTE(review): 10 * 1000 is presumably a timeout in milliseconds - confirm against the
+ // RemoteInterpreter constructor.
+ intp1 = new RemoteInterpreter(
+ p,
+ "note",
+ MockInterpreterResourcePool.class.getName(),
+ new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+ "fake",
+ "fakeRepo",
+ env,
+ 10 * 1000,
+ null,
+ null,
+ "anonymous",
+ false
+ );
+
+ intpGroup1 = new InterpreterGroup("intpGroup1");
+ intpGroup1.put("note", new LinkedList<Interpreter>());
+ intpGroup1.get("note").add(intp1);
+ intp1.setInterpreterGroup(intpGroup1);
+
+ // Second interpreter: same arguments as intp1, but placed in a separate group below.
+ intp2 = new RemoteInterpreter(
+ p,
+ "note",
+ MockInterpreterResourcePool.class.getName(),
+ new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+ "fake",
+ "fakeRepo",
+ env,
+ 10 * 1000,
+ null,
+ null,
+ "anonymous",
+ false
+ );
+
+ intpGroup2 = new InterpreterGroup("intpGroup2");
+ intpGroup2.put("note", new LinkedList<Interpreter>());
+ intpGroup2.get("note").add(intp2);
+ intp2.setInterpreterGroup(intpGroup2);
+
+ // A single context is shared by every interpret() call in this test class.
+ context = new InterpreterContext(
+ "note",
+ "id",
+ null,
+ "title",
+ "text",
+ new AuthenticationInfo(),
+ new HashMap<String, Object>(),
+ new GUI(),
+ null,
+ null,
+ new LinkedList<InterpreterContextRunner>(),
+ null);
+
+ intp1.open();
+ intp2.open();
+
+ // The pollers are wired to the groups' remote processes only after open() has been called.
+ eventPoller1 = new RemoteInterpreterEventPoller(null, null);
+ eventPoller1.setInterpreterGroup(intpGroup1);
+ eventPoller1.setInterpreterProcess(intpGroup1.getRemoteInterpreterProcess());
+
+ eventPoller2 = new RemoteInterpreterEventPoller(null, null);
+ eventPoller2.setInterpreterGroup(intpGroup2);
+ eventPoller2.setInterpreterProcess(intpGroup2.getRemoteInterpreterProcess());
+
+ eventPoller1.start();
+ eventPoller2.start();
+ }
+
+ /**
+ * Shuts down each event poller, then closes its interpreter and interpreter group.
+ */
+ @After
+ public void tearDown() throws Exception {
+ eventPoller1.shutdown();
+ intp1.close();
+ intpGroup1.close();
+ eventPoller2.shutdown();
+ intp2.close();
+ intpGroup2.close();
+ }
+
+ /**
+ * A resource put through either interpreter must be listed by getAll and readable by get
+ * through the other one.
+ */
+ @Test
+ public void testRemoteDistributedResourcePool() {
+ Gson gson = new Gson();
+ InterpreterResult ret;
+ intp1.interpret("put key1 value1", context);
+ intp2.interpret("put key2 value2", context);
+
+ ret = intp1.interpret("getAll", context);
+ assertEquals(2, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size());
+
+ ret = intp2.interpret("getAll", context);
+ assertEquals(2, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size());
+
+ ret = intp1.interpret("get key1", context);
+ assertEquals("value1", gson.fromJson(ret.message().get(0).getData(), String.class));
+
+ // key2 was put through intp2 but is read through intp1.
+ ret = intp1.interpret("get key2", context);
+ assertEquals("value2", gson.fromJson(ret.message().get(0).getData(), String.class));
+ }
+
+ /**
+ * Exercises DistributedResourcePool without remote processes: pool1 is distributed and its
+ * connector serves the contents of the local pools pool2 and pool3.
+ */
+ @Test
+ public void testDistributedResourcePool() {
+ final LocalResourcePool pool2 = new LocalResourcePool("pool2");
+ final LocalResourcePool pool3 = new LocalResourcePool("pool3");
+
+ DistributedResourcePool pool1 = new DistributedResourcePool("pool1", new ResourcePoolConnector() {
+ @Override
+ public ResourceSet getAllResources() {
+ ResourceSet set = pool2.getAll();
+ set.addAll(pool3.getAll());
+
+ // Round-trip every resource through JSON to obtain RemoteResource instances bound to
+ // this connector.
+ ResourceSet remoteSet = new ResourceSet();
+ Gson gson = new Gson();
+ for (Resource s : set) {
+ RemoteResource remoteResource = gson.fromJson(gson.toJson(s), RemoteResource.class);
+ remoteResource.setResourcePoolConnector(this);
+ remoteSet.add(remoteResource);
+ }
+ return remoteSet;
+ }
+
+ @Override
+ public Object readResource(ResourceId id) {
+ // Dispatch on the owning pool id; ids from any other pool yield null.
+ if (id.getResourcePoolId().equals(pool2.id())) {
+ return pool2.get(id.getName()).get();
+ }
+ if (id.getResourcePoolId().equals(pool3.id())) {
+ return pool3.get(id.getName()).get();
+ }
+ return null;
+ }
+
+ // Method invocation is not exercised by this test, so both overloads are stubs.
+ @Override
+ public Object invokeMethod(ResourceId id, String methodName, Class[] paramTypes, Object[] params) {
+ return null;
+ }
+
+ @Override
+ public Resource invokeMethod(ResourceId id, String methodName, Class[] paramTypes, Object[]
+ params, String returnResourceName) {
+ return null;
+ }
+ });
+
+ assertEquals(0, pool1.getAll().size());
+
+
+ // test get() can get from pool
+ pool2.put("object1", "value2");
+ assertEquals(1, pool1.getAll().size());
+ assertTrue(pool1.get("object1").isRemote());
+ assertEquals("value2", pool1.get("object1").get());
+
+ // test get() is locality aware
+ pool1.put("object1", "value1");
+ assertEquals(1, pool2.getAll().size());
+ assertEquals("value1", pool1.get("object1").get());
+
+ // test getAll() is locality aware
+ // (the value put into pool1 is expected first, the one served by the connector second)
+ assertEquals("value1", pool1.getAll().get(0).get());
+ assertEquals("value2", pool1.getAll().get(1).get());
+ }
+
+ /**
+ * Checks ResourcePoolUtils.getAllResources and the remove-by-note / remove-by-paragraph
+ * helpers against resources created through both interpreters.
+ */
+ @Test
+ public void testResourcePoolUtils() {
+ Gson gson = new Gson();
+ // NOTE(review): ret is declared but never used in this test.
+ InterpreterResult ret;
+
+ // when create some resources
+ intp1.interpret("put note1:paragraph1:key1 value1", context);
+ intp1.interpret("put note1:paragraph2:key1 value2", context);
+ intp2.interpret("put note2:paragraph1:key1 value1", context);
+ intp2.interpret("put note2:paragraph2:key2 value2", context);
+
+
+ // then get all resources.
+ assertEquals(4, ResourcePoolUtils.getAllResources().size());
+
+ // when remove all resources from note1
+ ResourcePoolUtils.removeResourcesBelongsToNote("note1");
+
+ // then resources should be removed.
+ // (the mock answers "" when the requested resource does not exist)
+ assertEquals(2, ResourcePoolUtils.getAllResources().size());
+ assertEquals("", gson.fromJson(
+ intp1.interpret("get note1:paragraph1:key1", context).message().get(0).getData(),
+ String.class));
+ assertEquals("", gson.fromJson(
+ intp1.interpret("get note1:paragraph2:key1", context).message().get(0).getData(),
+ String.class));
+
+
+ // when remove all resources from note2:paragraph1
+ ResourcePoolUtils.removeResourcesBelongsToParagraph("note2", "paragraph1");
+
+ // then only the note2:paragraph2 resource is left
+ assertEquals(1, ResourcePoolUtils.getAllResources().size());
+ assertEquals("value2", gson.fromJson(
+ intp1.interpret("get note2:paragraph2:key2", context).message().get(0).getData(),
+ String.class));
+
+ }
+
+ /**
+ * Invokes the "length" method on string resources put through intp1 and intp2, first
+ * returning the result directly and then storing it under a new resource name.
+ */
+ @Test
+ public void testResourceInvokeMethod() {
+ Gson gson = new Gson();
+ InterpreterResult ret;
+ intp1.interpret("put key1 hey", context);
+ intp2.interpret("put key2 world", context);
+
+ // invoke method in local resource pool
+ ret = intp1.interpret("invoke key1 length", context);
+ assertEquals("3", ret.message().get(0).getData());
+
+ // invoke method in remote resource pool
+ ret = intp1.interpret("invoke key2 length", context);
+ assertEquals("5", ret.message().get(0).getData());
+
+ // make sure no resources are automatically created
+ ret = intp1.interpret("getAll", context);
+ assertEquals(2, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size());
+
+ // invoke method in local resource pool and save result
+ ret = intp1.interpret("invoke key1 length ret1", context);
+ assertEquals("3", ret.message().get(0).getData());
+
+ ret = intp1.interpret("getAll", context);
+ assertEquals(3, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size());
+
+ ret = intp1.interpret("get ret1", context);
+ assertEquals("3", gson.fromJson(ret.message().get(0).getData(), String.class));
+
+ // invoke method in remote resource pool and save result
+ ret = intp1.interpret("invoke key2 length ret2", context);
+ assertEquals("5", ret.message().get(0).getData());
+
+ ret = intp1.interpret("getAll", context);
+ assertEquals(4, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size());
+
+ ret = intp1.interpret("get ret2", context);
+ assertEquals("5", gson.fromJson(ret.message().get(0).getData(), String.class));
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
new file mode 100644
index 0000000..ebb5100
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scheduler;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
+import org.apache.zeppelin.resource.LocalResourcePool;
+import org.apache.zeppelin.scheduler.Job.Status;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
+
+ private static final String INTERPRETER_SCRIPT =
+ System.getProperty("os.name").startsWith("Windows") ?
+ "../bin/interpreter.cmd" :
+ "../bin/interpreter.sh";
+ private SchedulerFactory schedulerSvc;
+ private static final int TICK_WAIT = 100;
+ private static final int MAX_WAIT_CYCLES = 100;
+
+ /**
+  * Creates a fresh {@link SchedulerFactory} for the test that is about to run.
+  */
+ @Before
+ public void setUp() throws Exception {
+   this.schedulerSvc = new SchedulerFactory();
+ }
+
+ /**
+ * Intentionally empty: nothing is torn down here. Each test in this class closes its
+ * own interpreter and removes its own scheduler.
+ */
+ @After
+ public void tearDown(){
+
+ }
+
+ /**
+  * Submits one job to a remote scheduler and checks that it is first reported as running
+  * and later as terminated, with no job left in the waiting or running lists afterwards.
+  *
+  * <p>The interpreter is closed and the scheduler removed in a {@code finally} block, so a
+  * failing assertion (or an interrupted wait) no longer skips the cleanup.
+  */
+ @Test
+ public void test() throws Exception {
+   Properties p = new Properties();
+   final InterpreterGroup intpGroup = new InterpreterGroup();
+   Map<String, String> env = new HashMap<>();
+   env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
+
+   final RemoteInterpreter intpA = new RemoteInterpreter(
+       p,
+       "note",
+       MockInterpreterA.class.getName(),
+       new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+       "fake",
+       "fakeRepo",
+       env,
+       10 * 1000,
+       this,
+       null,
+       "anonymous",
+       false);
+
+   intpGroup.put("note", new LinkedList<Interpreter>());
+   intpGroup.get("note").add(intpA);
+   intpA.setInterpreterGroup(intpGroup);
+
+   intpA.open();
+
+   Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note",
+       intpA.getInterpreterProcess(),
+       10);
+
+   try {
+     Job job = new Job("jobId", "jobName", null, 200) {
+       Object results;
+
+       @Override
+       public Object getReturn() {
+         return results;
+       }
+
+       @Override
+       public int progress() {
+         return 0;
+       }
+
+       @Override
+       public Map<String, Object> info() {
+         return null;
+       }
+
+       @Override
+       protected Object jobRun() throws Throwable {
+         intpA.interpret("1000", new InterpreterContext(
+             "note",
+             "jobId",
+             null,
+             "title",
+             "text",
+             new AuthenticationInfo(),
+             new HashMap<String, Object>(),
+             new GUI(),
+             new AngularObjectRegistry(intpGroup.getId(), null),
+             new LocalResourcePool("pool1"),
+             new LinkedList<InterpreterContextRunner>(), null));
+         return "1000";
+       }
+
+       @Override
+       protected boolean jobAbort() {
+         return false;
+       }
+
+       @Override
+       public void setResult(Object results) {
+         this.results = results;
+       }
+     };
+     scheduler.submit(job);
+
+     // Poll (bounded by MAX_WAIT_CYCLES) until the scheduler has started the job.
+     int cycles = 0;
+     while (!job.isRunning() && cycles < MAX_WAIT_CYCLES) {
+       Thread.sleep(TICK_WAIT);
+       cycles++;
+     }
+     assertTrue(job.isRunning());
+
+     Thread.sleep(5 * TICK_WAIT);
+     assertEquals(0, scheduler.getJobsWaiting().size());
+     assertEquals(1, scheduler.getJobsRunning().size());
+
+     // Poll (bounded by MAX_WAIT_CYCLES) until the job has finished.
+     cycles = 0;
+     while (!job.isTerminated() && cycles < MAX_WAIT_CYCLES) {
+       Thread.sleep(TICK_WAIT);
+       cycles++;
+     }
+
+     assertTrue(job.isTerminated());
+     assertEquals(0, scheduler.getJobsWaiting().size());
+     assertEquals(0, scheduler.getJobsRunning().size());
+   } finally {
+     // Same cleanup, in the same order, as before; now it also runs when the test fails.
+     intpA.close();
+     schedulerSvc.removeScheduler("test");
+   }
+ }
+
+ @Test
+ public void testAbortOnPending() throws Exception {
+ Properties p = new Properties();
+ final InterpreterGroup intpGroup = new InterpreterGroup();
+ Map<String, String> env = new HashMap<>();
+ env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
+
+ final RemoteInterpreter intpA = new RemoteInterpreter(
+ p,
+ "note",
+ MockInterpreterA.class.getName(),
+ new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+ "fake",
+ "fakeRepo",
+ env,
+ 10 * 1000,
+ this,
+ null,
+ "anonymous",
+ false);
+
+ intpGroup.put("note", new LinkedList<Interpreter>());
+ intpGroup.get("note").add(intpA);
+ intpA.setInterpreterGroup(intpGroup);
+
+ intpA.open();
+
+ Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note",
+ intpA.getInterpreterProcess(),
+ 10);
+
+ Job job1 = new Job("jobId1", "jobName1", null, 200) {
+ Object results;
+ InterpreterContext context = new InterpreterContext(
+ "note",
+ "jobId1",
+ null,
+ "title",
+ "text",
+ new AuthenticationInfo(),
+ new HashMap<String, Object>(),
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LocalResourcePool("pool1"),
+ new LinkedList<InterpreterContextRunner>(), null);
+
+ @Override
+ public Object getReturn() {
+ return results;
+ }
+
+ @Override
+ public int progress() {
+ return 0;
+ }
+
+ @Override
+ public Map<String, Object> info() {
+ return null;
+ }
+
+ @Override
+ protected Object jobRun() throws Throwable {
+ intpA.interpret("1000", context);
+ return "1000";
+ }
+
+ @Override
+ protected boolean jobAbort() {
+ if (isRunning()) {
+ intpA.cancel(context);
+ }
+ return true;
+ }
+
+ @Override
+ public void setResult(Object results) {
+ this.results = results;
+ }
+ };
+
+ Job job2 = new Job("jobId2", "jobName2", null, 200) {
+ public Object results;
+ InterpreterContext context = new InterpreterContext(
+ "note",
+ "jobId2",
+ null,
+ "title",
+ "text",
+ new AuthenticationInfo(),
+ new HashMap<String, Object>(),
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LocalResourcePool("pool1"),
+ new LinkedList<InterpreterContextRunner>(), null);
+
+ @Override
+ public Object getReturn() {
+ return results;
+ }
+
+ @Override
+ public int progress() {
+ return 0;
+ }
+
+ @Override
+ public Map<String, Object> info() {
+ return null;
+ }
+
+ @Override
+ protected Object jobRun() throws Throwable {
+ intpA.interpret("1000", context);
+ return "1000";
+ }
+
+ @Override
+ protected boolean jobAbort() {
+ if (isRunning()) {
+ intpA.cancel(context);
+ }
+ return true;
+ }
+
+ @Override
+ public void setResult(Object results) {
+ this.results = results;
+ }
+ };
+
+ job2.setResult("result2");
+
+ scheduler.submit(job1);
+ scheduler.submit(job2);
+
+
+ int cycles = 0;
+ while (!job1.isRunning() && cycles < MAX_WAIT_CYCLES) {
+ Thread.sleep(TICK_WAIT);
+ cycles++;
+ }
+ assertTrue(job1.isRunning());
+ assertTrue(job2.getStatus() == Status.PENDING);
+
+ job2.abort();
+
+ cycles = 0;
+ while (!job1.isTerminated() && cycles < MAX_WAIT_CYCLES) {
+ Thread.sleep(TICK_WAIT);
+ cycles++;
+ }
+
+ assertNotNull(job1.getDateFinished());
+ assertTrue(job1.isTerminated());
+ assertNull(job2.getDateFinished());
+ assertTrue(job2.isTerminated());
+ assertEquals("result2", job2.getReturn());
+
+ intpA.close();
+ schedulerSvc.removeScheduler("test");
+ }
+
+ /**
+ * No-op: output-append notifications are ignored by this test.
+ */
+ @Override
+ public void onOutputAppend(String noteId, String paragraphId, int index, String output) {
+
+ }
+
+ /**
+ * No-op: output-update notifications are ignored by this test.
+ */
+ @Override
+ public void onOutputUpdated(String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) {
+
+ }
+
+ /**
+ * No-op: output-clear notifications are ignored by this test.
+ */
+ @Override
+ public void onOutputClear(String noteId, String paragraphId) {
+
+ }
+
+ /**
+ * No-op: received meta information is ignored by this test.
+ */
+ @Override
+ public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
+
+ }
+
+ /**
+  * Answers a paragraph-runner request with an empty list.
+  *
+  * @param noteId      ignored
+  * @param paragraphId ignored
+  * @param callback    receives the empty list via {@code onFinished}; nothing happens when
+  *                    it is {@code null}
+  */
+ @Override
+ public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) {
+   if (callback == null) {
+     return;
+   }
+   callback.onFinished(new LinkedList<>());
+ }
+
+ /**
+  * No-op: remote run-paragraph requests are ignored by this test.
+  *
+  * @param noteId      ignored
+  * @param paragraphId ignored
+  */
+ @Override
+ public void onRemoteRunParagraph(String noteId, String paragraphId) throws Exception {
+ }
+
+ /**
+ * No-op: received paragraph information is ignored by this test.
+ */
+ @Override
+ public void onParaInfosReceived(String noteId, String paragraphId,
+ String interpreterSettingId, Map<String, String> metaInfos) {
+ }
+}
[2/4] zeppelin git commit: [MINOR] Move remoteinterpreter into zengine
Posted by jo...@apache.org.
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
new file mode 100644
index 0000000..ed8982b
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import java.util.*;
+
+import org.apache.thrift.TException;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.display.Input;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+/**
+ * Proxy for an Interpreter instance that runs in a separate process
+ */
+public class RemoteInterpreter extends Interpreter {
+ private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class);
+
+ private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
+ private final ApplicationEventListener applicationEventListener;
+ private Gson gson = new Gson();
+ private String interpreterRunner;
+ private String interpreterPath;
+ private String localRepoPath;
+ private String className;
+ private String sessionKey;
+ private FormType formType;
+ private boolean initialized;
+ private Map<String, String> env;
+ private int connectTimeout;
+ private int maxPoolSize;
+ private String host;
+ private int port;
+ private String userName;
+ private Boolean isUserImpersonate;
+ private int outputLimit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
+ private String interpreterGroupName;
+
+ /**
+  * Creates a remote interpreter proxy that also manages the interpreter process.
+  *
+  * <p>No host/port is recorded by this constructor, so {@link #getInterpreterProcess()}
+  * creates a {@link RemoteInterpreterManagedProcess} from the runner, interpreter path and
+  * local repository path given here. The process environment is taken from those entries
+  * of {@code property} that {@code RemoteInterpreterUtils.isEnvString} accepts.
+  */
+ public RemoteInterpreter(Properties property, String sessionKey, String className,
+     String interpreterRunner, String interpreterPath, String localRepoPath, int connectTimeout,
+     int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener,
+     ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
+     int outputLimit, String interpreterGroupName) {
+   super(property);
+
+   // identity of the interpreter behind this proxy
+   this.sessionKey = sessionKey;
+   this.className = className;
+   this.initialized = false;
+
+   // how the managed process is launched
+   this.interpreterRunner = interpreterRunner;
+   this.interpreterPath = interpreterPath;
+   this.localRepoPath = localRepoPath;
+   this.env = getEnvFromInterpreterProperty(property);
+
+   // connection settings and listeners
+   this.connectTimeout = connectTimeout;
+   this.maxPoolSize = maxPoolSize;
+   this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
+   this.applicationEventListener = appListener;
+
+   // user and group related settings
+   this.userName = userName;
+   this.isUserImpersonate = isUserImpersonate;
+   this.outputLimit = outputLimit;
+   this.interpreterGroupName = interpreterGroupName;
+ }
+
+
+ /**
+  * Creates a remote interpreter proxy that connects to an already existing process.
+  *
+  * <p>When {@code host} is non-null and {@code port} is positive,
+  * {@link #getInterpreterProcess()} creates a {@link RemoteInterpreterRunningProcess} for
+  * that address. This constructor sets neither an interpreter runner nor an environment map.
+  */
+ public RemoteInterpreter(Properties property, String sessionKey, String className, String host,
+     int port, String localRepoPath, int connectTimeout, int maxPoolSize,
+     RemoteInterpreterProcessListener remoteInterpreterProcessListener,
+     ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
+     int outputLimit) {
+   super(property);
+
+   // identity of the interpreter behind this proxy
+   this.sessionKey = sessionKey;
+   this.className = className;
+   this.initialized = false;
+
+   // address of the existing process
+   this.host = host;
+   this.port = port;
+   this.localRepoPath = localRepoPath;
+
+   // connection settings and listeners
+   this.connectTimeout = connectTimeout;
+   this.maxPoolSize = maxPoolSize;
+   this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
+   this.applicationEventListener = appListener;
+
+   // user related settings
+   this.userName = userName;
+   this.isUserImpersonate = isUserImpersonate;
+   this.outputLimit = outputLimit;
+ }
+
+
+ /**
+  * Constructor used by tests (VisibleForTesting).
+  *
+  * <p>The given {@code env} map is not copied: environment entries found in
+  * {@code property} are added to it and the same map instance is kept by this object.
+  * The maximum pool size is fixed at 10; output limit and interpreter group name keep
+  * their field defaults.
+  */
+ public RemoteInterpreter(Properties property, String sessionKey, String className,
+     String interpreterRunner, String interpreterPath, String localRepoPath,
+     Map<String, String> env, int connectTimeout,
+     RemoteInterpreterProcessListener remoteInterpreterProcessListener,
+     ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) {
+   super(property);
+
+   // identity of the interpreter behind this proxy
+   this.className = className;
+   this.sessionKey = sessionKey;
+
+   // how the managed process is launched
+   this.interpreterRunner = interpreterRunner;
+   this.interpreterPath = interpreterPath;
+   this.localRepoPath = localRepoPath;
+
+   // merge property-derived variables into the caller's map, then adopt that map
+   env.putAll(getEnvFromInterpreterProperty(property));
+   this.env = env;
+
+   // connection settings and listeners
+   this.connectTimeout = connectTimeout;
+   this.maxPoolSize = 10;
+   this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
+   this.applicationEventListener = appListener;
+
+   // user related settings
+   this.userName = userName;
+   this.isUserImpersonate = isUserImpersonate;
+ }
+
+ /**
+  * Collects the entries of {@code property} whose key is accepted by
+  * {@code RemoteInterpreterUtils.isEnvString}.
+  *
+  * @param property interpreter properties; every key is cast to {@code String}
+  * @return a new map holding the selected keys and their property values
+  */
+ private Map<String, String> getEnvFromInterpreterProperty(Properties property) {
+   Map<String, String> envVars = new HashMap<>();
+   for (Object rawKey : property.keySet()) {
+     String name = (String) rawKey;
+     if (!RemoteInterpreterUtils.isEnvString(name)) {
+       continue;
+     }
+     envVars.put(name, property.getProperty(name));
+   }
+   return envVars;
+ }
+
+ /**
+ * Returns the interpreter class name that was passed to the constructor.
+ */
+ @Override
+ public String getClassName() {
+ return className;
+ }
+
+ /**
+  * Tells whether this proxy was given the address of an existing process.
+  *
+  * @return {@code true} when a host is set and the port is positive
+  */
+ private boolean connectToExistingProcess() {
+   if (host == null) {
+     return false;
+   }
+   return port > 0;
+ }
+
+ /**
+  * Returns the remote process object stored on this interpreter's group, creating and
+  * storing it first when the group has none yet.
+  *
+  * <p>Creation happens while holding the group's monitor. A
+  * {@link RemoteInterpreterRunningProcess} is created when a host/port was configured,
+  * otherwise a {@link RemoteInterpreterManagedProcess}.
+  *
+  * @return the group's process, or {@code null} when no interpreter group is set
+  */
+ public RemoteInterpreterProcess getInterpreterProcess() {
+   InterpreterGroup group = getInterpreterGroup();
+   if (group == null) {
+     return null;
+   }
+
+   synchronized (group) {
+     if (group.getRemoteInterpreterProcess() != null) {
+       return group.getRemoteInterpreterProcess();
+     }
+
+     RemoteInterpreterProcess created;
+     if (!connectToExistingProcess()) {
+       // create new remote process
+       created = new RemoteInterpreterManagedProcess(
+           interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout,
+           remoteInterpreterProcessListener, applicationEventListener, interpreterGroupName);
+     } else {
+       created = new RemoteInterpreterRunningProcess(
+           connectTimeout,
+           remoteInterpreterProcessListener,
+           applicationEventListener,
+           host,
+           port);
+     }
+     group.setRemoteInterpreterProcess(created);
+
+     return group.getRemoteInterpreterProcess();
+   }
+ }
+
+ /**
+  * Creates the interpreter on the remote side, once per instance.
+  *
+  * <p>Returns immediately when already initialized. Otherwise it raises the process's
+  * maximum pool size to at least this interpreter's value, then, while holding the
+  * process monitor, asks the remote side to create the interpreter and pushes the angular
+  * object registry if the group has not pushed it yet.
+  *
+  * @throws InterpreterException when no client can be obtained or the remote call fails
+  */
+ public synchronized void init() {
+   if (initialized) {
+     return;
+   }
+
+   RemoteInterpreterProcess process = getInterpreterProcess();
+
+   final InterpreterGroup group = getInterpreterGroup();
+
+   int poolSize = Math.max(this.maxPoolSize, process.getMaxPoolSize());
+   process.setMaxPoolSize(poolSize);
+   String groupId = group.getId();
+
+   synchronized (process) {
+     Client rpc = null;
+     try {
+       rpc = process.getClient();
+     } catch (Exception cause) {
+       throw new InterpreterException(cause);
+     }
+
+     // NOTE(review): unlike the other remote calls in this class, this flag is never set
+     // to true, even after a TException. It looks deliberate given the TODO below; confirm.
+     boolean broken = false;
+     try {
+       logger.info("Create remote interpreter {}", getClassName());
+       if (localRepoPath != null) {
+         property.put("zeppelin.interpreter.localRepo", localRepoPath);
+       }
+       property.put("zeppelin.interpreter.output.limit", Integer.toString(outputLimit));
+
+       rpc.createInterpreter(groupId, sessionKey,
+           getClassName(), (Map) property, userName);
+
+       // Push angular object loaded from JSON file to remote interpreter
+       if (!group.isAngularRegistryPushed()) {
+         pushAngularObjectRegistryToRemote(rpc);
+         group.setAngularRegistryPushed(true);
+       }
+     } catch (TException thriftFailure) {
+       logger.error("Failed to create interpreter: {}", getClassName());
+       throw new InterpreterException(thriftFailure);
+     } finally {
+       // TODO(jongyoul): Fixed it when not all of interpreter in same interpreter group are broken
+       process.releaseClient(rpc, broken);
+     }
+   }
+   initialized = true;
+ }
+
+
+ /**
+  * Initializes every interpreter registered under this interpreter's session key.
+  *
+  * <p>While holding the interpreter group's monitor, this references the remote process
+  * once (only if this instance is not initialized yet) and then calls {@link #init()} on
+  * each interpreter of the session, unwrapping {@link WrappedInterpreter}s first.
+  *
+  * <p>An interpreter whose initialization fails is logged and removed from the session's
+  * list. The list entry itself is removed, which may be a wrapper around the failing
+  * {@code RemoteInterpreter}. Removing the unwrapped instance would do nothing when the
+  * list holds the wrapper.
+  */
+ @Override
+ public void open() {
+   InterpreterGroup interpreterGroup = getInterpreterGroup();
+
+   synchronized (interpreterGroup) {
+     // initialize all interpreters in this interpreter group
+     List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
+     // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however,
+     // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it
+     // doesn't call open method, it's not open. It causes problem while running intp.close()
+     // In case of Spark, this method initializes all of interpreters and init() method increases
+     // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all
+     // other interpreters doesn't do anything because those LazyInterpreters aren't open.
+     // But for now, we have to initialise all of interpreters for some reasons.
+     // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
+     RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+     if (!initialized) {
+       // reference per session
+       interpreterProcess.reference(interpreterGroup, userName, isUserImpersonate);
+     }
+     // Iterate over a snapshot so that removing a failed entry does not disturb the loop.
+     for (Interpreter intp : new ArrayList<>(interpreters)) {
+       Interpreter p = intp;
+       while (p instanceof WrappedInterpreter) {
+         p = ((WrappedInterpreter) p).getInnerInterpreter();
+       }
+       try {
+         ((RemoteInterpreter) p).init();
+       } catch (InterpreterException e) {
+         logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
+             p.getClassName(), e);
+         // Remove the list entry (possibly a wrapper), not the unwrapped instance.
+         interpreters.remove(intp);
+       }
+     }
+   }
+ }
+
+ @Override
+ public void close() {
+ InterpreterGroup interpreterGroup = getInterpreterGroup();
+ synchronized (interpreterGroup) {
+ // close all interpreters in this session
+ List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
+ // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however,
+ // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it
+ // doesn't call open method, it's not open. It causes problem while running intp.close()
+ // In case of Spark, this method initializes all of interpreters and init() method increases
+ // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all
+ // other interpreters doesn't do anything because those LazyInterpreters aren't open.
+ // But for now, we have to initialise all of interpreters for some reasons.
+ // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
+ if (initialized) {
+ // dereference per session
+ getInterpreterProcess().dereference();
+ }
+ for (Interpreter intp : new ArrayList<>(interpreters)) {
+ Interpreter p = intp;
+ while (p instanceof WrappedInterpreter) {
+ p = ((WrappedInterpreter) p).getInnerInterpreter();
+ }
+ try {
+ ((RemoteInterpreter) p).closeInterpreter();
+ } catch (InterpreterException e) {
+ logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
+ p.getClassName());
+ interpreters.remove(p);
+ }
+ }
+ }
+ }
+
+ /**
+  * Closes this single interpreter on the remote side, if it was initialized.
+  *
+  * <p>Whatever the outcome of the remote call, the instance is marked as not initialized
+  * afterwards, and an obtained client is handed back to the process (flagged as broken
+  * after a {@code TException}).
+  *
+  * @throws InterpreterException when obtaining the client or the remote call fails
+  */
+ public void closeInterpreter() {
+   if (!this.initialized) {
+     return;
+   }
+
+   RemoteInterpreterProcess process = getInterpreterProcess();
+   Client rpc = null;
+   boolean clientBroken = false;
+   try {
+     rpc = process.getClient();
+     if (rpc != null) {
+       rpc.close(sessionKey, className);
+     }
+   } catch (TException thriftFailure) {
+     clientBroken = true;
+     throw new InterpreterException(thriftFailure);
+   } catch (Exception other) {
+     throw new InterpreterException(other);
+   } finally {
+     if (rpc != null) {
+       process.releaseClient(rpc, clientBroken);
+     }
+     this.initialized = false;
+   }
+ }
+
+ /**
+  * Runs {@code st} on the remote interpreter and maps the remote result back.
+  *
+  * <p>Besides returning the result, this replaces the config held by {@code context} with
+  * the config returned by the remote side and updates the context's GUI: for
+  * {@code FormType.NATIVE} the GUI is cleared and replaced by the remote params and forms,
+  * for {@code FormType.SIMPLE} the remote forms and params are merged into the current ones.
+  *
+  * <p>All work done after the client has been obtained runs inside the try/finally that
+  * releases it, so the client is handed back even if registering the context runners fails.
+  *
+  * @param st      the text to interpret
+  * @param context the interpreter context; its config and GUI are modified
+  * @return the converted remote result
+  * @throws InterpreterException when no client can be obtained or the remote call fails
+  */
+ @Override
+ public InterpreterResult interpret(String st, InterpreterContext context) {
+   logger.debug("st:\n{}", st);
+
+   // getFormType() calls open(), so it has to run before the process is used.
+   FormType form = getFormType();
+   RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+   Client client = null;
+   try {
+     client = interpreterProcess.getClient();
+   } catch (Exception e1) {
+     throw new InterpreterException(e1);
+   }
+
+   boolean broken = false;
+   try {
+     InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess
+         .getInterpreterContextRunnerPool();
+
+     List<InterpreterContextRunner> runners = context.getRunners();
+     if (runners != null && !runners.isEmpty()) {
+       // assume all runners in this InterpreterContext have the same note id
+       String noteId = runners.get(0).getNoteId();
+
+       interpreterContextRunnerPool.clear(noteId);
+       interpreterContextRunnerPool.addAll(noteId, runners);
+     }
+
+     final GUI currentGUI = context.getGui();
+     RemoteInterpreterResult remoteResult = client.interpret(
+         sessionKey, className, st, convert(context));
+
+     Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson(
+         remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
+         }.getType());
+     context.getConfig().clear();
+     context.getConfig().putAll(remoteConfig);
+
+     if (form == FormType.NATIVE) {
+       GUI remoteGui = GUI.fromJson(remoteResult.getGui());
+       currentGUI.clear();
+       currentGUI.setParams(remoteGui.getParams());
+       currentGUI.setForms(remoteGui.getForms());
+     } else if (form == FormType.SIMPLE) {
+       final Map<String, Input> currentForms = currentGUI.getForms();
+       final Map<String, Object> currentParams = currentGUI.getParams();
+       final GUI remoteGUI = GUI.fromJson(remoteResult.getGui());
+       final Map<String, Input> remoteForms = remoteGUI.getForms();
+       final Map<String, Object> remoteParams = remoteGUI.getParams();
+       currentForms.putAll(remoteForms);
+       currentParams.putAll(remoteParams);
+     }
+
+     return convert(remoteResult);
+   } catch (TException e) {
+     // A transport/protocol failure means this client must not be reused.
+     broken = true;
+     throw new InterpreterException(e);
+   } finally {
+     interpreterProcess.releaseClient(client, broken);
+   }
+ }
+
+ /**
+  * Asks the remote interpreter to cancel the work belonging to {@code context}.
+  *
+  * @throws InterpreterException when no client can be obtained or the remote call fails
+  */
+ @Override
+ public void cancel(InterpreterContext context) {
+   final RemoteInterpreterProcess process = getInterpreterProcess();
+   final Client rpc;
+   try {
+     rpc = process.getClient();
+   } catch (Exception cause) {
+     throw new InterpreterException(cause);
+   }
+
+   boolean clientBroken = false;
+   try {
+     rpc.cancel(sessionKey, className, convert(context));
+   } catch (TException thriftFailure) {
+     clientBroken = true;
+     throw new InterpreterException(thriftFailure);
+   } finally {
+     process.releaseClient(rpc, clientBroken);
+   }
+ }
+
+ /**
+  * Returns the form type of the remote interpreter.
+  *
+  * <p>Always calls {@link #open()} first. The value is fetched from the remote side on
+  * the first call and kept in a field for later calls.
+  *
+  * @throws InterpreterException when no client can be obtained or the remote call fails
+  */
+ @Override
+ public FormType getFormType() {
+   open();
+
+   if (formType == null) {
+     final RemoteInterpreterProcess process = getInterpreterProcess();
+     final Client rpc;
+     try {
+       rpc = process.getClient();
+     } catch (Exception cause) {
+       throw new InterpreterException(cause);
+     }
+
+     boolean clientBroken = false;
+     try {
+       formType = FormType.valueOf(rpc.getFormType(sessionKey, className));
+     } catch (TException thriftFailure) {
+       clientBroken = true;
+       throw new InterpreterException(thriftFailure);
+     } finally {
+       process.releaseClient(rpc, clientBroken);
+     }
+   }
+   return formType;
+ }
+
+ /**
+  * Asks the remote interpreter for the progress of the work belonging to {@code context}.
+  *
+  * @return the remote progress value, or 0 when there is no process or it is not running
+  * @throws InterpreterException when no client can be obtained or the remote call fails
+  */
+ @Override
+ public int getProgress(InterpreterContext context) {
+   final RemoteInterpreterProcess process = getInterpreterProcess();
+   if (process == null) {
+     return 0;
+   }
+   if (!process.isRunning()) {
+     return 0;
+   }
+
+   final Client rpc;
+   try {
+     rpc = process.getClient();
+   } catch (Exception cause) {
+     throw new InterpreterException(cause);
+   }
+
+   boolean clientBroken = false;
+   try {
+     return rpc.getProgress(sessionKey, className, convert(context));
+   } catch (TException thriftFailure) {
+     clientBroken = true;
+     throw new InterpreterException(thriftFailure);
+   } finally {
+     process.releaseClient(rpc, clientBroken);
+   }
+ }
+
+
+ /**
+  * Asks the remote interpreter for completions of {@code buf} at position {@code cursor}.
+  *
+  * @return the list returned by the remote side
+  * @throws InterpreterException when no client can be obtained or the remote call fails
+  */
+ @Override
+ public List<InterpreterCompletion> completion(String buf, int cursor,
+     InterpreterContext interpreterContext) {
+   final RemoteInterpreterProcess process = getInterpreterProcess();
+   final Client rpc;
+   try {
+     rpc = process.getClient();
+   } catch (Exception cause) {
+     throw new InterpreterException(cause);
+   }
+
+   boolean clientBroken = false;
+   try {
+     List suggestions = rpc.completion(sessionKey, className, buf, cursor,
+         convert(interpreterContext));
+     return suggestions;
+   } catch (TException thriftFailure) {
+     clientBroken = true;
+     throw new InterpreterException(thriftFailure);
+   } finally {
+     process.releaseClient(rpc, clientBroken);
+   }
+ }
+
+ /**
+  * Returns the remote scheduler for this interpreter's session and process.
+  *
+  * <p>The scheduler name is built from this class's name, the session key and the
+  * process's hash code. The maximum pool size is passed as the concurrency limit.
+  *
+  * @return the scheduler, or {@code null} when there is no interpreter process
+  */
+ @Override
+ public Scheduler getScheduler() {
+   RemoteInterpreterProcess process = getInterpreterProcess();
+   if (process == null) {
+     return null;
+   }
+
+   return SchedulerFactory.singleton().createOrGetRemoteScheduler(
+       RemoteInterpreter.class.getName() + sessionKey + process.hashCode(),
+       sessionKey, process, maxPoolSize);
+ }
+
+ /**
+ * Returns the id of the given interpreter group.
+ * NOTE(review): no caller of this private method is visible in this part of the file;
+ * confirm whether it is still needed.
+ */
+ private String getInterpreterGroupKey(InterpreterGroup interpreterGroup) {
+ return interpreterGroup.getId();
+ }
+
+ /**
+  * Builds the thrift context for a remote call from a local interpreter context.
+  *
+  * <p>Note id, paragraph id, repl name, title and text are passed as they are. The
+  * authentication info, config, GUI and runners are serialized to JSON.
+  */
+ private RemoteInterpreterContext convert(InterpreterContext ic) {
+   String noteId = ic.getNoteId();
+   String paragraphId = ic.getParagraphId();
+   String replName = ic.getReplName();
+   String title = ic.getParagraphTitle();
+   String text = ic.getParagraphText();
+   String authJson = gson.toJson(ic.getAuthenticationInfo());
+   String configJson = gson.toJson(ic.getConfig());
+   String guiJson = gson.toJson(ic.getGui());
+   String runnersJson = gson.toJson(ic.getRunners());
+
+   return new RemoteInterpreterContext(noteId, paragraphId, replName, title, text,
+       authJson, configJson, guiJson, runnersJson);
+ }
+
+ /**
+  * Converts a thrift result into an {@link InterpreterResult}: the code is mapped by name
+  * and every remote message is added with its type (mapped by name) and data.
+  */
+ private InterpreterResult convert(RemoteInterpreterResult result) {
+   InterpreterResult.Code code = InterpreterResult.Code.valueOf(result.getCode());
+   InterpreterResult converted = new InterpreterResult(code);
+
+   for (RemoteInterpreterResultMessage message : result.getMsg()) {
+     InterpreterResult.Type type = InterpreterResult.Type.valueOf(message.getType());
+     converted.add(type, message.getData());
+   }
+
+   return converted;
+ }
+
+ /**
+  * Pushes the local angular object registry of this interpreter's group to the remote
+  * interpreter as JSON. Does nothing when the group has no registry or the registry has
+  * no content map.
+  *
+  * <p>This method should be called ONLY from inside the init() method.
+  *
+  * @param client the client used for the push
+  * @throws TException when the remote call fails
+  */
+ void pushAngularObjectRegistryToRemote(Client client) throws TException {
+   final AngularObjectRegistry localRegistry = this.getInterpreterGroup()
+       .getAngularObjectRegistry();
+
+   if (localRegistry == null || localRegistry.getRegistry() == null) {
+     return;
+   }
+
+   final Map<String, Map<String, AngularObject>> content = localRegistry.getRegistry();
+
+   logger.info("Push local angular object registry from ZeppelinServer to" +
+       " remote interpreter group {}", this.getInterpreterGroup().getId());
+
+   final java.lang.reflect.Type contentType = new TypeToken<Map<String,
+       Map<String, AngularObject>>>() {
+   }.getType();
+
+   Gson serializer = new Gson();
+   client.angularRegistryPush(serializer.toJson(content, contentType));
+ }
+
+ /**
+ * Returns the environment map held by this interpreter (the internal instance, not a
+ * copy). May be null: the constructor for connecting to an existing process does not
+ * set it.
+ */
+ public Map<String, String> getEnv() {
+ return env;
+ }
+
+ /**
+ * Replaces the environment map held by this interpreter with the given map (stored as
+ * is, not copied).
+ */
+ public void setEnv(Map<String, String> env) {
+ this.env = env;
+ }
+
+ public void addEnv(Map<String, String> env) {
+ if (this.env == null) {
+ this.env = new HashMap<>();
+ }
+ this.env.putAll(env);
+ }
+
+ // Only for test: exposes the interpreterRunner field as it was set by the constructor
+ // (null when the constructor for connecting to an existing process was used).
+ public String getInterpreterRunner() {
+ return interpreterRunner;
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
new file mode 100644
index 0000000..1fb9b90
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.commons.exec.*;
+import org.apache.commons.exec.environment.EnvironmentUtils;
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * This class manages start / stop of remote interpreter process.
+ *
+ * <p>The process is launched through commons-exec with this object registered as the
+ * {@link ExecuteResultHandler}; {@link #start(String, Boolean)} then polls the
+ * {@code running} flag, which the handler callbacks clear when the process ends.
+ */
+public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
+    implements ExecuteResultHandler {
+  private static final Logger logger = LoggerFactory.getLogger(
+      RemoteInterpreterManagedProcess.class);
+  private final String interpreterRunner;
+
+  private DefaultExecutor executor;
+  private ExecuteWatchdog watchdog;
+  // Cleared by the ExecuteResultHandler callbacks and polled by start(); volatile so
+  // that a write made by one thread is visible to the other.
+  volatile boolean running = false;
+  private int port = -1;
+  private final String interpreterDir;
+  private final String localRepoDir;
+  private final String interpreterGroupName;
+
+  private Map<String, String> env;
+
+  /**
+   * Creates a managed process whose event poller is built from the given listeners.
+   *
+   * @param intpRunner command line of the launcher, parsed with CommandLine.parse
+   * @param intpDir value passed to the launcher after "-d"
+   * @param localRepoDir value passed to the launcher after "-l"
+   * @param env variables added on top of the current process environment; may be null
+   * @param connectTimeout connect timeout handed to the superclass
+   * @param listener listener given to the event poller
+   * @param appListener application listener given to the event poller
+   * @param interpreterGroupName value passed to the launcher after "-g"
+   */
+  public RemoteInterpreterManagedProcess(
+      String intpRunner,
+      String intpDir,
+      String localRepoDir,
+      Map<String, String> env,
+      int connectTimeout,
+      RemoteInterpreterProcessListener listener,
+      ApplicationEventListener appListener,
+      String interpreterGroupName) {
+    super(new RemoteInterpreterEventPoller(listener, appListener),
+        connectTimeout);
+    this.interpreterRunner = intpRunner;
+    this.env = env;
+    this.interpreterDir = intpDir;
+    this.localRepoDir = localRepoDir;
+    this.interpreterGroupName = interpreterGroupName;
+  }
+
+  /**
+   * Same as the public constructor, but takes a ready-made event poller instead of
+   * building one from listeners.
+   */
+  RemoteInterpreterManagedProcess(String intpRunner,
+                                  String intpDir,
+                                  String localRepoDir,
+                                  Map<String, String> env,
+                                  RemoteInterpreterEventPoller remoteInterpreterEventPoller,
+                                  int connectTimeout,
+                                  String interpreterGroupName) {
+    super(remoteInterpreterEventPoller,
+        connectTimeout);
+    this.interpreterRunner = intpRunner;
+    this.env = env;
+    this.interpreterDir = intpDir;
+    this.localRepoDir = localRepoDir;
+    this.interpreterGroupName = interpreterGroupName;
+  }
+
+  /** Always "localhost": the managed process is launched on the local machine. */
+  @Override
+  public String getHost() {
+    return "localhost";
+  }
+
+  /** Returns the port chosen by start(), or -1 if start() has not picked one yet. */
+  @Override
+  public int getPort() {
+    return port;
+  }
+
+  /**
+   * Launches the interpreter process and waits, for at most getConnectTimeout()
+   * milliseconds, until its port on localhost accepts connections.
+   *
+   * @param userName user passed to the launcher with "-u" when impersonation is enabled
+   *                 and the user is not "anonymous"
+   * @param isUserImpersonate whether to impersonate; null is treated as false
+   * @throws InterpreterException if no port can be found, the process cannot be launched,
+   *                              or the process ends while this method is still waiting
+   *                              (the message then carries the captured process output)
+   */
+  @Override
+  public void start(String userName, Boolean isUserImpersonate) {
+    // start server process
+    try {
+      port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
+    } catch (IOException e1) {
+      throw new InterpreterException(e1);
+    }
+
+    CommandLine cmdLine = CommandLine.parse(interpreterRunner);
+    cmdLine.addArgument("-d", false);
+    cmdLine.addArgument(interpreterDir, false);
+    cmdLine.addArgument("-p", false);
+    cmdLine.addArgument(Integer.toString(port), false);
+    // Boolean.TRUE.equals avoids a NullPointerException from unboxing a null flag.
+    if (Boolean.TRUE.equals(isUserImpersonate) && !userName.equals("anonymous")) {
+      cmdLine.addArgument("-u", false);
+      cmdLine.addArgument(userName, false);
+    }
+    cmdLine.addArgument("-l", false);
+    cmdLine.addArgument(localRepoDir, false);
+    cmdLine.addArgument("-g", false);
+    cmdLine.addArgument(interpreterGroupName, false);
+
+    executor = new DefaultExecutor();
+
+    // Process output is logged and, while start() is waiting, also buffered so that it
+    // can be reported if the process dies early.
+    ByteArrayOutputStream cmdOut = new ByteArrayOutputStream();
+    ProcessLogOutputStream processOutput = new ProcessLogOutputStream(logger);
+    processOutput.setOutputStream(cmdOut);
+
+    executor.setStreamHandler(new PumpStreamHandler(processOutput));
+    watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
+    executor.setWatchdog(watchdog);
+
+    try {
+      Map<String, String> procEnv = EnvironmentUtils.getProcEnvironment();
+      if (env != null) {
+        procEnv.putAll(env);
+      }
+
+      logger.info("Run interpreter process {}", cmdLine);
+      // Set the flag BEFORE launching: a process that fails immediately may trigger the
+      // handler callback (running = false) before execute() returns, and setting the flag
+      // afterwards would overwrite that.
+      running = true;
+      executor.execute(cmdLine, procEnv, this);
+    } catch (IOException e) {
+      running = false;
+      throw new InterpreterException(e);
+    }
+
+    // An interrupt must not shorten the wait, so it is remembered here and the thread's
+    // interrupt status is restored once the wait is over.
+    boolean interrupted = false;
+    try {
+      long startTime = System.currentTimeMillis();
+      while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
+        if (!running) {
+          try {
+            cmdOut.flush();
+          } catch (IOException e) {
+            // nothing to do
+          }
+          throw new InterpreterException(new String(cmdOut.toByteArray()));
+        }
+
+        try {
+          if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) {
+            break;
+          }
+          try {
+            Thread.sleep(500);
+          } catch (InterruptedException e) {
+            logger.error("Exception in RemoteInterpreterProcess while synchronized reference " +
+                "Thread.sleep", e);
+            interrupted = true;
+          }
+        } catch (Exception e) {
+          logger.debug("Remote interpreter not yet accessible at localhost:{}", port);
+        }
+      }
+    } finally {
+      // Stop buffering process output; from here on it only goes to the log.
+      processOutput.setOutputStream(null);
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * Destroys the process through the watchdog if it is still running, then drops the
+   * executor and watchdog references.
+   */
+  @Override
+  public void stop() {
+    if (isRunning()) {
+      logger.info("kill interpreter process");
+      watchdog.destroyProcess();
+    }
+
+    executor = null;
+    watchdog = null;
+    running = false;
+    logger.info("Remote process terminated");
+  }
+
+  /** ExecuteResultHandler callback: the process ended with the given exit value. */
+  @Override
+  public void onProcessComplete(int exitValue) {
+    logger.info("Interpreter process exited {}", exitValue);
+    running = false;
+  }
+
+  /** ExecuteResultHandler callback: the process failed. */
+  @Override
+  public void onProcessFailed(ExecuteException e) {
+    // Pass the exception as the throwable argument; a "{}" placeholder would be left
+    // unfilled because SLF4J treats a trailing Throwable as the exception to log.
+    logger.info("Interpreter process failed", e);
+    running = false;
+  }
+
+  /** Returns the flag set by start() and cleared by stop() and the handler callbacks. */
+  @Override
+  public boolean isRunning() {
+    return running;
+  }
+
+  /**
+   * Log stream that sends every completed line to the logger at debug level and, while a
+   * secondary stream is attached, copies the raw bytes to it as well.
+   */
+  private static class ProcessLogOutputStream extends LogOutputStream {
+
+    private final Logger logger;
+    // Read without the lock as a fast path, so it has to be volatile.
+    private volatile OutputStream out;
+
+    public ProcessLogOutputStream(Logger logger) {
+      this.logger = logger;
+    }
+
+    @Override
+    protected void processLine(String s, int i) {
+      this.logger.debug(s);
+    }
+
+    @Override
+    public void write(byte [] b) throws IOException {
+      super.write(b);
+
+      if (out != null) {
+        synchronized (this) {
+          // Re-check under the lock: setOutputStream(null) may have run in between.
+          if (out != null) {
+            out.write(b);
+          }
+        }
+      }
+    }
+
+    @Override
+    public void write(byte [] b, int offset, int len) throws IOException {
+      super.write(b, offset, len);
+
+      if (out != null) {
+        synchronized (this) {
+          // Re-check under the lock: setOutputStream(null) may have run in between.
+          if (out != null) {
+            out.write(b, offset, len);
+          }
+        }
+      }
+    }
+
+    /** Attaches the secondary stream, or detaches it when given null. */
+    public void setOutputStream(OutputStream out) {
+      synchronized (this) {
+        this.out = out;
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
new file mode 100644
index 0000000..bb176be
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class connects to existing process.
+ *
+ * <p>The process is assumed to be managed externally, so {@link #start(String, Boolean)}
+ * and {@link #stop()} do nothing.
+ */
+public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
+  // One logger per class rather than one per instance.
+  private static final Logger logger =
+      LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class);
+  private final String host;
+  private final int port;
+
+  /**
+   * @param connectTimeout connect timeout handed to the superclass
+   * @param listener listener handed to the superclass
+   * @param appListener application listener handed to the superclass
+   * @param host host of the existing process
+   * @param port port of the existing process
+   */
+  public RemoteInterpreterRunningProcess(
+      int connectTimeout,
+      RemoteInterpreterProcessListener listener,
+      ApplicationEventListener appListener,
+      String host,
+      int port
+  ) {
+    super(connectTimeout, listener, appListener);
+    this.host = host;
+    this.port = port;
+  }
+
+  /** Returns the host given at construction time. */
+  @Override
+  public String getHost() {
+    return host;
+  }
+
+  /** Returns the port given at construction time. */
+  @Override
+  public int getPort() {
+    return port;
+  }
+
+  @Override
+  public void start(String userName, Boolean isUserImpersonate) {
+    // assume process is externally managed. nothing to do
+  }
+
+  @Override
+  public void stop() {
+    // assume process is externally managed. nothing to do
+  }
+
+  /** Reports whether the configured host and port are currently reachable. */
+  @Override
+  public boolean isRunning() {
+    return RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort());
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
new file mode 100644
index 0000000..c8c64ea
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.After;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests that AppendOutputRunner forwards buffered append-output events to the listener.
+ */
+public class AppendOutputRunnerTest {
+
+  private static final int NUM_EVENTS = 10000;
+  private static final int NUM_CLUBBED_EVENTS = 100;
+  /* Upper bound, in milliseconds, for the polling loops in this class. */
+  private static final long WAIT_TIMEOUT_MS = 2000;
+  private static final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
+  private static ScheduledFuture<?> future = null;
+  /* It is being accessed by multiple threads: the test thread polls it while the
+   * scheduled runner drives the mocked listener. Without volatile the while loop
+   * in 'loopForCompletingEvents' could run for-ever.
+   */
+  private volatile static int numInvocations = 0;
+
+  @After
+  public void afterEach() {
+    if (future != null) {
+      future.cancel(true);
+    }
+  }
+
+  @Test
+  public void testSingleEvent() throws InterruptedException {
+    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+    String[][] buffer = {{"note", "para", "data\n"}};
+
+    loopForCompletingEvents(listener, 1, buffer);
+    verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
+    verify(listener, times(1)).onOutputAppend("note", "para", 0, "data\n");
+  }
+
+  @Test
+  public void testMultipleEventsOfSameParagraph() throws InterruptedException {
+    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+    String note1 = "note1";
+    String para1 = "para1";
+    String[][] buffer = {
+        {note1, para1, "data1\n"},
+        {note1, para1, "data2\n"},
+        {note1, para1, "data3\n"}
+    };
+
+    loopForCompletingEvents(listener, 1, buffer);
+    verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
+    verify(listener, times(1)).onOutputAppend(note1, para1, 0, "data1\ndata2\ndata3\n");
+  }
+
+  @Test
+  public void testMultipleEventsOfDifferentParagraphs() throws InterruptedException {
+    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+    String note1 = "note1";
+    String note2 = "note2";
+    String para1 = "para1";
+    String para2 = "para2";
+    String[][] buffer = {
+        {note1, para1, "data1\n"},
+        {note1, para2, "data2\n"},
+        {note2, para1, "data3\n"},
+        {note2, para2, "data4\n"}
+    };
+    loopForCompletingEvents(listener, 4, buffer);
+
+    verify(listener, times(4)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
+    verify(listener, times(1)).onOutputAppend(note1, para1, 0, "data1\n");
+    verify(listener, times(1)).onOutputAppend(note1, para2, 0, "data2\n");
+    verify(listener, times(1)).onOutputAppend(note2, para1, 0, "data3\n");
+    verify(listener, times(1)).onOutputAppend(note2, para2, 0, "data4\n");
+  }
+
+  @Test
+  public void testClubbedData() throws InterruptedException {
+    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+    AppendOutputRunner runner = new AppendOutputRunner(listener);
+    future = service.scheduleWithFixedDelay(runner, 0,
+        AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
+    Thread thread = new Thread(new BombardEvents(runner));
+    thread.start();
+    thread.join();
+    Thread.sleep(1000);
+
+    /* NUM_CLUBBED_EVENTS is a heuristic number.
+     * It has been observed that for 10,000 continuous event
+     * calls, 30-40 Web-socket calls are made. Keeping
+     * the unit-test to a pessimistic 100 web-socket calls.
+     */
+    verify(listener, atMost(NUM_CLUBBED_EVENTS)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
+  }
+
+  @Test
+  public void testWarnLoggerForLargeData() throws InterruptedException {
+    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+    AppendOutputRunner runner = new AppendOutputRunner(listener);
+    String data = "data\n";
+    int numEvents = 100000;
+
+    for (int i = 0; i < numEvents; i++) {
+      runner.appendBuffer("noteId", "paraId", 0, data);
+    }
+
+    TestAppender appender = new TestAppender();
+    Logger logger = Logger.getRootLogger();
+    logger.addAppender(appender);
+    Logger.getLogger(RemoteInterpreterEventPoller.class);
+
+    try {
+      runner.run();
+
+      int warnLogCounter;
+      LoggingEvent sizeWarnLogEntry = null;
+      long startTimeMs = System.currentTimeMillis();
+      do {
+        // Bounded wait: fail instead of spinning forever when the number of WARN
+        // entries never becomes exactly two.
+        if (System.currentTimeMillis() - startTimeMs > WAIT_TIMEOUT_MS) {
+          fail("Expected exactly 2 WARN log entries within 2 seconds");
+        }
+        warnLogCounter = 0;
+        for (LoggingEvent logEntry : appender.getLog()) {
+          if (Level.WARN.equals(logEntry.getLevel())) {
+            // The last WARN entry seen is the one compared below.
+            sizeWarnLogEntry = logEntry;
+            warnLogCounter += 1;
+          }
+        }
+      } while (warnLogCounter != 2);
+
+      String loggerString = "Processing size for buffered append-output is high: " +
+          (data.length() * numEvents) + " characters.";
+      assertTrue(loggerString.equals(sizeWarnLogEntry.getMessage()));
+    } finally {
+      // Do not leave the capturing appender on the root logger for later tests.
+      logger.removeAppender(appender);
+    }
+  }
+
+  /** Appends NUM_EVENTS identical events for one paragraph to the runner. */
+  private static class BombardEvents implements Runnable {
+
+    private final AppendOutputRunner runner;
+
+    private BombardEvents(AppendOutputRunner runner) {
+      this.runner = runner;
+    }
+
+    @Override
+    public void run() {
+      String noteId = "noteId";
+      String paraId = "paraId";
+      for (int i = 0; i < NUM_EVENTS; i++) {
+        runner.appendBuffer(noteId, paraId, 0, "data\n");
+      }
+    }
+  }
+
+  /** log4j appender that keeps every logging event it receives in memory. */
+  private static class TestAppender extends AppenderSkeleton {
+    private final List<LoggingEvent> log = new ArrayList<>();
+
+    @Override
+    public boolean requiresLayout() {
+      return false;
+    }
+
+    @Override
+    protected void append(final LoggingEvent loggingEvent) {
+      log.add(loggingEvent);
+    }
+
+    @Override
+    public void close() {
+    }
+
+    /**
+     * Returns a snapshot of the captured events. Synchronized on the appender, the same
+     * monitor AppenderSkeleton.doAppend holds while calling append, so the copy cannot
+     * overlap with an add.
+     */
+    public synchronized List<LoggingEvent> getLog() {
+      return new ArrayList<>(log);
+    }
+  }
+
+  /** Makes every onOutputAppend call on the mocked listener bump numInvocations. */
+  private void prepareInvocationCounts(RemoteInterpreterProcessListener listener) {
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        numInvocations += 1;
+        return null;
+      }
+    }).when(listener).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
+  }
+
+  /**
+   * Buffers the given events, schedules the runner, and waits until the listener has
+   * been invoked exactly numTimes; fails the test if that takes more than 2 seconds.
+   *
+   * @param listener mocked listener that receives the output
+   * @param numTimes number of listener invocations to wait for
+   * @param buffer events as {noteId, paragraphId, data} triples
+   */
+  private void loopForCompletingEvents(RemoteInterpreterProcessListener listener,
+      int numTimes, String[][] buffer) throws InterruptedException {
+    numInvocations = 0;
+    prepareInvocationCounts(listener);
+    AppendOutputRunner runner = new AppendOutputRunner(listener);
+    for (String[] bufferElement : buffer) {
+      runner.appendBuffer(bufferElement[0], bufferElement[1], 0, bufferElement[2]);
+    }
+    future = service.scheduleWithFixedDelay(runner, 0,
+        AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
+    long startTimeMs = System.currentTimeMillis();
+    while (numInvocations != numTimes) {
+      if (System.currentTimeMillis() - startTimeMs > WAIT_TIMEOUT_MS) {
+        fail("Buffered events were not sent for 2 seconds");
+      }
+      // Yield briefly between polls instead of spinning a CPU core.
+      Thread.sleep(10);
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
new file mode 100644
index 0000000..f7404e3
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.zeppelin.display.*;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterAngular;
+import org.apache.zeppelin.resource.LocalResourcePool;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Exercises angular object add / update / remove between the local registry and a remote
+ * interpreter running MockInterpreterAngular. The test class is itself the registry
+ * listener and counts the callbacks it receives.
+ */
+public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
+  private static final String INTERPRETER_SCRIPT =
+      System.getProperty("os.name").startsWith("Windows") ?
+          "../bin/interpreter.cmd" :
+          "../bin/interpreter.sh";
+
+  private InterpreterGroup intpGroup;
+  private HashMap<String, String> env;
+  private RemoteInterpreter intp;
+  private InterpreterContext context;
+  private RemoteAngularObjectRegistry localRegistry;
+
+  private AtomicInteger onAdd;
+  private AtomicInteger onUpdate;
+  private AtomicInteger onRemove;
+
+  @Before
+  public void setUp() throws Exception {
+    onAdd = new AtomicInteger(0);
+    onUpdate = new AtomicInteger(0);
+    onRemove = new AtomicInteger(0);
+
+    // Interpreter group and its registry reference each other.
+    intpGroup = new InterpreterGroup("intpId");
+    localRegistry = new RemoteAngularObjectRegistry("intpId", this, intpGroup);
+    intpGroup.setAngularObjectRegistry(localRegistry);
+
+    String testClasspath = new File("./target/test-classes").getAbsolutePath();
+    env = new HashMap<>();
+    env.put("ZEPPELIN_CLASSPATH", testClasspath);
+
+    Properties properties = new Properties();
+    String runnerPath = new File(INTERPRETER_SCRIPT).getAbsolutePath();
+
+    intp = new RemoteInterpreter(
+        properties,
+        "note",
+        MockInterpreterAngular.class.getName(),
+        runnerPath,
+        "fake",
+        "fakeRepo",
+        env,
+        10 * 1000,
+        null,
+        null,
+        "anonymous",
+        false
+    );
+
+    intpGroup.put("note", new LinkedList<Interpreter>());
+    intpGroup.get("note").add(intp);
+    intp.setInterpreterGroup(intpGroup);
+
+    context = new InterpreterContext(
+        "note",
+        "id",
+        null,
+        "title",
+        "text",
+        new AuthenticationInfo(),
+        new HashMap<String, Object>(),
+        new GUI(),
+        new AngularObjectRegistry(intpGroup.getId(), null),
+        new LocalResourcePool("pool1"),
+        new LinkedList<InterpreterContextRunner>(), null);
+
+    intp.open();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    intp.close();
+    intpGroup.close();
+  }
+
+  /**
+   * Sends one command to the interpreter, waits 500 ms, and returns the first result
+   * message split on single spaces. The tests read element 0 as the size of the registry
+   * and element 1 as the number of watcher calls.
+   */
+  private String[] interpretAndWait(String command) throws InterruptedException {
+    InterpreterResult outcome = intp.interpret(command, context);
+    Thread.sleep(500); // waitFor eventpoller pool event
+    return outcome.message().get(0).getData().split(" ");
+  }
+
+  @Test
+  public void testAngularObjectInterpreterSideCRUD() throws InterruptedException {
+    String[] fields = interpretAndWait("get");
+    assertEquals("0", fields[0]); // size of registry
+    assertEquals("0", fields[1]); // num watcher called
+
+    // create object
+    fields = interpretAndWait("add n1 v1");
+    assertEquals("1", fields[0]); // size of registry
+    assertEquals("0", fields[1]); // num watcher called
+    assertEquals("v1", localRegistry.get("n1", "note", null).get());
+
+    // update object
+    fields = interpretAndWait("update n1 v11");
+    assertEquals("1", fields[0]); // size of registry
+    assertEquals("1", fields[1]); // num watcher called
+    assertEquals("v11", localRegistry.get("n1", "note", null).get());
+
+    // remove object
+    fields = interpretAndWait("remove n1");
+    assertEquals("0", fields[0]); // size of registry
+    assertEquals("1", fields[1]); // num watcher called
+    assertEquals(null, localRegistry.get("n1", "note", null));
+  }
+
+  @Test
+  public void testAngularObjectRemovalOnZeppelinServerSide() throws InterruptedException {
+    // test if angularobject removal from server side propagate to interpreter process's registry.
+    // will happen when notebook is removed.
+
+    String[] fields = interpretAndWait("get");
+    assertEquals("0", fields[0]); // size of registry
+
+    // create object
+    fields = interpretAndWait("add n1 v1");
+    assertEquals("1", fields[0]); // size of registry
+    assertEquals("v1", localRegistry.get("n1", "note", null).get());
+
+    // remove object in local registry.
+    localRegistry.removeAndNotifyRemoteProcess("n1", "note", null);
+    fields = interpretAndWait("get");
+    assertEquals("0", fields[0]); // size of registry
+  }
+
+  @Test
+  public void testAngularObjectAddOnZeppelinServerSide() throws InterruptedException {
+    // test if angularobject add from server side propagate to interpreter process's registry.
+    // will happen when zeppelin server loads notebook and restore the object into registry
+
+    String[] fields = interpretAndWait("get");
+    assertEquals("0", fields[0]); // size of registry
+
+    // create object
+    localRegistry.addAndNotifyRemoteProcess("n1", "v1", "note", null);
+
+    // get from remote registry
+    fields = interpretAndWait("get");
+    assertEquals("1", fields[0]); // size of registry
+  }
+
+  /** Counts add notifications. */
+  @Override
+  public void onAdd(String interpreterGroupId, AngularObject object) {
+    onAdd.incrementAndGet();
+  }
+
+  /** Counts update notifications. */
+  @Override
+  public void onUpdate(String interpreterGroupId, AngularObject object) {
+    onUpdate.incrementAndGet();
+  }
+
+  /** Counts remove notifications. */
+  @Override
+  public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) {
+    onRemove.incrementAndGet();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
new file mode 100644
index 0000000..49aa7aa
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
+import org.junit.Test;
+
+import static org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType.NO_OP;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Checks how RemoteInterpreterEventPoller treats pending events when it is shut down.
+ */
+public class RemoteInterpreterEventPollerTest {
+
+  /**
+   * Requests shutdown before the poller is started, lets it run to completion, and then
+   * expects the next event from the mocked client to be NO_OP. The mock hands out two
+   * other events before NO_OP, so the assertion only holds if those were already fetched.
+   */
+  @Test
+  public void shouldClearUnreadEventsOnShutdown() throws Exception {
+    RemoteInterpreterProcess mockedProcess = getMockEventsInterpreterProcess();
+    RemoteInterpreterEventPoller poller = new RemoteInterpreterEventPoller(null, null);
+    poller.setInterpreterProcess(mockedProcess);
+
+    poller.shutdown();
+    poller.start();
+    poller.join();
+
+    RemoteInterpreterEvent nextEvent = mockedProcess.getClient().getEvent();
+    assertEquals(NO_OP, nextEvent.getType());
+  }
+
+  /**
+   * Builds a mocked interpreter process whose client returns two default-constructed
+   * events followed by a NO_OP event.
+   */
+  private RemoteInterpreterProcess getMockEventsInterpreterProcess() throws Exception {
+    RemoteInterpreterEvent pendingEvent = new RemoteInterpreterEvent();
+    RemoteInterpreterEvent noOpEvent = new RemoteInterpreterEvent(NO_OP, "");
+
+    RemoteInterpreterService.Client mockedClient = mock(RemoteInterpreterService.Client.class);
+    when(mockedClient.getEvent())
+        .thenReturn(pendingEvent)
+        .thenReturn(pendingEvent)
+        .thenReturn(noOpEvent);
+
+    RemoteInterpreterProcess mockedProcess = mock(RemoteInterpreterProcess.class);
+    when(mockedProcess.getClient()).thenReturn(mockedClient);
+    return mockedProcess;
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
new file mode 100644
index 0000000..3f865cb
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterOutputStream;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Test for remote interpreter output stream
+ */
+public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProcessListener {
+  // Launcher script given as a relative path; Windows gets the .cmd variant.
+  private static final String INTERPRETER_SCRIPT =
+      System.getProperty("os.name").startsWith("Windows")
+          ? "../bin/interpreter.cmd" : "../bin/interpreter.sh";
+  private InterpreterGroup intpGroup;
+  private HashMap<String, String> env;
+
+  /** Prepares the launcher environment and a group holding an empty "note" session. */
+  @Before
+  public void setUp() throws Exception {
+    env = new HashMap<>();
+    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
+    intpGroup = new InterpreterGroup();
+    intpGroup.put("note", new LinkedList<Interpreter>());
+  }
+
+  /** Closes the interpreter group created in {@link #setUp()}. */
+  @After
+  public void tearDown() throws Exception {
+    intpGroup.close();
+  }
+
+  /** Builds a remote MockInterpreterOutputStream and registers it in the "note" session. */
+  private RemoteInterpreter createMockInterpreter() {
+    String runner = new File(INTERPRETER_SCRIPT).getAbsolutePath();
+    String className = MockInterpreterOutputStream.class.getName();
+    RemoteInterpreter remote = new RemoteInterpreter(new Properties(), "note", className,
+        runner, "fake", "fakeRepo", env, 10 * 1000, this, null, "anonymous", false);
+    intpGroup.get("note").add(remote);
+    remote.setInterpreterGroup(intpGroup);
+    return remote;
+  }
+
+  /** Returns a new context with placeholder ids and its own AngularObjectRegistry. */
+  private InterpreterContext createInterpreterContext() {
+    AngularObjectRegistry registry = new AngularObjectRegistry(intpGroup.getId(), null);
+    return new InterpreterContext("noteId", "id", null, "title", "text",
+        new AuthenticationInfo(), new HashMap<String, Object>(), new GUI(), registry,
+        null, new LinkedList<InterpreterContextRunner>(), null);
+  }
+
+  /** "CODE::text" is expected to return that code with "text" as the first message. */
+  @Test
+  public void testInterpreterResultOnly() {
+    RemoteInterpreter remote = createMockInterpreter();
+    InterpreterResult first =
+        remote.interpret("SUCCESS::staticresult", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, first.code());
+    assertEquals("staticresult", first.message().get(0).getData());
+
+    InterpreterResult second =
+        remote.interpret("SUCCESS::staticresult2", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, second.code());
+    assertEquals("staticresult2", second.message().get(0).getData());
+
+    InterpreterResult failed =
+        remote.interpret("ERROR::staticresult3", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.ERROR, failed.code());
+    assertEquals("staticresult3", failed.message().get(0).getData());
+  }
+
+  /** "CODE:text:" is expected to return that code with "text" as the first message. */
+  @Test
+  public void testInterpreterOutputStreamOnly() {
+    RemoteInterpreter remote = createMockInterpreter();
+    InterpreterResult streamed =
+        remote.interpret("SUCCESS:streamresult:", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, streamed.code());
+    assertEquals("streamresult", streamed.message().get(0).getData());
+
+    InterpreterResult failed =
+        remote.interpret("ERROR:streamresult2:", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.ERROR, failed.code());
+    assertEquals("streamresult2", failed.message().get(0).getData());
+  }
+
+  /** "CODE:a:b" is expected to return message "a" followed by message "b". */
+  @Test
+  public void testInterpreterResultOutputStreamMixed() {
+    RemoteInterpreter remote = createMockInterpreter();
+    InterpreterResult mixed =
+        remote.interpret("SUCCESS:stream:static", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, mixed.code());
+    assertEquals("stream", mixed.message().get(0).getData());
+    assertEquals("static", mixed.message().get(1).getData());
+  }
+
+  /** A leading %html or %angular marker is expected to set the message type. */
+  @Test
+  public void testOutputType() {
+    RemoteInterpreter remote = createMockInterpreter();
+    InterpreterResult sameLine =
+        remote.interpret("SUCCESS:%html hello:", createInterpreterContext());
+    assertEquals(InterpreterResult.Type.HTML, sameLine.message().get(0).getType());
+    assertEquals("hello", sameLine.message().get(0).getData());
+
+    InterpreterResult nextLine =
+        remote.interpret("SUCCESS:%html\nhello:", createInterpreterContext());
+    assertEquals(InterpreterResult.Type.HTML, nextLine.message().get(0).getType());
+    assertEquals("hello", nextLine.message().get(0).getData());
+
+    InterpreterResult twoTypes =
+        remote.interpret("SUCCESS:%html hello:%angular world", createInterpreterContext());
+    assertEquals(InterpreterResult.Type.HTML, twoTypes.message().get(0).getType());
+    assertEquals("hello", twoTypes.message().get(0).getData());
+    assertEquals(InterpreterResult.Type.ANGULAR, twoTypes.message().get(1).getType());
+    assertEquals("world", twoTypes.message().get(1).getData());
+  }
+
+  /** Intentionally empty; these tests do not use appended output. */
+  @Override
+  public void onOutputAppend(String noteId, String paragraphId, int index, String output) {
+  }
+
+  /** Intentionally empty; these tests do not use output updates. */
+  @Override
+  public void onOutputUpdated(String noteId, String paragraphId, int index,
+      InterpreterResult.Type type, String output) {
+  }
+
+  /** Intentionally empty; these tests do not use clear notifications. */
+  @Override
+  public void onOutputClear(String noteId, String paragraphId) {
+  }
+
+  /** Intentionally empty; these tests do not use interpreter meta information. */
+  @Override
+  public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
+  }
+
+  /** Hands an empty list to the callback when one is supplied. */
+  @Override
+  public void onGetParagraphRunners(String noteId, String paragraphId,
+      RemoteWorksEventListener callback) {
+    if (callback != null) {
+      callback.onFinished(new LinkedList<>());
+    }
+  }
+
+  /** Intentionally empty; these tests never run a paragraph remotely. */
+  @Override
+  public void onRemoteRunParagraph(String noteId, String paragraphId) throws Exception {
+  }
+
+  /** Intentionally empty; these tests do not use paragraph meta information. */
+  @Override
+  public void onParaInfosReceived(String noteId, String paragraphId,
+      String interpreterSettingId, Map<String, String> metaInfos) {
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
new file mode 100644
index 0000000..b85d7ef
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.*;
+
+import java.util.HashMap;
+import java.util.Properties;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.zeppelin.interpreter.Constants;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.junit.Test;
+
+public class RemoteInterpreterProcessTest {
+  private static final String INTERPRETER_SCRIPT =
+      System.getProperty("os.name").startsWith("Windows") ?
+          "../bin/interpreter.cmd" :
+          "../bin/interpreter.sh";
+  // Port for the in-process server and the matching client properties.
+  private static final int DUMMY_PORT = 3678;
+
+  /** The process runs while it is referenced and stops once the last reference is gone. */
+  @Test
+  public void testStartStop() {
+    InterpreterGroup intpGroup = new InterpreterGroup();
+    RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
+        INTERPRETER_SCRIPT, "nonexists", "fakeRepo", new HashMap<String, String>(),
+        10 * 1000, null, null, "fakeName");
+    assertFalse(rip.isRunning());
+    assertEquals(0, rip.referenceCount());
+    assertEquals(1, rip.reference(intpGroup, "anonymous", false));
+    assertEquals(2, rip.reference(intpGroup, "anonymous", false));
+    assertEquals(true, rip.isRunning());
+    assertEquals(1, rip.dereference());
+    assertEquals(true, rip.isRunning());
+    assertEquals(0, rip.dereference());
+    assertEquals(false, rip.isRunning());
+  }
+
+  /** A borrowed client counts as active and becomes idle again once it is released. */
+  @Test
+  public void testClientFactory() throws Exception {
+    InterpreterGroup intpGroup = new InterpreterGroup();
+    RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
+        INTERPRETER_SCRIPT, "nonexists", "fakeRepo", new HashMap<String, String>(),
+        mock(RemoteInterpreterEventPoller.class), 10 * 1000, "fakeName");
+    rip.reference(intpGroup, "anonymous", false);
+    assertEquals(0, rip.getNumActiveClient());
+    assertEquals(0, rip.getNumIdleClient());
+
+    Client client = rip.getClient();
+    assertEquals(1, rip.getNumActiveClient());
+    assertEquals(0, rip.getNumIdleClient());
+
+    rip.releaseClient(client);
+    assertEquals(0, rip.getNumActiveClient());
+    assertEquals(1, rip.getNumIdleClient());
+    rip.dereference();
+  }
+
+  /** Starts a server on DUMMY_PORT, then references it via a group flagged EXISTING_PROCESS. */
+  @Test
+  public void testStartStopRemoteInterpreter() throws TException, InterruptedException {
+    RemoteInterpreterServer server = new RemoteInterpreterServer(DUMMY_PORT);
+    server.start();
+    // Give the server up to ten seconds to come up.
+    boolean running = false;
+    long startTime = System.currentTimeMillis();
+    while (System.currentTimeMillis() - startTime < 10 * 1000) {
+      if (server.isRunning()) {
+        running = true;
+        break;
+      }
+      Thread.sleep(200);
+    }
+    // The flag used to be computed but never checked; fail here if the server is down.
+    assertEquals("server did not start on port " + DUMMY_PORT, true, running);
+    Properties properties = new Properties();
+    properties.setProperty(Constants.ZEPPELIN_INTERPRETER_PORT, String.valueOf(DUMMY_PORT));
+    properties.setProperty(Constants.ZEPPELIN_INTERPRETER_HOST, "localhost");
+    InterpreterGroup intpGroup = mock(InterpreterGroup.class);
+    when(intpGroup.getProperty()).thenReturn(properties);
+    when(intpGroup.containsKey(Constants.EXISTING_PROCESS)).thenReturn(true);
+    RemoteInterpreterProcess rip = new RemoteInterpreterManagedProcess(
+        INTERPRETER_SCRIPT, "nonexists", "fakeRepo", new HashMap<String, String>(),
+        mock(RemoteInterpreterEventPoller.class), 10 * 1000, "fakeName");
+    assertFalse(rip.isRunning());
+    assertEquals(0, rip.referenceCount());
+    assertEquals(1, rip.reference(intpGroup, "anonymous", false));
+    assertEquals(true, rip.isRunning());
+  }
+
+  /** When launching "echo hello_world" fails, the exception message must carry its output. */
+  @Test
+  public void testPropagateError() throws TException, InterruptedException {
+    InterpreterGroup intpGroup = new InterpreterGroup();
+    RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
+        "echo hello_world", "nonexists", "fakeRepo", new HashMap<String, String>(),
+        10 * 1000, null, null, "fakeName");
+    assertFalse(rip.isRunning());
+    assertEquals(0, rip.referenceCount());
+    try {
+      assertEquals(1, rip.reference(intpGroup, "anonymous", false));
+    } catch (InterpreterException e) {
+      // The contains() result was previously discarded, so nothing was verified.
+      assertEquals(true, e.getMessage().contains("hello_world"));
+    }
+    assertEquals(0, rip.referenceCount());
+  }
+}
[4/4] zeppelin git commit: [MINOR] Move remoteinterpreter into zengine
Posted by jo...@apache.org.
[MINOR] Move remoteinterpreter into zengine
### What is this PR for?
RemoteInterpreter is only used on the server side, so zeppelin-interpreter does not have to include this class. Moving it helps to reduce the interpreter binary size and makes it possible to change RemoteInterpreter without adding more dependencies, if we want to.
### What type of PR is it?
[Refactoring]
### Todos
* [x] - Move RemoteInterpreter and related files out of zeppelin-interpreter module
### What is the Jira issue?
N/A
### How should this be tested?
N/A
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jongyoul Lee <jo...@gmail.com>
Closes #2320 from jongyoul/minor/move-remoteinterpreter-into-zengine and squashes the following commits:
80979913c [Jongyoul Lee] Removed author tag
e1425dfa8 [Jongyoul Lee] Adopted DummyInterpreter
99c093229 [Jongyoul Lee] Made DummyInterpreter
5ac8dfbbd [Jongyoul Lee] Moved RemoteInterpreterServer to zeppelin-interpreter
0a881c1b3 [Jongyoul Lee] Removed unused package imported Removed unnecessary classes imported
b7e0b9436 [Jongyoul Lee] moved some files related remote interpreter and fix some minor things
7e8721592 [Jongyoul Lee] move some files of remote packages from zeppelin-interpreter to zeppelin-zengine
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/d9c4a5f0
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/d9c4a5f0
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/d9c4a5f0
Branch: refs/heads/master
Commit: d9c4a5f0b6c3355753b50f466199cbe551cbd89a
Parents: 8e96d8b
Author: Jongyoul Lee <jo...@gmail.com>
Authored: Sat May 6 02:28:31 2017 +0900
Committer: Jongyoul Lee <jo...@apache.org>
Committed: Sat May 6 22:46:36 2017 +0900
----------------------------------------------------------------------
.../remote/RemoteAngularObjectRegistry.java | 133 ---
.../interpreter/remote/RemoteInterpreter.java | 585 ------------
.../remote/RemoteInterpreterManagedProcess.java | 247 -----
.../remote/RemoteInterpreterRunningProcess.java | 67 --
.../remote/RemoteInterpreterServer.java | 2 +-
.../remote/RemoteInterpreterUtils.java | 9 +-
.../zeppelin/resource/ResourcePoolUtils.java | 1 -
.../zeppelin/scheduler/RemoteScheduler.java | 1 -
.../zeppelin/interpreter/DummyInterpreter.java | 43 +
.../zeppelin/interpreter/InterpreterTest.java | 7 +-
.../remote/AppendOutputRunnerTest.java | 236 -----
.../remote/RemoteAngularObjectTest.java | 201 ----
.../RemoteInterpreterEventPollerTest.java | 55 --
.../RemoteInterpreterOutputTestStream.java | 191 ----
.../remote/RemoteInterpreterProcessTest.java | 131 ---
.../remote/RemoteInterpreterTest.java | 914 -------------------
.../remote/RemoteInterpreterUtilsTest.java | 34 -
.../remote/mock/MockInterpreterA.java | 94 --
.../remote/mock/MockInterpreterAngular.java | 113 ---
.../remote/mock/MockInterpreterB.java | 126 ---
.../remote/mock/MockInterpreterEnv.java | 80 --
.../mock/MockInterpreterOutputStream.java | 90 --
.../mock/MockInterpreterResourcePool.java | 128 ---
.../resource/DistributedResourcePoolTest.java | 303 ------
.../zeppelin/scheduler/RemoteSchedulerTest.java | 364 --------
.../remote/RemoteAngularObjectRegistry.java | 133 +++
.../interpreter/remote/RemoteInterpreter.java | 577 ++++++++++++
.../remote/RemoteInterpreterManagedProcess.java | 247 +++++
.../remote/RemoteInterpreterRunningProcess.java | 67 ++
.../remote/AppendOutputRunnerTest.java | 236 +++++
.../remote/RemoteAngularObjectTest.java | 201 ++++
.../RemoteInterpreterEventPollerTest.java | 55 ++
.../RemoteInterpreterOutputTestStream.java | 191 ++++
.../remote/RemoteInterpreterProcessTest.java | 131 +++
.../remote/RemoteInterpreterTest.java | 914 +++++++++++++++++++
.../remote/RemoteInterpreterUtilsTest.java | 34 +
.../remote/mock/MockInterpreterA.java | 94 ++
.../remote/mock/MockInterpreterAngular.java | 113 +++
.../remote/mock/MockInterpreterB.java | 126 +++
.../remote/mock/MockInterpreterEnv.java | 80 ++
.../mock/MockInterpreterOutputStream.java | 90 ++
.../mock/MockInterpreterResourcePool.java | 128 +++
.../resource/DistributedResourcePoolTest.java | 303 ++++++
.../zeppelin/scheduler/RemoteSchedulerTest.java | 364 ++++++++
44 files changed, 4139 insertions(+), 4100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
deleted file mode 100644
index 0ac7116..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import java.util.List;
-
-import org.apache.thrift.TException;
-import org.apache.zeppelin.display.AngularObject;
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.AngularObjectRegistryListener;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.gson.Gson;
-
-/**
- * Proxy for AngularObjectRegistry that exists in remote interpreter process
- */
-public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
- Logger logger = LoggerFactory.getLogger(RemoteAngularObjectRegistry.class);
- private InterpreterGroup interpreterGroup;
-
- public RemoteAngularObjectRegistry(String interpreterId,
- AngularObjectRegistryListener listener,
- InterpreterGroup interpreterGroup) {
- super(interpreterId, listener);
- this.interpreterGroup = interpreterGroup;
- }
-
- private RemoteInterpreterProcess getRemoteInterpreterProcess() {
- return interpreterGroup.getRemoteInterpreterProcess();
- }
-
- /**
- * When ZeppelinServer side code want to add angularObject to the registry,
- * this method should be used instead of add()
- * @param name
- * @param o
- * @param noteId
- * @return
- */
- public AngularObject addAndNotifyRemoteProcess(String name, Object o, String noteId, String
- paragraphId) {
- Gson gson = new Gson();
- RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
- if (!remoteInterpreterProcess.isRunning()) {
- return super.add(name, o, noteId, paragraphId, true);
- }
-
- Client client = null;
- boolean broken = false;
- try {
- client = remoteInterpreterProcess.getClient();
- client.angularObjectAdd(name, noteId, paragraphId, gson.toJson(o));
- return super.add(name, o, noteId, paragraphId, true);
- } catch (TException e) {
- broken = true;
- logger.error("Error", e);
- } catch (Exception e) {
- logger.error("Error", e);
- } finally {
- if (client != null) {
- remoteInterpreterProcess.releaseClient(client, broken);
- }
- }
- return null;
- }
-
- /**
- * When ZeppelinServer side code want to remove angularObject from the registry,
- * this method should be used instead of remove()
- * @param name
- * @param noteId
- * @param paragraphId
- * @return
- */
- public AngularObject removeAndNotifyRemoteProcess(String name, String noteId, String
- paragraphId) {
- RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
- if (remoteInterpreterProcess == null || !remoteInterpreterProcess.isRunning()) {
- return super.remove(name, noteId, paragraphId);
- }
-
- Client client = null;
- boolean broken = false;
- try {
- client = remoteInterpreterProcess.getClient();
- client.angularObjectRemove(name, noteId, paragraphId);
- return super.remove(name, noteId, paragraphId);
- } catch (TException e) {
- broken = true;
- logger.error("Error", e);
- } catch (Exception e) {
- logger.error("Error", e);
- } finally {
- if (client != null) {
- remoteInterpreterProcess.releaseClient(client, broken);
- }
- }
- return null;
- }
-
- public void removeAllAndNotifyRemoteProcess(String noteId, String paragraphId) {
- List<AngularObject> all = getAll(noteId, paragraphId);
- for (AngularObject ao : all) {
- removeAndNotifyRemoteProcess(ao.getName(), noteId, paragraphId);
- }
- }
-
- @Override
- protected AngularObject createNewAngularObject(String name, Object o, String noteId, String
- paragraphId) {
- return new RemoteAngularObject(name, o, noteId, paragraphId, interpreterGroup,
- getAngularObjectListener());
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
deleted file mode 100644
index 123ad75..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ /dev/null
@@ -1,585 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import java.util.*;
-
-import org.apache.thrift.TException;
-import org.apache.zeppelin.display.AngularObject;
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.helium.ApplicationEventListener;
-import org.apache.zeppelin.display.Input;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-
-/**
- * Proxy for Interpreter instance that runs on separate process
- */
-public class RemoteInterpreter extends Interpreter {
- private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class);
-
- private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
- private final ApplicationEventListener applicationEventListener;
- private Gson gson = new Gson();
- private String interpreterRunner;
- private String interpreterPath;
- private String localRepoPath;
- private String className;
- private String sessionKey;
- private FormType formType;
- private boolean initialized;
- private Map<String, String> env;
- private int connectTimeout;
- private int maxPoolSize;
- private String host;
- private int port;
- private String userName;
- private Boolean isUserImpersonate;
- private int outputLimit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
- private String interpreterGroupName;
-
- /**
- * Remote interpreter and manage interpreter process
- */
- public RemoteInterpreter(Properties property, String sessionKey, String className,
- String interpreterRunner, String interpreterPath, String localRepoPath, int connectTimeout,
- int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener,
- ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
- int outputLimit, String interpreterGroupName) {
- super(property);
- this.sessionKey = sessionKey;
- this.className = className;
- initialized = false;
- this.interpreterRunner = interpreterRunner;
- this.interpreterPath = interpreterPath;
- this.localRepoPath = localRepoPath;
- env = getEnvFromInterpreterProperty(property);
- this.connectTimeout = connectTimeout;
- this.maxPoolSize = maxPoolSize;
- this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
- this.applicationEventListener = appListener;
- this.userName = userName;
- this.isUserImpersonate = isUserImpersonate;
- this.outputLimit = outputLimit;
- this.interpreterGroupName = interpreterGroupName;
- }
-
-
- /**
- * Connect to existing process
- */
- public RemoteInterpreter(Properties property, String sessionKey, String className, String host,
- int port, String localRepoPath, int connectTimeout, int maxPoolSize,
- RemoteInterpreterProcessListener remoteInterpreterProcessListener,
- ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
- int outputLimit) {
- super(property);
- this.sessionKey = sessionKey;
- this.className = className;
- initialized = false;
- this.host = host;
- this.port = port;
- this.localRepoPath = localRepoPath;
- this.connectTimeout = connectTimeout;
- this.maxPoolSize = maxPoolSize;
- this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
- this.applicationEventListener = appListener;
- this.userName = userName;
- this.isUserImpersonate = isUserImpersonate;
- this.outputLimit = outputLimit;
- }
-
-
- // VisibleForTesting
- public RemoteInterpreter(Properties property, String sessionKey, String className,
- String interpreterRunner, String interpreterPath, String localRepoPath,
- Map<String, String> env, int connectTimeout,
- RemoteInterpreterProcessListener remoteInterpreterProcessListener,
- ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) {
- super(property);
- this.className = className;
- this.sessionKey = sessionKey;
- this.interpreterRunner = interpreterRunner;
- this.interpreterPath = interpreterPath;
- this.localRepoPath = localRepoPath;
- env.putAll(getEnvFromInterpreterProperty(property));
- this.env = env;
- this.connectTimeout = connectTimeout;
- this.maxPoolSize = 10;
- this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
- this.applicationEventListener = appListener;
- this.userName = userName;
- this.isUserImpersonate = isUserImpersonate;
- }
-
- private Map<String, String> getEnvFromInterpreterProperty(Properties property) {
- Map<String, String> env = new HashMap<>();
- for (Object key : property.keySet()) {
- if (isEnvString((String) key)) {
- env.put((String) key, property.getProperty((String) key));
- }
- }
- return env;
- }
-
- static boolean isEnvString(String key) {
- if (key == null || key.length() == 0) {
- return false;
- }
-
- return key.matches("^[A-Z_0-9]*");
- }
-
- @Override
- public String getClassName() {
- return className;
- }
-
- private boolean connectToExistingProcess() {
- return host != null && port > 0;
- }
-
- public RemoteInterpreterProcess getInterpreterProcess() {
- InterpreterGroup intpGroup = getInterpreterGroup();
- if (intpGroup == null) {
- return null;
- }
-
- synchronized (intpGroup) {
- if (intpGroup.getRemoteInterpreterProcess() == null) {
- RemoteInterpreterProcess remoteProcess;
- if (connectToExistingProcess()) {
- remoteProcess = new RemoteInterpreterRunningProcess(
- connectTimeout,
- remoteInterpreterProcessListener,
- applicationEventListener,
- host,
- port);
- } else {
- // create new remote process
- remoteProcess = new RemoteInterpreterManagedProcess(
- interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout,
- remoteInterpreterProcessListener, applicationEventListener, interpreterGroupName);
- }
-
- intpGroup.setRemoteInterpreterProcess(remoteProcess);
- }
-
- return intpGroup.getRemoteInterpreterProcess();
- }
- }
-
- public synchronized void init() {
- if (initialized == true) {
- return;
- }
-
- RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-
- final InterpreterGroup interpreterGroup = getInterpreterGroup();
-
- interpreterProcess.setMaxPoolSize(
- Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
- String groupId = interpreterGroup.getId();
-
- synchronized (interpreterProcess) {
- Client client = null;
- try {
- client = interpreterProcess.getClient();
- } catch (Exception e1) {
- throw new InterpreterException(e1);
- }
-
- boolean broken = false;
- try {
- logger.info("Create remote interpreter {}", getClassName());
- if (localRepoPath != null) {
- property.put("zeppelin.interpreter.localRepo", localRepoPath);
- }
-
- property.put("zeppelin.interpreter.output.limit", Integer.toString(outputLimit));
- client.createInterpreter(groupId, sessionKey,
- getClassName(), (Map) property, userName);
- // Push angular object loaded from JSON file to remote interpreter
- if (!interpreterGroup.isAngularRegistryPushed()) {
- pushAngularObjectRegistryToRemote(client);
- interpreterGroup.setAngularRegistryPushed(true);
- }
-
- } catch (TException e) {
- logger.error("Failed to create interpreter: {}", getClassName());
- throw new InterpreterException(e);
- } finally {
- // TODO(jongyoul): Fixed it when not all of interpreter in same interpreter group are broken
- interpreterProcess.releaseClient(client, broken);
- }
- }
- initialized = true;
- }
-
-
- /**
-  * Initializes every interpreter registered under this interpreter's session key.
-  * Interpreters whose {@code init()} fails with an {@link InterpreterException} are
-  * logged and removed from the session's interpreter list.
-  */
- @Override
- public void open() {
-   final InterpreterGroup group = getInterpreterGroup();
-
-   synchronized (group) {
-     // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however,
-     // initializes all of the interpreters with the same sessionKey, while LazyOpenInterpreter
-     // assumes an interpreter is not open unless its own open method was called. That causes
-     // a problem when running intp.close(): in case of Spark, this method initializes all of
-     // the interpreters and init() increases the reference count of RemoteInterpreterProcess,
-     // but while closing this interpreter group the other interpreters do nothing because
-     // those LazyInterpreters aren't open. For now we still have to initialise all of them.
-     // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
-     final List<Interpreter> sessionInterpreters = group.get(sessionKey);
-     final RemoteInterpreterProcess process = getInterpreterProcess();
-     if (!initialized) {
-       // the process is referenced once per session
-       process.reference(group, userName, isUserImpersonate);
-     }
-
-     // walk a snapshot so that failed interpreters can be dropped from the live list
-     final List<Interpreter> snapshot = new ArrayList<>(sessionInterpreters);
-     for (Interpreter candidate : snapshot) {
-       Interpreter target = candidate;
-       while (target instanceof WrappedInterpreter) {
-         target = ((WrappedInterpreter) target).getInnerInterpreter();
-       }
-       final RemoteInterpreter remote = (RemoteInterpreter) target;
-       try {
-         remote.init();
-       } catch (InterpreterException e) {
-         logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
-             remote.getClassName());
-         sessionInterpreters.remove(remote);
-       }
-     }
-   }
- }
-
- /**
-  * Closes every interpreter registered under this interpreter's session key.
-  * If this interpreter was initialized, the remote process is dereferenced first.
-  * Interpreters whose {@code closeInterpreter()} fails with an
-  * {@link InterpreterException} are logged and removed from the session's list.
-  */
- @Override
- public void close() {
-   InterpreterGroup interpreterGroup = getInterpreterGroup();
-   synchronized (interpreterGroup) {
-     // close all interpreters in this session
-     List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
-     // TODO(jl): open() initializes all of the interpreters with the same sessionKey, so
-     // close() has to close all of them as well, even those whose LazyOpenInterpreter
-     // wrapper was never opened explicitly.
-     // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
-     if (initialized) {
-       // dereference per session
-       getInterpreterProcess().dereference();
-     }
-     // iterate over a copy because failing interpreters are removed from the live list
-     for (Interpreter intp : new ArrayList<>(interpreters)) {
-       Interpreter p = intp;
-       while (p instanceof WrappedInterpreter) {
-         p = ((WrappedInterpreter) p).getInnerInterpreter();
-       }
-       try {
-         ((RemoteInterpreter) p).closeInterpreter();
-       } catch (InterpreterException e) {
-         // this is the close path: report a close failure, not an initialization failure
-         logger.error("Failed to close interpreter: {}. Remove it from interpreterGroup",
-             p.getClassName());
-         interpreters.remove(p);
-       }
-     }
-   }
- }
-
- /**
-  * Closes this single interpreter on the remote side. Does nothing when the
-  * interpreter is not initialized. The borrowed client is always released and the
-  * interpreter is always marked as uninitialized, even when the remote call fails.
-  *
-  * @throws InterpreterException when obtaining the client or the remote close fails
-  */
- public void closeInterpreter() {
-   if (!this.initialized) {
-     return;
-   }
-
-   final RemoteInterpreterProcess process = getInterpreterProcess();
-   Client thriftClient = null;
-   boolean connectionBroken = false;
-   try {
-     thriftClient = process.getClient();
-     if (thriftClient != null) {
-       thriftClient.close(sessionKey, className);
-     }
-   } catch (TException e) {
-     // a Thrift failure means the client must not be reused
-     connectionBroken = true;
-     throw new InterpreterException(e);
-   } catch (Exception e) {
-     throw new InterpreterException(e);
-   } finally {
-     if (thriftClient != null) {
-       process.releaseClient(thriftClient, connectionBroken);
-     }
-     this.initialized = false;
-   }
- }
-
- /**
-  * Runs {@code st} on the remote interpreter and copies the resulting paragraph
-  * config and GUI state back into {@code context}.
-  *
-  * @param st the code to interpret
-  * @param context the paragraph context; its config and GUI are updated in place
-  * @return the remote result converted to an {@link InterpreterResult}
-  * @throws InterpreterException when no client can be obtained or the Thrift call fails
-  */
- @Override
- public InterpreterResult interpret(String st, InterpreterContext context) {
-   if (logger.isDebugEnabled()) {
-     logger.debug("st:\n{}", st);
-   }
-
-   FormType form = getFormType();
-   RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-   Client client = null;
-   try {
-     client = interpreterProcess.getClient();
-   } catch (Exception e1) {
-     throw new InterpreterException(e1);
-   }
-
-   // Everything after the client has been borrowed runs inside try/finally so that the
-   // client is released even if the runner bookkeeping below throws.
-   boolean broken = false;
-   try {
-     InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess
-         .getInterpreterContextRunnerPool();
-
-     List<InterpreterContextRunner> runners = context.getRunners();
-     if (runners != null && runners.size() != 0) {
-       // assume all runners in this InterpreterContext have the same note id
-       String noteId = runners.get(0).getNoteId();
-
-       interpreterContextRunnerPool.clear(noteId);
-       interpreterContextRunnerPool.addAll(noteId, runners);
-     }
-
-     final GUI currentGUI = context.getGui();
-     RemoteInterpreterResult remoteResult = client.interpret(
-         sessionKey, className, st, convert(context));
-
-     // replace the local paragraph config with the one reported by the remote side
-     Map<String, Object> remoteConfig = gson.fromJson(
-         remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
-         }.getType());
-     context.getConfig().clear();
-     context.getConfig().putAll(remoteConfig);
-
-     if (form == FormType.NATIVE) {
-       // native forms: the remote GUI replaces the local one entirely
-       GUI remoteGui = GUI.fromJson(remoteResult.getGui());
-       currentGUI.clear();
-       currentGUI.setParams(remoteGui.getParams());
-       currentGUI.setForms(remoteGui.getForms());
-     } else if (form == FormType.SIMPLE) {
-       // simple forms: remote forms and params are merged into the local ones
-       final Map<String, Input> currentForms = currentGUI.getForms();
-       final Map<String, Object> currentParams = currentGUI.getParams();
-       final GUI remoteGUI = GUI.fromJson(remoteResult.getGui());
-       final Map<String, Input> remoteForms = remoteGUI.getForms();
-       final Map<String, Object> remoteParams = remoteGUI.getParams();
-       currentForms.putAll(remoteForms);
-       currentParams.putAll(remoteParams);
-     }
-
-     return convert(remoteResult);
-   } catch (TException e) {
-     broken = true;
-     throw new InterpreterException(e);
-   } finally {
-     interpreterProcess.releaseClient(client, broken);
-   }
- }
-
- /**
-  * Asks the remote interpreter to cancel the job described by {@code context}.
-  *
-  * @throws InterpreterException when no client can be obtained or the Thrift call fails
-  */
- @Override
- public void cancel(InterpreterContext context) {
-   final RemoteInterpreterProcess process = getInterpreterProcess();
-   final Client thriftClient;
-   try {
-     thriftClient = process.getClient();
-   } catch (Exception e) {
-     throw new InterpreterException(e);
-   }
-
-   boolean connectionBroken = false;
-   try {
-     thriftClient.cancel(sessionKey, className, convert(context));
-   } catch (TException e) {
-     connectionBroken = true;
-     throw new InterpreterException(e);
-   } finally {
-     process.releaseClient(thriftClient, connectionBroken);
-   }
- }
-
- /**
-  * Returns the form type of the remote interpreter. The interpreter is opened first;
-  * the value is fetched from the remote side once and cached in {@code formType}.
-  *
-  * @throws InterpreterException when no client can be obtained or the Thrift call fails
-  */
- @Override
- public FormType getFormType() {
-   open();
-
-   if (formType != null) {
-     // already resolved by an earlier call
-     return formType;
-   }
-
-   final RemoteInterpreterProcess process = getInterpreterProcess();
-   final Client thriftClient;
-   try {
-     thriftClient = process.getClient();
-   } catch (Exception e) {
-     throw new InterpreterException(e);
-   }
-
-   boolean connectionBroken = false;
-   try {
-     final String remoteFormType = thriftClient.getFormType(sessionKey, className);
-     formType = FormType.valueOf(remoteFormType);
-     return formType;
-   } catch (TException e) {
-     connectionBroken = true;
-     throw new InterpreterException(e);
-   } finally {
-     process.releaseClient(thriftClient, connectionBroken);
-   }
- }
-
- /**
-  * Returns the progress reported by the remote interpreter for {@code context},
-  * or 0 when there is no interpreter process or it is not running.
-  *
-  * @throws InterpreterException when no client can be obtained or the Thrift call fails
-  */
- @Override
- public int getProgress(InterpreterContext context) {
-   final RemoteInterpreterProcess process = getInterpreterProcess();
-   final boolean processAlive = process != null && process.isRunning();
-   if (!processAlive) {
-     return 0;
-   }
-
-   final Client thriftClient;
-   try {
-     thriftClient = process.getClient();
-   } catch (Exception e) {
-     throw new InterpreterException(e);
-   }
-
-   boolean connectionBroken = false;
-   try {
-     return thriftClient.getProgress(sessionKey, className, convert(context));
-   } catch (TException e) {
-     connectionBroken = true;
-     throw new InterpreterException(e);
-   } finally {
-     process.releaseClient(thriftClient, connectionBroken);
-   }
- }
-
-
- /**
-  * Requests completion candidates for {@code buf} at position {@code cursor}
-  * from the remote interpreter.
-  *
-  * @throws InterpreterException when no client can be obtained or the Thrift call fails
-  */
- @Override
- public List<InterpreterCompletion> completion(String buf, int cursor,
-     InterpreterContext interpreterContext) {
-   final RemoteInterpreterProcess process = getInterpreterProcess();
-   final Client thriftClient;
-   try {
-     thriftClient = process.getClient();
-   } catch (Exception e) {
-     throw new InterpreterException(e);
-   }
-
-   boolean connectionBroken = false;
-   try {
-     // kept as a raw List, exactly as the remote call's result was handled before
-     List candidates = thriftClient.completion(sessionKey, className, buf, cursor,
-         convert(interpreterContext));
-     return candidates;
-   } catch (TException e) {
-     connectionBroken = true;
-     throw new InterpreterException(e);
-   } finally {
-     process.releaseClient(thriftClient, connectionBroken);
-   }
- }
-
- /**
-  * Returns the remote scheduler for this session, created on first use by the
-  * {@link SchedulerFactory}, or {@code null} when there is no interpreter process.
-  */
- @Override
- public Scheduler getScheduler() {
-   final int concurrencyLimit = maxPoolSize;
-   final RemoteInterpreterProcess process = getInterpreterProcess();
-   if (process == null) {
-     return null;
-   }
-
-   final SchedulerFactory factory = SchedulerFactory.singleton();
-   // scheduler name: class name + session key + hash code of the process
-   final String schedulerName =
-       RemoteInterpreter.class.getName() + sessionKey + process.hashCode();
-   return factory.createOrGetRemoteScheduler(
-       schedulerName, sessionKey, process, concurrencyLimit);
- }
-
- private String getInterpreterGroupKey(InterpreterGroup interpreterGroup) {
- return interpreterGroup.getId();
- }
-
- private RemoteInterpreterContext convert(InterpreterContext ic) {
- return new RemoteInterpreterContext(ic.getNoteId(), ic.getParagraphId(), ic.getReplName(),
- ic.getParagraphTitle(), ic.getParagraphText(), gson.toJson(ic.getAuthenticationInfo()),
- gson.toJson(ic.getConfig()), gson.toJson(ic.getGui()), gson.toJson(ic.getRunners()));
- }
-
- private InterpreterResult convert(RemoteInterpreterResult result) {
- InterpreterResult r = new InterpreterResult(
- InterpreterResult.Code.valueOf(result.getCode()));
-
- for (RemoteInterpreterResultMessage m : result.getMsg()) {
- r.add(InterpreterResult.Type.valueOf(m.getType()), m.getData());
- }
-
- return r;
- }
-
- /**
-  * Pushes the local angular object registry to the remote interpreter as JSON.
-  * Nothing is sent when the interpreter group has no registry or the registry
-  * has no content. This method should be called ONLY inside the init() method.
-  *
-  * @param client the Thrift client used to push the registry
-  * @throws TException when the remote call fails
-  */
- void pushAngularObjectRegistryToRemote(Client client) throws TException {
-   final AngularObjectRegistry localRegistry = this.getInterpreterGroup()
-       .getAngularObjectRegistry();
-   if (localRegistry == null || localRegistry.getRegistry() == null) {
-     return;
-   }
-
-   final Map<String, Map<String, AngularObject>> contents = localRegistry.getRegistry();
-
-   logger.info("Push local angular object registry from ZeppelinServer to" +
-       " remote interpreter group {}", this.getInterpreterGroup().getId());
-
-   // the full generic type is needed so gson serializes the nested maps correctly
-   final java.lang.reflect.Type contentsType = new TypeToken<Map<String,
-       Map<String, AngularObject>>>() {
-   }.getType();
-
-   final Gson serializer = new Gson();
-   final String payload = serializer.toJson(contents, contentsType);
-   client.angularRegistryPush(payload);
- }
-
- /** Returns the environment variable map held by this interpreter (may be null). */
- public Map<String, String> getEnv() {
-   return this.env;
- }
-
- /** Replaces the environment variable map with {@code env}; the map is not copied. */
- public void setEnv(Map<String, String> env) {
-   this.env = env;
- }
-
- public void addEnv(Map<String, String> env) {
- if (this.env == null) {
- this.env = new HashMap<>();
- }
- this.env.putAll(env);
- }
-
- /** Returns the interpreter runner stored in this interpreter. Only for test. */
- public String getInterpreterRunner() {
-   return this.interpreterRunner;
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
deleted file mode 100644
index 1fb9b90..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import org.apache.commons.exec.*;
-import org.apache.commons.exec.environment.EnvironmentUtils;
-import org.apache.zeppelin.helium.ApplicationEventListener;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Map;
-
-/**
- * This class manages start / stop of remote interpreter process
- */
-public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
- implements ExecuteResultHandler {
- private static final Logger logger = LoggerFactory.getLogger(
- RemoteInterpreterManagedProcess.class);
- private final String interpreterRunner;
-
- private DefaultExecutor executor;
- private ExecuteWatchdog watchdog;
- boolean running = false;
- private int port = -1;
- private final String interpreterDir;
- private final String localRepoDir;
- private final String interpreterGroupName;
-
- private Map<String, String> env;
-
- public RemoteInterpreterManagedProcess(
- String intpRunner,
- String intpDir,
- String localRepoDir,
- Map<String, String> env,
- int connectTimeout,
- RemoteInterpreterProcessListener listener,
- ApplicationEventListener appListener,
- String interpreterGroupName) {
- super(new RemoteInterpreterEventPoller(listener, appListener),
- connectTimeout);
- this.interpreterRunner = intpRunner;
- this.env = env;
- this.interpreterDir = intpDir;
- this.localRepoDir = localRepoDir;
- this.interpreterGroupName = interpreterGroupName;
- }
-
- RemoteInterpreterManagedProcess(String intpRunner,
- String intpDir,
- String localRepoDir,
- Map<String, String> env,
- RemoteInterpreterEventPoller remoteInterpreterEventPoller,
- int connectTimeout,
- String interpreterGroupName) {
- super(remoteInterpreterEventPoller,
- connectTimeout);
- this.interpreterRunner = intpRunner;
- this.env = env;
- this.interpreterDir = intpDir;
- this.localRepoDir = localRepoDir;
- this.interpreterGroupName = interpreterGroupName;
- }
-
- @Override
- public String getHost() {
- return "localhost";
- }
-
- @Override
- public int getPort() {
- return port;
- }
-
- @Override
- public void start(String userName, Boolean isUserImpersonate) {
- // start server process
- try {
- port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
- } catch (IOException e1) {
- throw new InterpreterException(e1);
- }
-
- CommandLine cmdLine = CommandLine.parse(interpreterRunner);
- cmdLine.addArgument("-d", false);
- cmdLine.addArgument(interpreterDir, false);
- cmdLine.addArgument("-p", false);
- cmdLine.addArgument(Integer.toString(port), false);
- if (isUserImpersonate && !userName.equals("anonymous")) {
- cmdLine.addArgument("-u", false);
- cmdLine.addArgument(userName, false);
- }
- cmdLine.addArgument("-l", false);
- cmdLine.addArgument(localRepoDir, false);
- cmdLine.addArgument("-g", false);
- cmdLine.addArgument(interpreterGroupName, false);
-
- executor = new DefaultExecutor();
-
- ByteArrayOutputStream cmdOut = new ByteArrayOutputStream();
- ProcessLogOutputStream processOutput = new ProcessLogOutputStream(logger);
- processOutput.setOutputStream(cmdOut);
-
- executor.setStreamHandler(new PumpStreamHandler(processOutput));
- watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
- executor.setWatchdog(watchdog);
-
- try {
- Map procEnv = EnvironmentUtils.getProcEnvironment();
- procEnv.putAll(env);
-
- logger.info("Run interpreter process {}", cmdLine);
- executor.execute(cmdLine, procEnv, this);
- running = true;
- } catch (IOException e) {
- running = false;
- throw new InterpreterException(e);
- }
-
-
- long startTime = System.currentTimeMillis();
- while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
- if (!running) {
- try {
- cmdOut.flush();
- } catch (IOException e) {
- // nothing to do
- }
- throw new InterpreterException(new String(cmdOut.toByteArray()));
- }
-
- try {
- if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) {
- break;
- } else {
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- logger.error("Exception in RemoteInterpreterProcess while synchronized reference " +
- "Thread.sleep", e);
- }
- }
- } catch (Exception e) {
- if (logger.isDebugEnabled()) {
- logger.debug("Remote interpreter not yet accessible at localhost:" + port);
- }
- }
- }
- processOutput.setOutputStream(null);
- }
-
- public void stop() {
- if (isRunning()) {
- logger.info("kill interpreter process");
- watchdog.destroyProcess();
- }
-
- executor = null;
- watchdog = null;
- running = false;
- logger.info("Remote process terminated");
- }
-
- @Override
- public void onProcessComplete(int exitValue) {
- logger.info("Interpreter process exited {}", exitValue);
- running = false;
-
- }
-
- @Override
- public void onProcessFailed(ExecuteException e) {
- logger.info("Interpreter process failed {}", e);
- running = false;
- }
-
- public boolean isRunning() {
- return running;
- }
-
- private static class ProcessLogOutputStream extends LogOutputStream {
-
- private Logger logger;
- OutputStream out;
-
- public ProcessLogOutputStream(Logger logger) {
- this.logger = logger;
- }
-
- @Override
- protected void processLine(String s, int i) {
- this.logger.debug(s);
- }
-
- @Override
- public void write(byte [] b) throws IOException {
- super.write(b);
-
- if (out != null) {
- synchronized (this) {
- if (out != null) {
- out.write(b);
- }
- }
- }
- }
-
- @Override
- public void write(byte [] b, int offset, int len) throws IOException {
- super.write(b, offset, len);
-
- if (out != null) {
- synchronized (this) {
- if (out != null) {
- out.write(b, offset, len);
- }
- }
- }
- }
-
- public void setOutputStream(OutputStream out) {
- synchronized (this) {
- this.out = out;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
deleted file mode 100644
index bb176be..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.interpreter.remote;
-
-import org.apache.zeppelin.helium.ApplicationEventListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class connects to existing process
- */
-public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
- private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class);
- private final String host;
- private final int port;
-
- public RemoteInterpreterRunningProcess(
- int connectTimeout,
- RemoteInterpreterProcessListener listener,
- ApplicationEventListener appListener,
- String host,
- int port
- ) {
- super(connectTimeout, listener, appListener);
- this.host = host;
- this.port = port;
- }
-
- @Override
- public String getHost() {
- return host;
- }
-
- @Override
- public int getPort() {
- return port;
- }
-
- @Override
- public void start(String userName, Boolean isUserImpersonate) {
- // assume process is externally managed. nothing to do
- }
-
- @Override
- public void stop() {
- // assume process is externally managed. nothing to do
- }
-
- @Override
- public boolean isRunning() {
- return RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort());
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 8f40ec4..50881ca 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -218,7 +218,7 @@ public class RemoteInterpreterServer
private void setSystemProperty(Properties properties) {
for (Object key : properties.keySet()) {
- if (!RemoteInterpreter.isEnvString((String) key)) {
+ if (!RemoteInterpreterUtils.isEnvString((String) key)) {
String value = properties.getProperty((String) key);
if (value == null || value.isEmpty()) {
System.clearProperty((String) key);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
index 8308222..4ee6690 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
@@ -17,7 +17,6 @@
package org.apache.zeppelin.interpreter.remote;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,4 +72,12 @@ public class RemoteInterpreterUtils {
}
return settingId;
}
+
+ /**
+  * Tells whether {@code key} looks like an environment variable name, i.e. it is
+  * non-empty and consists only of upper-case ASCII letters, digits and underscores.
+  *
+  * @param key the property key to check; may be null
+  * @return true when every character is one of A-Z, 0-9 or '_'; false for null or empty
+  */
+ public static boolean isEnvString(String key) {
+   if (key == null || key.isEmpty()) {
+     return false;
+   }
+
+   // same acceptance set as the pattern "^[A-Z_0-9]*" applied to the whole string
+   for (int i = 0; i < key.length(); i++) {
+     final char c = key.charAt(i);
+     final boolean upperCaseLetter = c >= 'A' && c <= 'Z';
+     final boolean digit = c >= '0' && c <= '9';
+     if (!upperCaseLetter && !digit && c != '_') {
+       return false;
+     }
+   }
+   return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java
index 1a7f606..a55cdf9 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java
@@ -19,7 +19,6 @@ package org.apache.zeppelin.resource;
import com.google.gson.Gson;
import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
index a4ab00e..f9ddc4e 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
@@ -20,7 +20,6 @@ package org.apache.zeppelin.scheduler;
import org.apache.thrift.TException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.apache.zeppelin.scheduler.Job.Status;
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java
new file mode 100644
index 0000000..a7a6eb9
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java
@@ -0,0 +1,43 @@
+package org.apache.zeppelin.interpreter;
+
+import java.util.Properties;
+
+/**
+ *
+ */
+public class DummyInterpreter extends Interpreter {
+
+ public DummyInterpreter(Properties property) {
+ super(property);
+ }
+
+ @Override
+ public void open() {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public InterpreterResult interpret(String st, InterpreterContext context) {
+ return null;
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) {
+
+ }
+
+ @Override
+ public FormType getFormType() {
+ return null;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
index a9ac1fc..4141e95 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
@@ -19,7 +19,6 @@ package org.apache.zeppelin.interpreter;
import java.util.Properties;
-import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.Test;
@@ -31,7 +30,7 @@ public class InterpreterTest {
public void testDefaultProperty() {
Properties p = new Properties();
p.put("p1", "v1");
- MockInterpreterA intp = new MockInterpreterA(p);
+ Interpreter intp = new DummyInterpreter(p);
assertEquals(1, intp.getProperty().size());
assertEquals("v1", intp.getProperty().get("p1"));
@@ -42,7 +41,7 @@ public class InterpreterTest {
public void testOverriddenProperty() {
Properties p = new Properties();
p.put("p1", "v1");
- MockInterpreterA intp = new MockInterpreterA(p);
+ Interpreter intp = new DummyInterpreter(p);
Properties overriddenProperty = new Properties();
overriddenProperty.put("p1", "v2");
intp.setProperty(overriddenProperty);
@@ -74,7 +73,7 @@ public class InterpreterTest {
Properties p = new Properties();
p.put("p1", "replName #{noteId}, #{paragraphTitle}, #{paragraphId}, #{paragraphText}, #{replName}, #{noteId}, #{user}," +
" #{authenticationInfo}");
- MockInterpreterA intp = new MockInterpreterA(p);
+ Interpreter intp = new DummyInterpreter(p);
intp.setUserName(user);
String actual = intp.getProperty("p1");
InterpreterContext.remove();
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
deleted file mode 100644
index c8c64ea..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Mockito.atMost;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.log4j.AppenderSkeleton;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.spi.LoggingEvent;
-import org.junit.After;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-public class AppendOutputRunnerTest {
-
- private static final int NUM_EVENTS = 10000;
- private static final int NUM_CLUBBED_EVENTS = 100;
- private static final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
- private static ScheduledFuture<?> future = null;
- /* Accessed by multiple threads, hence volatile: without it
- * the polling loop in 'loopForCompletingEvents' could
- * run forever.
- */
- private volatile static int numInvocations = 0;
-
- @After
- public void afterEach() {
- if (future != null) {
- future.cancel(true);
- }
- }
-
- @Test
- public void testSingleEvent() throws InterruptedException {
- RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
- String[][] buffer = {{"note", "para", "data\n"}};
-
- loopForCompletingEvents(listener, 1, buffer);
- verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
- verify(listener, times(1)).onOutputAppend("note", "para", 0, "data\n");
- }
-
- @Test
- public void testMultipleEventsOfSameParagraph() throws InterruptedException {
- RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
- String note1 = "note1";
- String para1 = "para1";
- String[][] buffer = {
- {note1, para1, "data1\n"},
- {note1, para1, "data2\n"},
- {note1, para1, "data3\n"}
- };
-
- loopForCompletingEvents(listener, 1, buffer);
- verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
- verify(listener, times(1)).onOutputAppend(note1, para1, 0, "data1\ndata2\ndata3\n");
- }
-
- @Test
- public void testMultipleEventsOfDifferentParagraphs() throws InterruptedException {
- RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
- String note1 = "note1";
- String note2 = "note2";
- String para1 = "para1";
- String para2 = "para2";
- String[][] buffer = {
- {note1, para1, "data1\n"},
- {note1, para2, "data2\n"},
- {note2, para1, "data3\n"},
- {note2, para2, "data4\n"}
- };
- loopForCompletingEvents(listener, 4, buffer);
-
- verify(listener, times(4)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
- verify(listener, times(1)).onOutputAppend(note1, para1, 0, "data1\n");
- verify(listener, times(1)).onOutputAppend(note1, para2, 0, "data2\n");
- verify(listener, times(1)).onOutputAppend(note2, para1, 0, "data3\n");
- verify(listener, times(1)).onOutputAppend(note2, para2, 0, "data4\n");
- }
-
- @Test
- public void testClubbedData() throws InterruptedException {
- RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
- AppendOutputRunner runner = new AppendOutputRunner(listener);
- future = service.scheduleWithFixedDelay(runner, 0,
- AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
- Thread thread = new Thread(new BombardEvents(runner));
- thread.start();
- thread.join();
- Thread.sleep(1000);
-
- /* NUM_CLUBBED_EVENTS is a heuristic number.
- * It has been observed that for 10,000 continuous event
- * calls, 30-40 Web-socket calls are made. Keeping
- * the unit-test to a pessimistic 100 web-socket calls.
- */
- verify(listener, atMost(NUM_CLUBBED_EVENTS)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
- }
-
- @Test
- public void testWarnLoggerForLargeData() throws InterruptedException {
- RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
- AppendOutputRunner runner = new AppendOutputRunner(listener);
- String data = "data\n";
- int numEvents = 100000;
-
- for (int i=0; i<numEvents; i++) {
- runner.appendBuffer("noteId", "paraId", 0, data);
- }
-
- TestAppender appender = new TestAppender();
- Logger logger = Logger.getRootLogger();
- logger.addAppender(appender);
- Logger.getLogger(RemoteInterpreterEventPoller.class);
-
- runner.run();
- List<LoggingEvent> log;
-
- int warnLogCounter;
- LoggingEvent sizeWarnLogEntry = null;
- do {
- warnLogCounter = 0;
- log = appender.getLog();
- for (LoggingEvent logEntry: log) {
- if (Level.WARN.equals(logEntry.getLevel())) {
- sizeWarnLogEntry = logEntry;
- warnLogCounter += 1;
- }
- }
- } while(warnLogCounter != 2);
-
- String loggerString = "Processing size for buffered append-output is high: " +
- (data.length() * numEvents) + " characters.";
- assertTrue(loggerString.equals(sizeWarnLogEntry.getMessage()));
- }
-
- private class BombardEvents implements Runnable {
-
- private final AppendOutputRunner runner;
-
- private BombardEvents(AppendOutputRunner runner) {
- this.runner = runner;
- }
-
- @Override
- public void run() {
- String noteId = "noteId";
- String paraId = "paraId";
- for (int i=0; i<NUM_EVENTS; i++) {
- runner.appendBuffer(noteId, paraId, 0, "data\n");
- }
- }
- }
-
- private class TestAppender extends AppenderSkeleton {
- private final List<LoggingEvent> log = new ArrayList<>();
-
- @Override
- public boolean requiresLayout() {
- return false;
- }
-
- @Override
- protected void append(final LoggingEvent loggingEvent) {
- log.add(loggingEvent);
- }
-
- @Override
- public void close() {
- }
-
- public List<LoggingEvent> getLog() {
- return new ArrayList<>(log);
- }
- }
-
- private void prepareInvocationCounts(RemoteInterpreterProcessListener listener) {
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- numInvocations += 1;
- return null;
- }
- }).when(listener).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
- }
-
- private void loopForCompletingEvents(RemoteInterpreterProcessListener listener,
- int numTimes, String[][] buffer) {
- numInvocations = 0;
- prepareInvocationCounts(listener);
- AppendOutputRunner runner = new AppendOutputRunner(listener);
- for (String[] bufferElement: buffer) {
- runner.appendBuffer(bufferElement[0], bufferElement[1], 0, bufferElement[2]);
- }
- future = service.scheduleWithFixedDelay(runner, 0,
- AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
- long startTimeMs = System.currentTimeMillis();
- while(numInvocations != numTimes) {
- if (System.currentTimeMillis() - startTimeMs > 2000) {
- fail("Buffered events were not sent for 2 seconds");
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
deleted file mode 100644
index f7404e3..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.zeppelin.display.*;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterAngular;
-import org.apache.zeppelin.resource.LocalResourcePool;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
- private static final String INTERPRETER_SCRIPT =
- System.getProperty("os.name").startsWith("Windows") ?
- "../bin/interpreter.cmd" :
- "../bin/interpreter.sh";
-
- private InterpreterGroup intpGroup;
- private HashMap<String, String> env;
- private RemoteInterpreter intp;
- private InterpreterContext context;
- private RemoteAngularObjectRegistry localRegistry;
-
- private AtomicInteger onAdd;
- private AtomicInteger onUpdate;
- private AtomicInteger onRemove;
-
- @Before
- public void setUp() throws Exception {
- onAdd = new AtomicInteger(0);
- onUpdate = new AtomicInteger(0);
- onRemove = new AtomicInteger(0);
-
- intpGroup = new InterpreterGroup("intpId");
- localRegistry = new RemoteAngularObjectRegistry("intpId", this, intpGroup);
- intpGroup.setAngularObjectRegistry(localRegistry);
- env = new HashMap<>();
- env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
-
- Properties p = new Properties();
-
- intp = new RemoteInterpreter(
- p,
- "note",
- MockInterpreterAngular.class.getName(),
- new File(INTERPRETER_SCRIPT).getAbsolutePath(),
- "fake",
- "fakeRepo",
- env,
- 10 * 1000,
- null,
- null,
- "anonymous",
- false
- );
-
- intpGroup.put("note", new LinkedList<Interpreter>());
- intpGroup.get("note").add(intp);
- intp.setInterpreterGroup(intpGroup);
-
- context = new InterpreterContext(
- "note",
- "id",
- null,
- "title",
- "text",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null),
- new LocalResourcePool("pool1"),
- new LinkedList<InterpreterContextRunner>(), null);
-
- intp.open();
- }
-
- @After
- public void tearDown() throws Exception {
- intp.close();
- intpGroup.close();
- }
-
- @Test
- public void testAngularObjectInterpreterSideCRUD() throws InterruptedException {
- InterpreterResult ret = intp.interpret("get", context);
- Thread.sleep(500); // waitFor eventpoller pool event
- String[] result = ret.message().get(0).getData().split(" ");
- assertEquals("0", result[0]); // size of registry
- assertEquals("0", result[1]); // num watcher called
-
- // create object
- ret = intp.interpret("add n1 v1", context);
- Thread.sleep(500);
- result = ret.message().get(0).getData().split(" ");
- assertEquals("1", result[0]); // size of registry
- assertEquals("0", result[1]); // num watcher called
- assertEquals("v1", localRegistry.get("n1", "note", null).get());
-
- // update object
- ret = intp.interpret("update n1 v11", context);
- result = ret.message().get(0).getData().split(" ");
- Thread.sleep(500);
- assertEquals("1", result[0]); // size of registry
- assertEquals("1", result[1]); // num watcher called
- assertEquals("v11", localRegistry.get("n1", "note", null).get());
-
- // remove object
- ret = intp.interpret("remove n1", context);
- result = ret.message().get(0).getData().split(" ");
- Thread.sleep(500);
- assertEquals("0", result[0]); // size of registry
- assertEquals("1", result[1]); // num watcher called
- assertEquals(null, localRegistry.get("n1", "note", null));
- }
-
- @Test
- public void testAngularObjectRemovalOnZeppelinServerSide() throws InterruptedException {
- // Test that removing an angular object on the server side propagates to the interpreter process's registry.
- // This happens when a notebook is removed.
-
- InterpreterResult ret = intp.interpret("get", context);
- Thread.sleep(500); // waitFor eventpoller pool event
- String[] result = ret.message().get(0).getData().split(" ");
- assertEquals("0", result[0]); // size of registry
-
- // create object
- ret = intp.interpret("add n1 v1", context);
- Thread.sleep(500);
- result = ret.message().get(0).getData().split(" ");
- assertEquals("1", result[0]); // size of registry
- assertEquals("v1", localRegistry.get("n1", "note", null).get());
-
- // remove object in local registry.
- localRegistry.removeAndNotifyRemoteProcess("n1", "note", null);
- ret = intp.interpret("get", context);
- Thread.sleep(500); // waitFor eventpoller pool event
- result = ret.message().get(0).getData().split(" ");
- assertEquals("0", result[0]); // size of registry
- }
-
- @Test
- public void testAngularObjectAddOnZeppelinServerSide() throws InterruptedException {
- // Test that adding an angular object on the server side propagates to the interpreter process's registry.
- // This happens when the Zeppelin server loads a notebook and restores the object into the registry.
-
- InterpreterResult ret = intp.interpret("get", context);
- Thread.sleep(500); // waitFor eventpoller pool event
- String[] result = ret.message().get(0).getData().split(" ");
- assertEquals("0", result[0]); // size of registry
-
- // create object
- localRegistry.addAndNotifyRemoteProcess("n1", "v1", "note", null);
-
- // get from remote registry
- ret = intp.interpret("get", context);
- Thread.sleep(500); // waitFor eventpoller pool event
- result = ret.message().get(0).getData().split(" ");
- assertEquals("1", result[0]); // size of registry
- }
-
- @Override
- public void onAdd(String interpreterGroupId, AngularObject object) {
- onAdd.incrementAndGet();
- }
-
- @Override
- public void onUpdate(String interpreterGroupId, AngularObject object) {
- onUpdate.incrementAndGet();
- }
-
- @Override
- public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) {
- onRemove.incrementAndGet();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
deleted file mode 100644
index 49aa7aa..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
-import org.junit.Test;
-
-import static org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType.NO_OP;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class RemoteInterpreterEventPollerTest {
-
- @Test
- public void shouldClearUnreadEventsOnShutdown() throws Exception {
- RemoteInterpreterProcess interpreterProc = getMockEventsInterpreterProcess();
- RemoteInterpreterEventPoller eventPoller = new RemoteInterpreterEventPoller(null, null);
-
- eventPoller.setInterpreterProcess(interpreterProc);
- eventPoller.shutdown();
- eventPoller.start();
- eventPoller.join();
-
- assertEquals(NO_OP, interpreterProc.getClient().getEvent().getType());
- }
-
- private RemoteInterpreterProcess getMockEventsInterpreterProcess() throws Exception {
- RemoteInterpreterEvent fakeEvent = new RemoteInterpreterEvent();
- RemoteInterpreterEvent noMoreEvents = new RemoteInterpreterEvent(NO_OP, "");
- RemoteInterpreterService.Client client = mock(RemoteInterpreterService.Client.class);
- RemoteInterpreterProcess intProc = mock(RemoteInterpreterProcess.class);
-
- when(client.getEvent()).thenReturn(fakeEvent, fakeEvent, noMoreEvents);
- when(intProc.getClient()).thenReturn(client);
-
- return intProc;
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
deleted file mode 100644
index 3f865cb..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterOutputStream;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-
-
-/**
- * Test for remote interpreter output stream
- */
-public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProcessListener {
- private static final String INTERPRETER_SCRIPT =
- System.getProperty("os.name").startsWith("Windows") ?
- "../bin/interpreter.cmd" :
- "../bin/interpreter.sh";
- private InterpreterGroup intpGroup;
- private HashMap<String, String> env;
-
- @Before
- public void setUp() throws Exception {
- intpGroup = new InterpreterGroup();
- intpGroup.put("note", new LinkedList<Interpreter>());
-
- env = new HashMap<>();
- env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
- }
-
- @After
- public void tearDown() throws Exception {
- intpGroup.close();
- }
-
- private RemoteInterpreter createMockInterpreter() {
- RemoteInterpreter intp = new RemoteInterpreter(
- new Properties(),
- "note",
- MockInterpreterOutputStream.class.getName(),
- new File(INTERPRETER_SCRIPT).getAbsolutePath(),
- "fake",
- "fakeRepo",
- env,
- 10 * 1000,
- this,
- null,
- "anonymous",
- false);
-
- intpGroup.get("note").add(intp);
- intp.setInterpreterGroup(intpGroup);
- return intp;
- }
-
- private InterpreterContext createInterpreterContext() {
- return new InterpreterContext(
- "noteId",
- "id",
- null,
- "title",
- "text",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null),
- null,
- new LinkedList<InterpreterContextRunner>(), null);
- }
-
- @Test
- public void testInterpreterResultOnly() {
- RemoteInterpreter intp = createMockInterpreter();
- InterpreterResult ret = intp.interpret("SUCCESS::staticresult", createInterpreterContext());
- assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
- assertEquals("staticresult", ret.message().get(0).getData());
-
- ret = intp.interpret("SUCCESS::staticresult2", createInterpreterContext());
- assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
- assertEquals("staticresult2", ret.message().get(0).getData());
-
- ret = intp.interpret("ERROR::staticresult3", createInterpreterContext());
- assertEquals(InterpreterResult.Code.ERROR, ret.code());
- assertEquals("staticresult3", ret.message().get(0).getData());
- }
-
- @Test
- public void testInterpreterOutputStreamOnly() {
- RemoteInterpreter intp = createMockInterpreter();
- InterpreterResult ret = intp.interpret("SUCCESS:streamresult:", createInterpreterContext());
- assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
- assertEquals("streamresult", ret.message().get(0).getData());
-
- ret = intp.interpret("ERROR:streamresult2:", createInterpreterContext());
- assertEquals(InterpreterResult.Code.ERROR, ret.code());
- assertEquals("streamresult2", ret.message().get(0).getData());
- }
-
- @Test
- public void testInterpreterResultOutputStreamMixed() {
- RemoteInterpreter intp = createMockInterpreter();
- InterpreterResult ret = intp.interpret("SUCCESS:stream:static", createInterpreterContext());
- assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
- assertEquals("stream", ret.message().get(0).getData());
- assertEquals("static", ret.message().get(1).getData());
- }
-
- @Test
- public void testOutputType() {
- RemoteInterpreter intp = createMockInterpreter();
-
- InterpreterResult ret = intp.interpret("SUCCESS:%html hello:", createInterpreterContext());
- assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType());
- assertEquals("hello", ret.message().get(0).getData());
-
- ret = intp.interpret("SUCCESS:%html\nhello:", createInterpreterContext());
- assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType());
- assertEquals("hello", ret.message().get(0).getData());
-
- ret = intp.interpret("SUCCESS:%html hello:%angular world", createInterpreterContext());
- assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType());
- assertEquals("hello", ret.message().get(0).getData());
- assertEquals(InterpreterResult.Type.ANGULAR, ret.message().get(1).getType());
- assertEquals("world", ret.message().get(1).getData());
- }
-
- @Override
- public void onOutputAppend(String noteId, String paragraphId, int index, String output) {
-
- }
-
- @Override
- public void onOutputUpdated(String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) {
-
- }
-
- @Override
- public void onOutputClear(String noteId, String paragraphId) {
-
- }
-
- @Override
- public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
-
- }
-
- @Override
- public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) {
- if (callback != null) {
- callback.onFinished(new LinkedList<>());
- }
- }
-
- @Override
- public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception {
-
- }
-
- @Override
- public void onParaInfosReceived(String noteId, String paragraphId,
- String interpreterSettingId, Map<String, String> metaInfos) {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
deleted file mode 100644
index b85d7ef..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.mockito.Mockito.*;
-
-import java.util.HashMap;
-import java.util.Properties;
-
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.zeppelin.interpreter.Constants;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-import org.junit.Test;
-
-public class RemoteInterpreterProcessTest {
- private static final String INTERPRETER_SCRIPT =
- System.getProperty("os.name").startsWith("Windows") ?
- "../bin/interpreter.cmd" :
- "../bin/interpreter.sh";
- private static final int DUMMY_PORT=3678;
-
- @Test
- public void testStartStop() {
- InterpreterGroup intpGroup = new InterpreterGroup();
- RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
- INTERPRETER_SCRIPT, "nonexists", "fakeRepo", new HashMap<String, String>(),
- 10 * 1000, null, null,"fakeName");
- assertFalse(rip.isRunning());
- assertEquals(0, rip.referenceCount());
- assertEquals(1, rip.reference(intpGroup, "anonymous", false));
- assertEquals(2, rip.reference(intpGroup, "anonymous", false));
- assertEquals(true, rip.isRunning());
- assertEquals(1, rip.dereference());
- assertEquals(true, rip.isRunning());
- assertEquals(0, rip.dereference());
- assertEquals(false, rip.isRunning());
- }
-
- @Test
- public void testClientFactory() throws Exception {
- InterpreterGroup intpGroup = new InterpreterGroup();
- RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
- INTERPRETER_SCRIPT, "nonexists", "fakeRepo", new HashMap<String, String>(),
- mock(RemoteInterpreterEventPoller.class), 10 * 1000, "fakeName");
- rip.reference(intpGroup, "anonymous", false);
- assertEquals(0, rip.getNumActiveClient());
- assertEquals(0, rip.getNumIdleClient());
-
- Client client = rip.getClient();
- assertEquals(1, rip.getNumActiveClient());
- assertEquals(0, rip.getNumIdleClient());
-
- rip.releaseClient(client);
- assertEquals(0, rip.getNumActiveClient());
- assertEquals(1, rip.getNumIdleClient());
-
- rip.dereference();
- }
-
- @Test
- public void testStartStopRemoteInterpreter() throws TException, InterruptedException {
- RemoteInterpreterServer server = new RemoteInterpreterServer(3678);
- server.start();
- boolean running = false;
- long startTime = System.currentTimeMillis();
- while (System.currentTimeMillis() - startTime < 10 * 1000) {
- if (server.isRunning()) {
- running = true;
- break;
- } else {
- Thread.sleep(200);
- }
- }
- Properties properties = new Properties();
- properties.setProperty(Constants.ZEPPELIN_INTERPRETER_PORT, "3678");
- properties.setProperty(Constants.ZEPPELIN_INTERPRETER_HOST, "localhost");
- InterpreterGroup intpGroup = mock(InterpreterGroup.class);
- when(intpGroup.getProperty()).thenReturn(properties);
- when(intpGroup.containsKey(Constants.EXISTING_PROCESS)).thenReturn(true);
-
- RemoteInterpreterProcess rip = new RemoteInterpreterManagedProcess(
- INTERPRETER_SCRIPT,
- "nonexists",
- "fakeRepo",
- new HashMap<String, String>(),
- mock(RemoteInterpreterEventPoller.class)
- , 10 * 1000,
- "fakeName");
- assertFalse(rip.isRunning());
- assertEquals(0, rip.referenceCount());
- assertEquals(1, rip.reference(intpGroup, "anonymous", false));
- assertEquals(true, rip.isRunning());
- }
-
-
- @Test
- public void testPropagateError() throws TException, InterruptedException {
- InterpreterGroup intpGroup = new InterpreterGroup();
- RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
- "echo hello_world", "nonexists", "fakeRepo", new HashMap<String, String>(),
- 10 * 1000, null, null, "fakeName");
- assertFalse(rip.isRunning());
- assertEquals(0, rip.referenceCount());
- try {
- assertEquals(1, rip.reference(intpGroup, "anonymous", false));
- } catch (InterpreterException e) {
- e.getMessage().contains("hello_world");
- }
- assertEquals(0, rip.referenceCount());
- }
-}
[3/4] zeppelin git commit: [MINOR] Move remoteinterpreter into zengine
Posted by jo...@apache.org.
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
deleted file mode 100644
index ffcb8d5..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
+++ /dev/null
@@ -1,914 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.thrift.transport.TTransportException;
-import org.apache.zeppelin.display.AngularObject;
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterEnv;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
-import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterB;
-import org.apache.zeppelin.resource.LocalResourcePool;
-import org.apache.zeppelin.scheduler.Job;
-import org.apache.zeppelin.scheduler.Job.Status;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-
-public class RemoteInterpreterTest {
-
-
- private static final String INTERPRETER_SCRIPT =
- System.getProperty("os.name").startsWith("Windows") ?
- "../bin/interpreter.cmd" :
- "../bin/interpreter.sh";
-
- private InterpreterGroup intpGroup;
- private HashMap<String, String> env;
-
- @Before
- public void setUp() throws Exception {
- intpGroup = new InterpreterGroup();
- env = new HashMap<>();
- env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
- }
-
- @After
- public void tearDown() throws Exception {
- intpGroup.close();
- }
-
- private RemoteInterpreter createMockInterpreterA(Properties p) {
- return createMockInterpreterA(p, "note");
- }
-
- private RemoteInterpreter createMockInterpreterA(Properties p, String noteId) {
- return new RemoteInterpreter(
- p,
- noteId,
- MockInterpreterA.class.getName(),
- new File(INTERPRETER_SCRIPT).getAbsolutePath(),
- "fake",
- "fakeRepo",
- env,
- 10 * 1000,
- null,
- null,
- "anonymous",
- false);
- }
-
- private RemoteInterpreter createMockInterpreterB(Properties p) {
- return createMockInterpreterB(p, "note");
- }
-
- private RemoteInterpreter createMockInterpreterB(Properties p, String noteId) {
- return new RemoteInterpreter(
- p,
- noteId,
- MockInterpreterB.class.getName(),
- new File(INTERPRETER_SCRIPT).getAbsolutePath(),
- "fake",
- "fakeRepo",
- env,
- 10 * 1000,
- null,
- null,
- "anonymous",
- false);
- }
-
- @Test
- public void testRemoteInterperterCall() throws TTransportException, IOException {
- Properties p = new Properties();
- intpGroup.put("note", new LinkedList<Interpreter>());
-
- RemoteInterpreter intpA = createMockInterpreterA(p);
-
- intpGroup.get("note").add(intpA);
-
- intpA.setInterpreterGroup(intpGroup);
-
- RemoteInterpreter intpB = createMockInterpreterB(p);
-
- intpGroup.get("note").add(intpB);
- intpB.setInterpreterGroup(intpGroup);
-
-
- RemoteInterpreterProcess process = intpA.getInterpreterProcess();
- process.equals(intpB.getInterpreterProcess());
-
- assertFalse(process.isRunning());
- assertEquals(0, process.getNumIdleClient());
- assertEquals(0, process.referenceCount());
-
- intpA.open(); // initializa all interpreters in the same group
- assertTrue(process.isRunning());
- assertEquals(1, process.getNumIdleClient());
- assertEquals(1, process.referenceCount());
-
- intpA.interpret("1",
- new InterpreterContext(
- "note",
- "id",
- null,
- "title",
- "text",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null),
- new LocalResourcePool("pool1"),
- new LinkedList<InterpreterContextRunner>(), null));
-
- intpB.open();
- assertEquals(1, process.referenceCount());
-
- intpA.close();
- assertEquals(0, process.referenceCount());
- intpB.close();
- assertEquals(0, process.referenceCount());
-
- assertFalse(process.isRunning());
-
- }
-
- @Test
- public void testExecuteIncorrectPrecode() throws TTransportException, IOException {
- Properties p = new Properties();
- p.put("zeppelin.MockInterpreterA.precode", "fail test");
- intpGroup.put("note", new LinkedList<Interpreter>());
-
- RemoteInterpreter intpA = createMockInterpreterA(p);
-
- intpGroup.get("note").add(intpA);
-
- intpA.setInterpreterGroup(intpGroup);
-
- RemoteInterpreterProcess process = intpA.getInterpreterProcess();
-
- intpA.open();
-
- InterpreterResult result = intpA.interpret("1",
- new InterpreterContext(
- "note",
- "id",
- null,
- "title",
- "text",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null),
- new LocalResourcePool("pool1"),
- new LinkedList<InterpreterContextRunner>(), null));
-
-
-
- intpA.close();
- assertEquals(Code.ERROR, result.code());
- }
-
- @Test
- public void testExecuteCorrectPrecode() throws TTransportException, IOException {
- Properties p = new Properties();
- p.put("zeppelin.MockInterpreterA.precode", "2");
- intpGroup.put("note", new LinkedList<Interpreter>());
-
- RemoteInterpreter intpA = createMockInterpreterA(p);
-
- intpGroup.get("note").add(intpA);
-
- intpA.setInterpreterGroup(intpGroup);
-
- RemoteInterpreterProcess process = intpA.getInterpreterProcess();
-
- intpA.open();
-
- InterpreterResult result = intpA.interpret("1",
- new InterpreterContext(
- "note",
- "id",
- null,
- "title",
- "text",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null),
- new LocalResourcePool("pool1"),
- new LinkedList<InterpreterContextRunner>(), null));
-
-
-
- intpA.close();
- assertEquals(Code.SUCCESS, result.code());
- assertEquals("1", result.message().get(0).getData());
- }
-
- @Test
- public void testRemoteInterperterErrorStatus() throws TTransportException, IOException {
- Properties p = new Properties();
-
- RemoteInterpreter intpA = createMockInterpreterA(p);
-
- intpGroup.put("note", new LinkedList<Interpreter>());
- intpGroup.get("note").add(intpA);
- intpA.setInterpreterGroup(intpGroup);
-
- intpA.open();
- InterpreterResult ret = intpA.interpret("non numeric value",
- new InterpreterContext(
- "noteId",
- "id",
- null,
- "title",
- "text",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null),
- new LocalResourcePool("pool1"),
- new LinkedList<InterpreterContextRunner>(), null));
-
- assertEquals(Code.ERROR, ret.code());
- }
-
- @Test
- public void testRemoteSchedulerSharing() throws TTransportException, IOException {
- Properties p = new Properties();
- intpGroup.put("note", new LinkedList<Interpreter>());
-
- RemoteInterpreter intpA = new RemoteInterpreter(
- p,
- "note",
- MockInterpreterA.class.getName(),
- new File(INTERPRETER_SCRIPT).getAbsolutePath(),
- "fake",
- "fakeRepo",
- env,
- 10 * 1000,
- null,
- null,
- "anonymous",
- false);
-
- intpGroup.get("note").add(intpA);
- intpA.setInterpreterGroup(intpGroup);
-
- RemoteInterpreter intpB = new RemoteInterpreter(
- p,
- "note",
- MockInterpreterB.class.getName(),
- new File(INTERPRETER_SCRIPT).getAbsolutePath(),
- "fake",
- "fakeRepo",
- env,
- 10 * 1000,
- null,
- null,
- "anonymous",
- false);
-
- intpGroup.get("note").add(intpB);
- intpB.setInterpreterGroup(intpGroup);
-
- intpA.open();
- intpB.open();
-
- long start = System.currentTimeMillis();
- InterpreterResult ret = intpA.interpret("500",
- new InterpreterContext(
- "note",
- "id",
- null,
- "title",
- "text",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null),
- new LocalResourcePool("pool1"),
- new LinkedList<InterpreterContextRunner>(), null));
- assertEquals("500", ret.message().get(0).getData());
-
- ret = intpB.interpret("500",
- new InterpreterContext(
- "note",
- "id",
- null,
- "title",
- "text",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null),
- new LocalResourcePool("pool1"),
- new LinkedList<InterpreterContextRunner>(), null));
- assertEquals("1000", ret.message().get(0).getData());
- long end = System.currentTimeMillis();
- assertTrue(end - start >= 1000);
-
-
- intpA.close();
- intpB.close();
- }
-
- @Test
- public void testRemoteSchedulerSharingSubmit() throws TTransportException, IOException, InterruptedException {
- Properties p = new Properties();
- intpGroup.put("note", new LinkedList<Interpreter>());
-
- final RemoteInterpreter intpA = createMockInterpreterA(p);
-
- intpGroup.get("note").add(intpA);
- intpA.setInterpreterGroup(intpGroup);
-
- final RemoteInterpreter intpB = createMockInterpreterB(p);
-
- intpGroup.get("note").add(intpB);
- intpB.setInterpreterGroup(intpGroup);
-
- intpA.open();
- intpB.open();
-
- long start = System.currentTimeMillis();
- Job jobA = new Job("jobA", null) {
- private Object r;
-
- @Override
- public Object getReturn() {
- return r;
- }
-
- @Override
- public void setResult(Object results) {
- this.r = results;
- }
-
- @Override
- public int progress() {
- return 0;
- }
-
- @Override
- public Map<String, Object> info() {
- return null;
- }
-
- @Override
- protected Object jobRun() throws Throwable {
- return intpA.interpret("500",
- new InterpreterContext(
- "note",
- "jobA",
- null,
- "title",
- "text",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null),
- new LocalResourcePool("pool1"),
- new LinkedList<InterpreterContextRunner>(), null));
- }
-
- @Override
- protected boolean jobAbort() {
- return false;
- }
-
- };
- intpA.getScheduler().submit(jobA);
-
- Job jobB = new Job("jobB", null) {
-
- private Object r;
-
- @Override
- public Object getReturn() {
- return r;
- }
-
- @Override
- public void setResult(Object results) {
- this.r = results;
- }
-
- @Override
- public int progress() {
- return 0;
- }
-
- @Override
- public Map<String, Object> info() {
- return null;
- }
-
- @Override
- protected Object jobRun() throws Throwable {
- return intpB.interpret("500",
- new InterpreterContext(
- "note",
- "jobB",
- null,
- "title",
- "text",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null),
- new LocalResourcePool("pool1"),
- new LinkedList<InterpreterContextRunner>(), null));
- }
-
- @Override
- protected boolean jobAbort() {
- return false;
- }
-
- };
- intpB.getScheduler().submit(jobB);
- // wait until both job finished
- while (jobA.getStatus() != Status.FINISHED ||
- jobB.getStatus() != Status.FINISHED) {
- Thread.sleep(100);
- }
- long end = System.currentTimeMillis();
- assertTrue(end - start >= 1000);
-
- assertEquals("1000", ((InterpreterResult) jobB.getReturn()).message().get(0).getData());
-
- intpA.close();
- intpB.close();
- }
-
- @Test
- public void testRunOrderPreserved() throws InterruptedException {
- Properties p = new Properties();
- intpGroup.put("note", new LinkedList<Interpreter>());
-
- final RemoteInterpreter intpA = createMockInterpreterA(p);
-
- intpGroup.get("note").add(intpA);
- intpA.setInterpreterGroup(intpGroup);
-
- intpA.open();
-
- int concurrency = 3;
- final List<InterpreterResultMessage> results = new LinkedList<>();
-
- Scheduler scheduler = intpA.getScheduler();
- for (int i = 0; i < concurrency; i++) {
- final String jobId = Integer.toString(i);
- scheduler.submit(new Job(jobId, Integer.toString(i), null, 200) {
- private Object r;
-
- @Override
- public Object getReturn() {
- return r;
- }
-
- @Override
- public void setResult(Object results) {
- this.r = results;
- }
-
- @Override
- public int progress() {
- return 0;
- }
-
- @Override
- public Map<String, Object> info() {
- return null;
- }
-
- @Override
- protected Object jobRun() throws Throwable {
- InterpreterResult ret = intpA.interpret(getJobName(), new InterpreterContext(
- "note",
- jobId,
- null,
- "title",
- "text",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null),
- new LocalResourcePool("pool1"),
- new LinkedList<InterpreterContextRunner>(), null));
-
- synchronized (results) {
- results.addAll(ret.message());
- results.notify();
- }
- return null;
- }
-
- @Override
- protected boolean jobAbort() {
- return false;
- }
-
- });
- }
-
- // wait for job finished
- synchronized (results) {
- while (results.size() != concurrency) {
- results.wait(300);
- }
- }
-
- int i = 0;
- for (InterpreterResultMessage result : results) {
- assertEquals(Integer.toString(i++), result.getData());
- }
- assertEquals(concurrency, i);
-
- intpA.close();
- }
-
-
- @Test
- public void testRunParallel() throws InterruptedException {
- Properties p = new Properties();
- p.put("parallel", "true");
- intpGroup.put("note", new LinkedList<Interpreter>());
-
- final RemoteInterpreter intpA = createMockInterpreterA(p);
-
- intpGroup.get("note").add(intpA);
- intpA.setInterpreterGroup(intpGroup);
-
- intpA.open();
-
- int concurrency = 4;
- final int timeToSleep = 1000;
- final List<InterpreterResultMessage> results = new LinkedList<>();
- long start = System.currentTimeMillis();
-
- Scheduler scheduler = intpA.getScheduler();
- for (int i = 0; i < concurrency; i++) {
- final String jobId = Integer.toString(i);
- scheduler.submit(new Job(jobId, Integer.toString(i), null, 300) {
- private Object r;
-
- @Override
- public Object getReturn() {
- return r;
- }
-
- @Override
- public void setResult(Object results) {
- this.r = results;
- }
-
- @Override
- public int progress() {
- return 0;
- }
-
- @Override
- public Map<String, Object> info() {
- return null;
- }
-
- @Override
- protected Object jobRun() throws Throwable {
- String stmt = Integer.toString(timeToSleep);
- InterpreterResult ret = intpA.interpret(stmt, new InterpreterContext(
- "note",
- jobId,
- null,
- "title",
- "text",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null),
- new LocalResourcePool("pool1"),
- new LinkedList<InterpreterContextRunner>(), null));
-
- synchronized (results) {
- results.addAll(ret.message());
- results.notify();
- }
- return stmt;
- }
-
- @Override
- protected boolean jobAbort() {
- return false;
- }
-
- });
- }
-
- // wait for job finished
- synchronized (results) {
- while (results.size() != concurrency) {
- results.wait(300);
- }
- }
-
- long end = System.currentTimeMillis();
-
- assertTrue(end - start < timeToSleep * concurrency);
-
- intpA.close();
- }
-
- @Test
- public void testInterpreterGroupResetBeforeProcessStarts() {
- Properties p = new Properties();
-
- RemoteInterpreter intpA = createMockInterpreterA(p);
-
- intpA.setInterpreterGroup(intpGroup);
- RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
-
- intpA.setInterpreterGroup(new InterpreterGroup(intpA.getInterpreterGroup().getId()));
- RemoteInterpreterProcess processB = intpA.getInterpreterProcess();
-
- assertNotSame(processA.hashCode(), processB.hashCode());
- }
-
- @Test
- public void testInterpreterGroupResetAfterProcessFinished() {
- Properties p = new Properties();
- intpGroup.put("note", new LinkedList<Interpreter>());
-
- RemoteInterpreter intpA = createMockInterpreterA(p);
-
- intpA.setInterpreterGroup(intpGroup);
- RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
- intpA.open();
-
- processA.dereference(); // intpA.close();
-
- intpA.setInterpreterGroup(new InterpreterGroup(intpA.getInterpreterGroup().getId()));
- RemoteInterpreterProcess processB = intpA.getInterpreterProcess();
-
- assertNotSame(processA.hashCode(), processB.hashCode());
- }
-
- @Test
- public void testInterpreterGroupResetDuringProcessRunning() throws InterruptedException {
- Properties p = new Properties();
- intpGroup.put("note", new LinkedList<Interpreter>());
-
- final RemoteInterpreter intpA = createMockInterpreterA(p);
-
- intpGroup.get("note").add(intpA);
- intpA.setInterpreterGroup(intpGroup);
-
- intpA.open();
-
- Job jobA = new Job("jobA", null) {
- private Object r;
-
- @Override
- public Object getReturn() {
- return r;
- }
-
- @Override
- public void setResult(Object results) {
- this.r = results;
- }
-
- @Override
- public int progress() {
- return 0;
- }
-
- @Override
- public Map<String, Object> info() {
- return null;
- }
-
- @Override
- protected Object jobRun() throws Throwable {
- return intpA.interpret("2000",
- new InterpreterContext(
- "note",
- "jobA",
- null,
- "title",
- "text",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null),
- new LocalResourcePool("pool1"),
- new LinkedList<InterpreterContextRunner>(), null));
- }
-
- @Override
- protected boolean jobAbort() {
- return false;
- }
-
- };
- intpA.getScheduler().submit(jobA);
-
- // wait for job started
- while (intpA.getScheduler().getJobsRunning().size() == 0) {
- Thread.sleep(100);
- }
-
- // restart interpreter
- RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
- intpA.close();
-
- InterpreterGroup newInterpreterGroup =
- new InterpreterGroup(intpA.getInterpreterGroup().getId());
- newInterpreterGroup.put("note", new LinkedList<Interpreter>());
-
- intpA.setInterpreterGroup(newInterpreterGroup);
- intpA.open();
- RemoteInterpreterProcess processB = intpA.getInterpreterProcess();
-
- assertNotSame(processA.hashCode(), processB.hashCode());
-
- }
-
- @Test
- public void testRemoteInterpreterSharesTheSameSchedulerInstanceInTheSameGroup() {
- Properties p = new Properties();
- intpGroup.put("note", new LinkedList<Interpreter>());
-
- RemoteInterpreter intpA = createMockInterpreterA(p);
-
- intpGroup.get("note").add(intpA);
- intpA.setInterpreterGroup(intpGroup);
-
- RemoteInterpreter intpB = createMockInterpreterB(p);
-
- intpGroup.get("note").add(intpB);
- intpB.setInterpreterGroup(intpGroup);
-
- intpA.open();
- intpB.open();
-
- assertEquals(intpA.getScheduler(), intpB.getScheduler());
- }
-
- @Test
- public void testMultiInterpreterSession() {
- Properties p = new Properties();
- intpGroup.put("sessionA", new LinkedList<Interpreter>());
- intpGroup.put("sessionB", new LinkedList<Interpreter>());
-
- RemoteInterpreter intpAsessionA = createMockInterpreterA(p, "sessionA");
- intpGroup.get("sessionA").add(intpAsessionA);
- intpAsessionA.setInterpreterGroup(intpGroup);
-
- RemoteInterpreter intpBsessionA = createMockInterpreterB(p, "sessionA");
- intpGroup.get("sessionA").add(intpBsessionA);
- intpBsessionA.setInterpreterGroup(intpGroup);
-
- intpAsessionA.open();
- intpBsessionA.open();
-
- assertEquals(intpAsessionA.getScheduler(), intpBsessionA.getScheduler());
-
- RemoteInterpreter intpAsessionB = createMockInterpreterA(p, "sessionB");
- intpGroup.get("sessionB").add(intpAsessionB);
- intpAsessionB.setInterpreterGroup(intpGroup);
-
- RemoteInterpreter intpBsessionB = createMockInterpreterB(p, "sessionB");
- intpGroup.get("sessionB").add(intpBsessionB);
- intpBsessionB.setInterpreterGroup(intpGroup);
-
- intpAsessionB.open();
- intpBsessionB.open();
-
- assertEquals(intpAsessionB.getScheduler(), intpBsessionB.getScheduler());
- assertNotEquals(intpAsessionA.getScheduler(), intpAsessionB.getScheduler());
- }
-
- @Test
- public void should_push_local_angular_repo_to_remote() throws Exception {
- //Given
- final Client client = Mockito.mock(Client.class);
- final RemoteInterpreter intr = new RemoteInterpreter(new Properties(), "noteId",
- MockInterpreterA.class.getName(), "runner", "path", "localRepo", env, 10 * 1000, null,
- null, "anonymous", false);
- final AngularObjectRegistry registry = new AngularObjectRegistry("spark", null);
- registry.add("name", "DuyHai DOAN", "nodeId", "paragraphId");
- final InterpreterGroup interpreterGroup = new InterpreterGroup("groupId");
- interpreterGroup.setAngularObjectRegistry(registry);
- intr.setInterpreterGroup(interpreterGroup);
-
- final java.lang.reflect.Type registryType = new TypeToken<Map<String,
- Map<String, AngularObject>>>() {}.getType();
- final Gson gson = new Gson();
- final String expected = gson.toJson(registry.getRegistry(), registryType);
-
- //When
- intr.pushAngularObjectRegistryToRemote(client);
-
- //Then
- Mockito.verify(client).angularRegistryPush(expected);
- }
-
- @Test
- public void testEnvStringPattern() {
- assertFalse(RemoteInterpreter.isEnvString(null));
- assertFalse(RemoteInterpreter.isEnvString(""));
- assertFalse(RemoteInterpreter.isEnvString("abcDEF"));
- assertFalse(RemoteInterpreter.isEnvString("ABC-DEF"));
- assertTrue(RemoteInterpreter.isEnvString("ABCDEF"));
- assertTrue(RemoteInterpreter.isEnvString("ABC_DEF"));
- assertTrue(RemoteInterpreter.isEnvString("ABC_DEF123"));
- }
-
- @Test
- public void testEnvronmentAndPropertySet() {
- Properties p = new Properties();
- p.setProperty("MY_ENV1", "env value 1");
- p.setProperty("my.property.1", "property value 1");
-
- RemoteInterpreter intp = new RemoteInterpreter(
- p,
- "note",
- MockInterpreterEnv.class.getName(),
- new File(INTERPRETER_SCRIPT).getAbsolutePath(),
- "fake",
- "fakeRepo",
- env,
- 10 * 1000,
- null,
- null,
- "anonymous",
- false);
-
- intpGroup.put("note", new LinkedList<Interpreter>());
- intpGroup.get("note").add(intp);
- intp.setInterpreterGroup(intpGroup);
-
- intp.open();
-
- InterpreterContext context = new InterpreterContext(
- "noteId",
- "id",
- null,
- "title",
- "text",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null),
- new LocalResourcePool("pool1"),
- new LinkedList<InterpreterContextRunner>(), null);
-
-
- assertEquals("env value 1", intp.interpret("getEnv MY_ENV1", context).message().get(0).getData());
- assertEquals(Code.ERROR, intp.interpret("getProperty MY_ENV1", context).code());
- assertEquals(Code.ERROR, intp.interpret("getEnv my.property.1", context).code());
- assertEquals("property value 1", intp.interpret("getProperty my.property.1", context).message().get(0).getData());
-
- intp.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
deleted file mode 100644
index 975d6ea..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
-import org.junit.Test;
-
-public class RemoteInterpreterUtilsTest {
-
- @Test
- public void testFindRandomAvailablePortOnAllLocalInterfaces() throws IOException {
- assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() > 0);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java
deleted file mode 100644
index 81a9164..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote.mock;
-
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-
-public class MockInterpreterA extends Interpreter {
-
- private String lastSt;
-
- public MockInterpreterA(Properties property) {
- super(property);
- }
-
- @Override
- public void open() {
- //new RuntimeException().printStackTrace();
- }
-
- @Override
- public void close() {
- }
-
- public String getLastStatement() {
- return lastSt;
- }
-
- @Override
- public InterpreterResult interpret(String st, InterpreterContext context) {
- try {
- Thread.sleep(Long.parseLong(st));
- this.lastSt = st;
- } catch (NumberFormatException | InterruptedException e) {
- throw new InterpreterException(e);
- }
- return new InterpreterResult(Code.SUCCESS, st);
- }
-
- @Override
- public void cancel(InterpreterContext context) {
-
- }
-
- @Override
- public FormType getFormType() {
- return FormType.NATIVE;
- }
-
- @Override
- public int getProgress(InterpreterContext context) {
- return 0;
- }
-
- @Override
- public List<InterpreterCompletion> completion(String buf, int cursor,
- InterpreterContext interpreterContext) {
- return null;
- }
-
- @Override
- public Scheduler getScheduler() {
- if (getProperty("parallel") != null && getProperty("parallel").equals("true")) {
- return SchedulerFactory.singleton().createOrGetParallelScheduler("interpreter_" + this.hashCode(), 10);
- } else {
- return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
deleted file mode 100644
index d4b26ad..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote.mock;
-
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.AngularObjectWatcher;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-
-public class MockInterpreterAngular extends Interpreter {
-
- AtomicInteger numWatch = new AtomicInteger(0);
-
- public MockInterpreterAngular(Properties property) {
- super(property);
- }
-
- @Override
- public void open() {
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public InterpreterResult interpret(String st, InterpreterContext context) {
- String[] stmt = st.split(" ");
- String cmd = stmt[0];
- String name = null;
- if (stmt.length >= 2) {
- name = stmt[1];
- }
- String value = null;
- if (stmt.length == 3) {
- value = stmt[2];
- }
-
- AngularObjectRegistry registry = context.getAngularObjectRegistry();
-
- if (cmd.equals("add")) {
- registry.add(name, value, context.getNoteId(), null);
- registry.get(name, context.getNoteId(), null).addWatcher(new AngularObjectWatcher
- (null) {
-
- @Override
- public void watch(Object oldObject, Object newObject,
- InterpreterContext context) {
- numWatch.incrementAndGet();
- }
-
- });
- } else if (cmd.equalsIgnoreCase("update")) {
- registry.get(name, context.getNoteId(), null).set(value);
- } else if (cmd.equals("remove")) {
- registry.remove(name, context.getNoteId(), null);
- }
-
- try {
- Thread.sleep(500); // wait for watcher executed
- } catch (InterruptedException e) {
- logger.error("Exception in MockInterpreterAngular while interpret Thread.sleep", e);
- }
-
- String msg = registry.getAll(context.getNoteId(), null).size() + " " + Integer.toString(numWatch
- .get());
- return new InterpreterResult(Code.SUCCESS, msg);
- }
-
- @Override
- public void cancel(InterpreterContext context) {
- }
-
- @Override
- public FormType getFormType() {
- return FormType.NATIVE;
- }
-
- @Override
- public int getProgress(InterpreterContext context) {
- return 0;
- }
-
- @Override
- public List<InterpreterCompletion> completion(String buf, int cursor,
- InterpreterContext interpreterContext) {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
deleted file mode 100644
index 7103335..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote.mock;
-
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.WrappedInterpreter;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-
-public class MockInterpreterB extends Interpreter {
-
- public MockInterpreterB(Properties property) {
- super(property);
- }
-
- @Override
- public void open() {
- //new RuntimeException().printStackTrace();
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public InterpreterResult interpret(String st, InterpreterContext context) {
- MockInterpreterA intpA = getInterpreterA();
- String intpASt = intpA.getLastStatement();
- long timeToSleep = Long.parseLong(st);
- if (intpASt != null) {
- timeToSleep += Long.parseLong(intpASt);
- }
- try {
- Thread.sleep(timeToSleep);
- } catch (NumberFormatException | InterruptedException e) {
- throw new InterpreterException(e);
- }
- return new InterpreterResult(Code.SUCCESS, Long.toString(timeToSleep));
- }
-
- @Override
- public void cancel(InterpreterContext context) {
-
- }
-
- @Override
- public FormType getFormType() {
- return FormType.NATIVE;
- }
-
- @Override
- public int getProgress(InterpreterContext context) {
- return 0;
- }
-
- @Override
- public List<InterpreterCompletion> completion(String buf, int cursor,
- InterpreterContext interpreterContext) {
- return null;
- }
-
- public MockInterpreterA getInterpreterA() {
- InterpreterGroup interpreterGroup = getInterpreterGroup();
- synchronized (interpreterGroup) {
- for (List<Interpreter> interpreters : interpreterGroup.values()) {
- boolean belongsToSameNoteGroup = false;
- MockInterpreterA a = null;
- for (Interpreter intp : interpreters) {
- if (intp.getClassName().equals(MockInterpreterA.class.getName())) {
- Interpreter p = intp;
- while (p instanceof WrappedInterpreter) {
- p = ((WrappedInterpreter) p).getInnerInterpreter();
- }
- a = (MockInterpreterA) p;
- }
-
- Interpreter p = intp;
- while (p instanceof WrappedInterpreter) {
- p = ((WrappedInterpreter) p).getInnerInterpreter();
- }
- if (this == p) {
- belongsToSameNoteGroup = true;
- }
- }
- if (belongsToSameNoteGroup) {
- return a;
- }
- }
- }
- return null;
- }
-
- @Override
- public Scheduler getScheduler() {
- MockInterpreterA intpA = getInterpreterA();
- if (intpA != null) {
- return intpA.getScheduler();
- }
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java
deleted file mode 100644
index 12e11f7..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.interpreter.remote.mock;
-
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-
-import java.util.List;
-import java.util.Properties;
-
-
-public class MockInterpreterEnv extends Interpreter {
-
- public MockInterpreterEnv(Properties property) {
- super(property);
- }
-
- @Override
- public void open() {
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public InterpreterResult interpret(String st, InterpreterContext context) {
- String[] cmd = st.split(" ");
- if (cmd[0].equals("getEnv")) {
- return new InterpreterResult(InterpreterResult.Code.SUCCESS, System.getenv(cmd[1]));
- } else if (cmd[0].equals("getProperty")){
- return new InterpreterResult(InterpreterResult.Code.SUCCESS, System.getProperty(cmd[1]));
- } else {
- return new InterpreterResult(InterpreterResult.Code.ERROR, cmd[0]);
- }
- }
-
- @Override
- public void cancel(InterpreterContext context) {
-
- }
-
- @Override
- public FormType getFormType() {
- return FormType.NATIVE;
- }
-
- @Override
- public int getProgress(InterpreterContext context) {
- return 0;
- }
-
- @Override
- public List<InterpreterCompletion> completion(String buf, int cursor,
- InterpreterContext interpreterContext) {
- return null;
- }
-
- @Override
- public Scheduler getScheduler() {
- return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode());
- }
-}
-
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
deleted file mode 100644
index 349315c..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.interpreter.remote.mock;
-
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * MockInterpreter to test outputstream
- */
-public class MockInterpreterOutputStream extends Interpreter {
- private String lastSt;
-
- public MockInterpreterOutputStream(Properties property) {
- super(property);
- }
-
- @Override
- public void open() {
- //new RuntimeException().printStackTrace();
- }
-
- @Override
- public void close() {
- }
-
- public String getLastStatement() {
- return lastSt;
- }
-
- @Override
- public InterpreterResult interpret(String st, InterpreterContext context) {
- String[] ret = st.split(":");
- try {
- if (ret[1] != null) {
- context.out.write(ret[1]);
- }
- } catch (IOException e) {
- throw new InterpreterException(e);
- }
- return new InterpreterResult(InterpreterResult.Code.valueOf(ret[0]), (ret.length > 2) ?
- ret[2] : "");
- }
-
- @Override
- public void cancel(InterpreterContext context) {
-
- }
-
- @Override
- public FormType getFormType() {
- return FormType.NATIVE;
- }
-
- @Override
- public int getProgress(InterpreterContext context) {
- return 0;
- }
-
- @Override
- public List<InterpreterCompletion> completion(String buf, int cursor,
- InterpreterContext interpreterContext) {
- return null;
- }
-
- @Override
- public Scheduler getScheduler() {
- return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode());
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
deleted file mode 100644
index c4ff6ab..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote.mock;
-
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.google.gson.Gson;
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.AngularObjectWatcher;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.resource.Resource;
-import org.apache.zeppelin.resource.ResourcePool;
-
-public class MockInterpreterResourcePool extends Interpreter {
-
- AtomicInteger numWatch = new AtomicInteger(0);
-
- public MockInterpreterResourcePool(Properties property) {
- super(property);
- }
-
- @Override
- public void open() {
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public InterpreterResult interpret(String st, InterpreterContext context) {
- String[] stmt = st.split(" ");
- String cmd = stmt[0];
- String noteId = null;
- String paragraphId = null;
- String name = null;
- if (stmt.length >= 2) {
- String[] npn = stmt[1].split(":");
- if (npn.length >= 3) {
- noteId = npn[0];
- paragraphId = npn[1];
- name = npn[2];
- } else {
- name = stmt[1];
- }
- }
- String value = null;
- if (stmt.length >= 3) {
- value = stmt[2];
- }
-
- ResourcePool resourcePool = context.getResourcePool();
- Object ret = null;
- if (cmd.equals("put")) {
- resourcePool.put(noteId, paragraphId, name, value);
- } else if (cmd.equalsIgnoreCase("get")) {
- Resource resource = resourcePool.get(noteId, paragraphId, name);
- if (resource != null) {
- ret = resourcePool.get(noteId, paragraphId, name).get();
- } else {
- ret = "";
- }
- } else if (cmd.equals("remove")) {
- ret = resourcePool.remove(noteId, paragraphId, name);
- } else if (cmd.equals("getAll")) {
- ret = resourcePool.getAll();
- } else if (cmd.equals("invoke")) {
- Resource resource = resourcePool.get(noteId, paragraphId, name);
- if (stmt.length >=4) {
- Resource res = resource.invokeMethod(value, null, null, stmt[3]);
- ret = res.get();
- } else {
- ret = resource.invokeMethod(value, null, null);
- }
- }
-
- try {
- Thread.sleep(500); // wait for watcher executed
- } catch (InterruptedException e) {
- }
-
- Gson gson = new Gson();
- return new InterpreterResult(Code.SUCCESS, gson.toJson(ret));
- }
-
- @Override
- public void cancel(InterpreterContext context) {
- }
-
- @Override
- public FormType getFormType() {
- return FormType.NATIVE;
- }
-
- @Override
- public int getProgress(InterpreterContext context) {
- return 0;
- }
-
- @Override
- public List<InterpreterCompletion> completion(String buf, int cursor,
- InterpreterContext interpreterContext) {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
deleted file mode 100644
index 363ccf6..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.resource;
-
-import com.google.gson.Gson;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller;
-import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterResourcePool;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Unittest for DistributedResourcePool
- */
-public class DistributedResourcePoolTest {
- private static final String INTERPRETER_SCRIPT =
- System.getProperty("os.name").startsWith("Windows") ?
- "../bin/interpreter.cmd" :
- "../bin/interpreter.sh";
- private InterpreterGroup intpGroup1;
- private InterpreterGroup intpGroup2;
- private HashMap<String, String> env;
- private RemoteInterpreter intp1;
- private RemoteInterpreter intp2;
- private InterpreterContext context;
- private RemoteInterpreterEventPoller eventPoller1;
- private RemoteInterpreterEventPoller eventPoller2;
-
-
- @Before
- public void setUp() throws Exception {
- env = new HashMap<>();
- env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
-
- Properties p = new Properties();
-
- intp1 = new RemoteInterpreter(
- p,
- "note",
- MockInterpreterResourcePool.class.getName(),
- new File(INTERPRETER_SCRIPT).getAbsolutePath(),
- "fake",
- "fakeRepo",
- env,
- 10 * 1000,
- null,
- null,
- "anonymous",
- false
- );
-
- intpGroup1 = new InterpreterGroup("intpGroup1");
- intpGroup1.put("note", new LinkedList<Interpreter>());
- intpGroup1.get("note").add(intp1);
- intp1.setInterpreterGroup(intpGroup1);
-
- intp2 = new RemoteInterpreter(
- p,
- "note",
- MockInterpreterResourcePool.class.getName(),
- new File(INTERPRETER_SCRIPT).getAbsolutePath(),
- "fake",
- "fakeRepo",
- env,
- 10 * 1000,
- null,
- null,
- "anonymous",
- false
- );
-
- intpGroup2 = new InterpreterGroup("intpGroup2");
- intpGroup2.put("note", new LinkedList<Interpreter>());
- intpGroup2.get("note").add(intp2);
- intp2.setInterpreterGroup(intpGroup2);
-
- context = new InterpreterContext(
- "note",
- "id",
- null,
- "title",
- "text",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- null,
- null,
- new LinkedList<InterpreterContextRunner>(),
- null);
-
- intp1.open();
- intp2.open();
-
- eventPoller1 = new RemoteInterpreterEventPoller(null, null);
- eventPoller1.setInterpreterGroup(intpGroup1);
- eventPoller1.setInterpreterProcess(intpGroup1.getRemoteInterpreterProcess());
-
- eventPoller2 = new RemoteInterpreterEventPoller(null, null);
- eventPoller2.setInterpreterGroup(intpGroup2);
- eventPoller2.setInterpreterProcess(intpGroup2.getRemoteInterpreterProcess());
-
- eventPoller1.start();
- eventPoller2.start();
- }
-
- @After
- public void tearDown() throws Exception {
- eventPoller1.shutdown();
- intp1.close();
- intpGroup1.close();
- eventPoller2.shutdown();
- intp2.close();
- intpGroup2.close();
- }
-
- @Test
- public void testRemoteDistributedResourcePool() {
- Gson gson = new Gson();
- InterpreterResult ret;
- intp1.interpret("put key1 value1", context);
- intp2.interpret("put key2 value2", context);
-
- ret = intp1.interpret("getAll", context);
- assertEquals(2, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size());
-
- ret = intp2.interpret("getAll", context);
- assertEquals(2, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size());
-
- ret = intp1.interpret("get key1", context);
- assertEquals("value1", gson.fromJson(ret.message().get(0).getData(), String.class));
-
- ret = intp1.interpret("get key2", context);
- assertEquals("value2", gson.fromJson(ret.message().get(0).getData(), String.class));
- }
-
- @Test
- public void testDistributedResourcePool() {
- final LocalResourcePool pool2 = new LocalResourcePool("pool2");
- final LocalResourcePool pool3 = new LocalResourcePool("pool3");
-
- DistributedResourcePool pool1 = new DistributedResourcePool("pool1", new ResourcePoolConnector() {
- @Override
- public ResourceSet getAllResources() {
- ResourceSet set = pool2.getAll();
- set.addAll(pool3.getAll());
-
- ResourceSet remoteSet = new ResourceSet();
- Gson gson = new Gson();
- for (Resource s : set) {
- RemoteResource remoteResource = gson.fromJson(gson.toJson(s), RemoteResource.class);
- remoteResource.setResourcePoolConnector(this);
- remoteSet.add(remoteResource);
- }
- return remoteSet;
- }
-
- @Override
- public Object readResource(ResourceId id) {
- if (id.getResourcePoolId().equals(pool2.id())) {
- return pool2.get(id.getName()).get();
- }
- if (id.getResourcePoolId().equals(pool3.id())) {
- return pool3.get(id.getName()).get();
- }
- return null;
- }
-
- @Override
- public Object invokeMethod(ResourceId id, String methodName, Class[] paramTypes, Object[] params) {
- return null;
- }
-
- @Override
- public Resource invokeMethod(ResourceId id, String methodName, Class[] paramTypes, Object[]
- params, String returnResourceName) {
- return null;
- }
- });
-
- assertEquals(0, pool1.getAll().size());
-
-
- // test get() can get from pool
- pool2.put("object1", "value2");
- assertEquals(1, pool1.getAll().size());
- assertTrue(pool1.get("object1").isRemote());
- assertEquals("value2", pool1.get("object1").get());
-
- // test get() is locality aware
- pool1.put("object1", "value1");
- assertEquals(1, pool2.getAll().size());
- assertEquals("value1", pool1.get("object1").get());
-
- // test getAll() is locality aware
- assertEquals("value1", pool1.getAll().get(0).get());
- assertEquals("value2", pool1.getAll().get(1).get());
- }
-
- @Test
- public void testResourcePoolUtils() {
- Gson gson = new Gson();
- InterpreterResult ret;
-
- // when create some resources
- intp1.interpret("put note1:paragraph1:key1 value1", context);
- intp1.interpret("put note1:paragraph2:key1 value2", context);
- intp2.interpret("put note2:paragraph1:key1 value1", context);
- intp2.interpret("put note2:paragraph2:key2 value2", context);
-
-
- // then get all resources.
- assertEquals(4, ResourcePoolUtils.getAllResources().size());
-
- // when remove all resources from note1
- ResourcePoolUtils.removeResourcesBelongsToNote("note1");
-
- // then resources should be removed.
- assertEquals(2, ResourcePoolUtils.getAllResources().size());
- assertEquals("", gson.fromJson(
- intp1.interpret("get note1:paragraph1:key1", context).message().get(0).getData(),
- String.class));
- assertEquals("", gson.fromJson(
- intp1.interpret("get note1:paragraph2:key1", context).message().get(0).getData(),
- String.class));
-
-
- // when remove all resources from note2:paragraph1
- ResourcePoolUtils.removeResourcesBelongsToParagraph("note2", "paragraph1");
-
- // then 1
- assertEquals(1, ResourcePoolUtils.getAllResources().size());
- assertEquals("value2", gson.fromJson(
- intp1.interpret("get note2:paragraph2:key2", context).message().get(0).getData(),
- String.class));
-
- }
-
- @Test
- public void testResourceInvokeMethod() {
- Gson gson = new Gson();
- InterpreterResult ret;
- intp1.interpret("put key1 hey", context);
- intp2.interpret("put key2 world", context);
-
- // invoke method in local resource pool
- ret = intp1.interpret("invoke key1 length", context);
- assertEquals("3", ret.message().get(0).getData());
-
- // invoke method in remote resource pool
- ret = intp1.interpret("invoke key2 length", context);
- assertEquals("5", ret.message().get(0).getData());
-
- // make sure no resources are automatically created
- ret = intp1.interpret("getAll", context);
- assertEquals(2, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size());
-
- // invoke method in local resource pool and save result
- ret = intp1.interpret("invoke key1 length ret1", context);
- assertEquals("3", ret.message().get(0).getData());
-
- ret = intp1.interpret("getAll", context);
- assertEquals(3, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size());
-
- ret = intp1.interpret("get ret1", context);
- assertEquals("3", gson.fromJson(ret.message().get(0).getData(), String.class));
-
- // invoke method in remote resource pool and save result
- ret = intp1.interpret("invoke key2 length ret2", context);
- assertEquals("5", ret.message().get(0).getData());
-
- ret = intp1.interpret("getAll", context);
- assertEquals(4, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size());
-
- ret = intp1.interpret("get ret2", context);
- assertEquals("5", gson.fromJson(ret.message().get(0).getData(), String.class));
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
deleted file mode 100644
index ebb5100..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.scheduler;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
-import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
-import org.apache.zeppelin.resource.LocalResourcePool;
-import org.apache.zeppelin.scheduler.Job.Status;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
-
- private static final String INTERPRETER_SCRIPT =
- System.getProperty("os.name").startsWith("Windows") ?
- "../bin/interpreter.cmd" :
- "../bin/interpreter.sh";
- private SchedulerFactory schedulerSvc;
- private static final int TICK_WAIT = 100;
- private static final int MAX_WAIT_CYCLES = 100;
-
- @Before
- public void setUp() throws Exception{
- schedulerSvc = new SchedulerFactory();
- }
-
- @After
- public void tearDown(){
-
- }
-
- @Test
- public void test() throws Exception {
- Properties p = new Properties();
- final InterpreterGroup intpGroup = new InterpreterGroup();
- Map<String, String> env = new HashMap<>();
- env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
-
- final RemoteInterpreter intpA = new RemoteInterpreter(
- p,
- "note",
- MockInterpreterA.class.getName(),
- new File(INTERPRETER_SCRIPT).getAbsolutePath(),
- "fake",
- "fakeRepo",
- env,
- 10 * 1000,
- this,
- null,
- "anonymous",
- false);
-
- intpGroup.put("note", new LinkedList<Interpreter>());
- intpGroup.get("note").add(intpA);
- intpA.setInterpreterGroup(intpGroup);
-
- intpA.open();
-
- Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note",
- intpA.getInterpreterProcess(),
- 10);
-
- Job job = new Job("jobId", "jobName", null, 200) {
- Object results;
- @Override
- public Object getReturn() {
- return results;
- }
-
- @Override
- public int progress() {
- return 0;
- }
-
- @Override
- public Map<String, Object> info() {
- return null;
- }
-
- @Override
- protected Object jobRun() throws Throwable {
- intpA.interpret("1000", new InterpreterContext(
- "note",
- "jobId",
- null,
- "title",
- "text",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null),
- new LocalResourcePool("pool1"),
- new LinkedList<InterpreterContextRunner>(), null));
- return "1000";
- }
-
- @Override
- protected boolean jobAbort() {
- return false;
- }
-
- @Override
- public void setResult(Object results) {
- this.results = results;
- }
- };
- scheduler.submit(job);
-
- int cycles = 0;
- while (!job.isRunning() && cycles < MAX_WAIT_CYCLES) {
- Thread.sleep(TICK_WAIT);
- cycles++;
- }
- assertTrue(job.isRunning());
-
- Thread.sleep(5*TICK_WAIT);
- assertEquals(0, scheduler.getJobsWaiting().size());
- assertEquals(1, scheduler.getJobsRunning().size());
-
- cycles = 0;
- while (!job.isTerminated() && cycles < MAX_WAIT_CYCLES) {
- Thread.sleep(TICK_WAIT);
- cycles++;
- }
-
- assertTrue(job.isTerminated());
- assertEquals(0, scheduler.getJobsWaiting().size());
- assertEquals(0, scheduler.getJobsRunning().size());
-
- intpA.close();
- schedulerSvc.removeScheduler("test");
- }
-
- @Test
- public void testAbortOnPending() throws Exception {
- Properties p = new Properties();
- final InterpreterGroup intpGroup = new InterpreterGroup();
- Map<String, String> env = new HashMap<>();
- env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
-
- final RemoteInterpreter intpA = new RemoteInterpreter(
- p,
- "note",
- MockInterpreterA.class.getName(),
- new File(INTERPRETER_SCRIPT).getAbsolutePath(),
- "fake",
- "fakeRepo",
- env,
- 10 * 1000,
- this,
- null,
- "anonymous",
- false);
-
- intpGroup.put("note", new LinkedList<Interpreter>());
- intpGroup.get("note").add(intpA);
- intpA.setInterpreterGroup(intpGroup);
-
- intpA.open();
-
- Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note",
- intpA.getInterpreterProcess(),
- 10);
-
- Job job1 = new Job("jobId1", "jobName1", null, 200) {
- Object results;
- InterpreterContext context = new InterpreterContext(
- "note",
- "jobId1",
- null,
- "title",
- "text",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null),
- new LocalResourcePool("pool1"),
- new LinkedList<InterpreterContextRunner>(), null);
-
- @Override
- public Object getReturn() {
- return results;
- }
-
- @Override
- public int progress() {
- return 0;
- }
-
- @Override
- public Map<String, Object> info() {
- return null;
- }
-
- @Override
- protected Object jobRun() throws Throwable {
- intpA.interpret("1000", context);
- return "1000";
- }
-
- @Override
- protected boolean jobAbort() {
- if (isRunning()) {
- intpA.cancel(context);
- }
- return true;
- }
-
- @Override
- public void setResult(Object results) {
- this.results = results;
- }
- };
-
- Job job2 = new Job("jobId2", "jobName2", null, 200) {
- public Object results;
- InterpreterContext context = new InterpreterContext(
- "note",
- "jobId2",
- null,
- "title",
- "text",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null),
- new LocalResourcePool("pool1"),
- new LinkedList<InterpreterContextRunner>(), null);
-
- @Override
- public Object getReturn() {
- return results;
- }
-
- @Override
- public int progress() {
- return 0;
- }
-
- @Override
- public Map<String, Object> info() {
- return null;
- }
-
- @Override
- protected Object jobRun() throws Throwable {
- intpA.interpret("1000", context);
- return "1000";
- }
-
- @Override
- protected boolean jobAbort() {
- if (isRunning()) {
- intpA.cancel(context);
- }
- return true;
- }
-
- @Override
- public void setResult(Object results) {
- this.results = results;
- }
- };
-
- job2.setResult("result2");
-
- scheduler.submit(job1);
- scheduler.submit(job2);
-
-
- int cycles = 0;
- while (!job1.isRunning() && cycles < MAX_WAIT_CYCLES) {
- Thread.sleep(TICK_WAIT);
- cycles++;
- }
- assertTrue(job1.isRunning());
- assertTrue(job2.getStatus() == Status.PENDING);
-
- job2.abort();
-
- cycles = 0;
- while (!job1.isTerminated() && cycles < MAX_WAIT_CYCLES) {
- Thread.sleep(TICK_WAIT);
- cycles++;
- }
-
- assertNotNull(job1.getDateFinished());
- assertTrue(job1.isTerminated());
- assertNull(job2.getDateFinished());
- assertTrue(job2.isTerminated());
- assertEquals("result2", job2.getReturn());
-
- intpA.close();
- schedulerSvc.removeScheduler("test");
- }
-
- @Override
- public void onOutputAppend(String noteId, String paragraphId, int index, String output) {
-
- }
-
- @Override
- public void onOutputUpdated(String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) {
-
- }
-
- @Override
- public void onOutputClear(String noteId, String paragraphId) {
-
- }
-
- @Override
- public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
-
- }
-
- @Override
- public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) {
- if (callback != null) {
- callback.onFinished(new LinkedList<>());
- }
- }
-
- @Override
- public void onRemoteRunParagraph(String noteId, String PsaragraphID) throws Exception {
- }
-
- @Override
- public void onParaInfosReceived(String noteId, String paragraphId,
- String interpreterSettingId, Map<String, String> metaInfos) {
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
new file mode 100644
index 0000000..0ac7116
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import java.util.List;
+
+import org.apache.thrift.TException;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectRegistryListener;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+
+/**
+ * Proxy for AngularObjectRegistry that exists in remote interpreter process
+ */
+public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
+  Logger logger = LoggerFactory.getLogger(RemoteAngularObjectRegistry.class);
+  private final InterpreterGroup interpreterGroup;
+
+  /**
+   * Creates a registry bound to the given interpreter group.
+   *
+   * @param interpreterId id passed through to {@link AngularObjectRegistry}
+   * @param listener registry listener passed through to {@link AngularObjectRegistry}
+   * @param interpreterGroup group whose remote interpreter process is notified of changes
+   */
+  public RemoteAngularObjectRegistry(String interpreterId,
+      AngularObjectRegistryListener listener,
+      InterpreterGroup interpreterGroup) {
+    super(interpreterId, listener);
+    this.interpreterGroup = interpreterGroup;
+  }
+
+  /** Looks up the remote interpreter process of the interpreter group; may be null. */
+  private RemoteInterpreterProcess getRemoteInterpreterProcess() {
+    return interpreterGroup.getRemoteInterpreterProcess();
+  }
+
+  /**
+   * When ZeppelinServer side code wants to add an angularObject to the registry,
+   * this method should be used instead of add().
+   *
+   * <p>If there is no remote interpreter process, or it is not running, the object is only
+   * added to the local registry. Otherwise the remote process is notified first and the
+   * object is added locally only when that call succeeds.
+   *
+   * @param name name of the angular object
+   * @param o value of the angular object; sent to the remote process serialized as JSON
+   * @param noteId note id forwarded to the remote process and the local registry
+   * @param paragraphId paragraph id forwarded to the remote process and the local registry
+   * @return the object returned by the local add, or {@code null} when notifying the remote
+   *     process failed (the failure is logged, not rethrown)
+   */
+  public AngularObject addAndNotifyRemoteProcess(String name, Object o, String noteId, String
+      paragraphId) {
+    RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
+    // Null-safe, matching removeAndNotifyRemoteProcess: without a process only update locally.
+    if (remoteInterpreterProcess == null || !remoteInterpreterProcess.isRunning()) {
+      return super.add(name, o, noteId, paragraphId, true);
+    }
+
+    Client client = null;
+    boolean broken = false;
+    try {
+      client = remoteInterpreterProcess.getClient();
+      client.angularObjectAdd(name, noteId, paragraphId, new Gson().toJson(o));
+      return super.add(name, o, noteId, paragraphId, true);
+    } catch (TException e) {
+      // A Thrift failure is reported as a broken client when it is released below.
+      broken = true;
+      logger.error("Error", e);
+    } catch (Exception e) {
+      logger.error("Error", e);
+    } finally {
+      if (client != null) {
+        remoteInterpreterProcess.releaseClient(client, broken);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * When ZeppelinServer side code wants to remove an angularObject from the registry,
+   * this method should be used instead of remove().
+   *
+   * <p>If there is no remote interpreter process, or it is not running, the object is only
+   * removed from the local registry. Otherwise the remote process is notified first and the
+   * object is removed locally only when that call succeeds.
+   *
+   * @param name name of the angular object
+   * @param noteId note id forwarded to the remote process and the local registry
+   * @param paragraphId paragraph id forwarded to the remote process and the local registry
+   * @return the object returned by the local remove, or {@code null} when notifying the
+   *     remote process failed (the failure is logged, not rethrown)
+   */
+  public AngularObject removeAndNotifyRemoteProcess(String name, String noteId, String
+      paragraphId) {
+    RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
+    if (remoteInterpreterProcess == null || !remoteInterpreterProcess.isRunning()) {
+      return super.remove(name, noteId, paragraphId);
+    }
+
+    Client client = null;
+    boolean broken = false;
+    try {
+      client = remoteInterpreterProcess.getClient();
+      client.angularObjectRemove(name, noteId, paragraphId);
+      return super.remove(name, noteId, paragraphId);
+    } catch (TException e) {
+      // A Thrift failure is reported as a broken client when it is released below.
+      broken = true;
+      logger.error("Error", e);
+    } catch (Exception e) {
+      logger.error("Error", e);
+    } finally {
+      if (client != null) {
+        remoteInterpreterProcess.releaseClient(client, broken);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Removes every angular object returned by {@code getAll(noteId, paragraphId)}, going
+   * through {@link #removeAndNotifyRemoteProcess} for each one.
+   *
+   * @param noteId note id used to select the objects
+   * @param paragraphId paragraph id used to select the objects
+   */
+  public void removeAllAndNotifyRemoteProcess(String noteId, String paragraphId) {
+    List<AngularObject> all = getAll(noteId, paragraphId);
+    for (AngularObject ao : all) {
+      removeAndNotifyRemoteProcess(ao.getName(), noteId, paragraphId);
+    }
+  }
+
+  /**
+   * Creates a {@link RemoteAngularObject} wired to this registry's interpreter group and
+   * angular object listener.
+   */
+  @Override
+  protected AngularObject createNewAngularObject(String name, Object o, String noteId, String
+      paragraphId) {
+    return new RemoteAngularObject(name, o, noteId, paragraphId, interpreterGroup,
+        getAngularObjectListener());
+  }
+}