You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by jo...@apache.org on 2017/05/06 13:46:41 UTC

[1/4] zeppelin git commit: [MINOR] Move remoteinterpreter into zengine

Repository: zeppelin
Updated Branches:
  refs/heads/master 8e96d8bd7 -> d9c4a5f0b


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
new file mode 100644
index 0000000..2ba7a76
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
@@ -0,0 +1,914 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.thrift.transport.TTransportException;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterEnv;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterB;
+import org.apache.zeppelin.resource.LocalResourcePool;
+import org.apache.zeppelin.scheduler.Job;
+import org.apache.zeppelin.scheduler.Job.Status;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+public class RemoteInterpreterTest {
+
+
+  private static final String INTERPRETER_SCRIPT =
+          System.getProperty("os.name").startsWith("Windows") ?
+                  "../bin/interpreter.cmd" :
+                  "../bin/interpreter.sh";
+
+  private InterpreterGroup intpGroup;
+  private HashMap<String, String> env;
+
+  @Before
+  public void setUp() throws Exception {
+    intpGroup = new InterpreterGroup();
+    env = new HashMap<>();
+    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    intpGroup.close();
+  }
+
+  private RemoteInterpreter createMockInterpreterA(Properties p) {
+    return createMockInterpreterA(p, "note");
+  }
+
+  private RemoteInterpreter createMockInterpreterA(Properties p, String noteId) {
+    return new RemoteInterpreter(
+        p,
+        noteId,
+        MockInterpreterA.class.getName(),
+        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+        "fake",
+        "fakeRepo",
+        env,
+        10 * 1000,
+        null,
+        null,
+        "anonymous",
+        false);
+  }
+
+  private RemoteInterpreter createMockInterpreterB(Properties p) {
+    return createMockInterpreterB(p, "note");
+  }
+
+  private RemoteInterpreter createMockInterpreterB(Properties p, String noteId) {
+    return new RemoteInterpreter(
+        p,
+        noteId,
+        MockInterpreterB.class.getName(),
+        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+        "fake",
+        "fakeRepo",
+        env,
+        10 * 1000,
+        null,
+        null,
+        "anonymous",
+        false);
+  }
+
+  @Test
+  public void testRemoteInterperterCall() throws TTransportException, IOException {
+    Properties p = new Properties();
+    intpGroup.put("note", new LinkedList<Interpreter>());
+
+    RemoteInterpreter intpA = createMockInterpreterA(p);
+
+    intpGroup.get("note").add(intpA);
+
+    intpA.setInterpreterGroup(intpGroup);
+
+    RemoteInterpreter intpB = createMockInterpreterB(p);
+
+    intpGroup.get("note").add(intpB);
+    intpB.setInterpreterGroup(intpGroup);
+
+
+    RemoteInterpreterProcess process = intpA.getInterpreterProcess();
+    process.equals(intpB.getInterpreterProcess());
+
+    assertFalse(process.isRunning());
+    assertEquals(0, process.getNumIdleClient());
+    assertEquals(0, process.referenceCount());
+
+    intpA.open(); // initializa all interpreters in the same group
+    assertTrue(process.isRunning());
+    assertEquals(1, process.getNumIdleClient());
+    assertEquals(1, process.referenceCount());
+
+    intpA.interpret("1",
+        new InterpreterContext(
+            "note",
+            "id",
+            null,
+            "title",
+            "text",
+            new AuthenticationInfo(),
+            new HashMap<String, Object>(),
+            new GUI(),
+            new AngularObjectRegistry(intpGroup.getId(), null),
+            new LocalResourcePool("pool1"),
+            new LinkedList<InterpreterContextRunner>(), null));
+
+    intpB.open();
+    assertEquals(1, process.referenceCount());
+
+    intpA.close();
+    assertEquals(0, process.referenceCount());
+    intpB.close();
+    assertEquals(0, process.referenceCount());
+
+    assertFalse(process.isRunning());
+
+  }
+
+  @Test
+  public void testExecuteIncorrectPrecode() throws TTransportException, IOException {
+    Properties p = new Properties();
+    p.put("zeppelin.MockInterpreterA.precode", "fail test");
+    intpGroup.put("note", new LinkedList<Interpreter>());
+
+    RemoteInterpreter intpA = createMockInterpreterA(p);
+
+    intpGroup.get("note").add(intpA);
+
+    intpA.setInterpreterGroup(intpGroup);
+
+    RemoteInterpreterProcess process = intpA.getInterpreterProcess();
+
+    intpA.open();
+    
+    InterpreterResult result = intpA.interpret("1",
+        new InterpreterContext(
+            "note",
+            "id",
+            null,
+            "title",
+            "text",
+            new AuthenticationInfo(),
+            new HashMap<String, Object>(),
+            new GUI(),
+            new AngularObjectRegistry(intpGroup.getId(), null),
+            new LocalResourcePool("pool1"),
+            new LinkedList<InterpreterContextRunner>(), null));
+
+
+
+    intpA.close();
+    assertEquals(Code.ERROR, result.code());
+  }
+
+  @Test
+  public void testExecuteCorrectPrecode() throws TTransportException, IOException {
+    Properties p = new Properties();
+    p.put("zeppelin.MockInterpreterA.precode", "2");
+    intpGroup.put("note", new LinkedList<Interpreter>());
+
+    RemoteInterpreter intpA = createMockInterpreterA(p);
+
+    intpGroup.get("note").add(intpA);
+
+    intpA.setInterpreterGroup(intpGroup);
+
+    RemoteInterpreterProcess process = intpA.getInterpreterProcess();
+
+    intpA.open();
+
+    InterpreterResult result = intpA.interpret("1",
+        new InterpreterContext(
+            "note",
+            "id",
+            null,
+            "title",
+            "text",
+            new AuthenticationInfo(),
+            new HashMap<String, Object>(),
+            new GUI(),
+            new AngularObjectRegistry(intpGroup.getId(), null),
+            new LocalResourcePool("pool1"),
+            new LinkedList<InterpreterContextRunner>(), null));
+
+
+
+    intpA.close();
+    assertEquals(Code.SUCCESS, result.code());
+    assertEquals("1", result.message().get(0).getData());
+  }
+
+  @Test
+  public void testRemoteInterperterErrorStatus() throws TTransportException, IOException {
+    Properties p = new Properties();
+
+    RemoteInterpreter intpA = createMockInterpreterA(p);
+
+    intpGroup.put("note", new LinkedList<Interpreter>());
+    intpGroup.get("note").add(intpA);
+    intpA.setInterpreterGroup(intpGroup);
+
+    intpA.open();
+    InterpreterResult ret = intpA.interpret("non numeric value",
+        new InterpreterContext(
+            "noteId",
+            "id",
+            null,
+            "title",
+            "text",
+            new AuthenticationInfo(),
+            new HashMap<String, Object>(),
+            new GUI(),
+            new AngularObjectRegistry(intpGroup.getId(), null),
+            new LocalResourcePool("pool1"),
+            new LinkedList<InterpreterContextRunner>(), null));
+
+    assertEquals(Code.ERROR, ret.code());
+  }
+
+  @Test
+  public void testRemoteSchedulerSharing() throws TTransportException, IOException {
+    Properties p = new Properties();
+    intpGroup.put("note", new LinkedList<Interpreter>());
+
+    RemoteInterpreter intpA = new RemoteInterpreter(
+        p,
+        "note",
+        MockInterpreterA.class.getName(),
+        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+        "fake",
+        "fakeRepo",
+        env,
+        10 * 1000,
+        null,
+        null,
+        "anonymous",
+        false);
+
+    intpGroup.get("note").add(intpA);
+    intpA.setInterpreterGroup(intpGroup);
+
+    RemoteInterpreter intpB = new RemoteInterpreter(
+        p,
+        "note",
+        MockInterpreterB.class.getName(),
+        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+        "fake",
+        "fakeRepo",
+        env,
+        10 * 1000,
+        null,
+        null,
+        "anonymous",
+        false);
+
+    intpGroup.get("note").add(intpB);
+    intpB.setInterpreterGroup(intpGroup);
+
+    intpA.open();
+    intpB.open();
+
+    long start = System.currentTimeMillis();
+    InterpreterResult ret = intpA.interpret("500",
+        new InterpreterContext(
+            "note",
+            "id",
+            null,
+            "title",
+            "text",
+            new AuthenticationInfo(),
+            new HashMap<String, Object>(),
+            new GUI(),
+            new AngularObjectRegistry(intpGroup.getId(), null),
+            new LocalResourcePool("pool1"),
+            new LinkedList<InterpreterContextRunner>(), null));
+    assertEquals("500", ret.message().get(0).getData());
+
+    ret = intpB.interpret("500",
+        new InterpreterContext(
+            "note",
+            "id",
+            null,
+            "title",
+            "text",
+            new AuthenticationInfo(),
+            new HashMap<String, Object>(),
+            new GUI(),
+            new AngularObjectRegistry(intpGroup.getId(), null),
+            new LocalResourcePool("pool1"),
+            new LinkedList<InterpreterContextRunner>(), null));
+    assertEquals("1000", ret.message().get(0).getData());
+    long end = System.currentTimeMillis();
+    assertTrue(end - start >= 1000);
+
+
+    intpA.close();
+    intpB.close();
+  }
+
+  @Test
+  public void testRemoteSchedulerSharingSubmit() throws TTransportException, IOException, InterruptedException {
+    Properties p = new Properties();
+    intpGroup.put("note", new LinkedList<Interpreter>());
+
+    final RemoteInterpreter intpA = createMockInterpreterA(p);
+
+    intpGroup.get("note").add(intpA);
+    intpA.setInterpreterGroup(intpGroup);
+
+    final RemoteInterpreter intpB = createMockInterpreterB(p);
+
+    intpGroup.get("note").add(intpB);
+    intpB.setInterpreterGroup(intpGroup);
+
+    intpA.open();
+    intpB.open();
+
+    long start = System.currentTimeMillis();
+    Job jobA = new Job("jobA", null) {
+      private Object r;
+
+      @Override
+      public Object getReturn() {
+        return r;
+      }
+
+      @Override
+      public void setResult(Object results) {
+        this.r = results;
+      }
+
+      @Override
+      public int progress() {
+        return 0;
+      }
+
+      @Override
+      public Map<String, Object> info() {
+        return null;
+      }
+
+      @Override
+      protected Object jobRun() throws Throwable {
+        return intpA.interpret("500",
+            new InterpreterContext(
+                "note",
+                "jobA",
+                null,
+                "title",
+                "text",
+                new AuthenticationInfo(),
+                new HashMap<String, Object>(),
+                new GUI(),
+                new AngularObjectRegistry(intpGroup.getId(), null),
+                new LocalResourcePool("pool1"),
+                new LinkedList<InterpreterContextRunner>(), null));
+      }
+
+      @Override
+      protected boolean jobAbort() {
+        return false;
+      }
+
+    };
+    intpA.getScheduler().submit(jobA);
+
+    Job jobB = new Job("jobB", null) {
+
+      private Object r;
+
+      @Override
+      public Object getReturn() {
+        return r;
+      }
+
+      @Override
+      public void setResult(Object results) {
+        this.r = results;
+      }
+
+      @Override
+      public int progress() {
+        return 0;
+      }
+
+      @Override
+      public Map<String, Object> info() {
+        return null;
+      }
+
+      @Override
+      protected Object jobRun() throws Throwable {
+        return intpB.interpret("500",
+            new InterpreterContext(
+                "note",
+                "jobB",
+                null,
+                "title",
+                "text",
+                new AuthenticationInfo(),
+                new HashMap<String, Object>(),
+                new GUI(),
+                new AngularObjectRegistry(intpGroup.getId(), null),
+                new LocalResourcePool("pool1"),
+                new LinkedList<InterpreterContextRunner>(), null));
+      }
+
+      @Override
+      protected boolean jobAbort() {
+        return false;
+      }
+
+    };
+    intpB.getScheduler().submit(jobB);
+    // wait until both job finished
+    while (jobA.getStatus() != Status.FINISHED ||
+           jobB.getStatus() != Status.FINISHED) {
+      Thread.sleep(100);
+    }
+    long end = System.currentTimeMillis();
+    assertTrue(end - start >= 1000);
+
+    assertEquals("1000", ((InterpreterResult) jobB.getReturn()).message().get(0).getData());
+
+    intpA.close();
+    intpB.close();
+  }
+
+  @Test
+  public void testRunOrderPreserved() throws InterruptedException {
+    Properties p = new Properties();
+    intpGroup.put("note", new LinkedList<Interpreter>());
+
+    final RemoteInterpreter intpA = createMockInterpreterA(p);
+
+    intpGroup.get("note").add(intpA);
+    intpA.setInterpreterGroup(intpGroup);
+
+    intpA.open();
+
+    int concurrency = 3;
+    final List<InterpreterResultMessage> results = new LinkedList<>();
+
+    Scheduler scheduler = intpA.getScheduler();
+    for (int i = 0; i < concurrency; i++) {
+      final String jobId = Integer.toString(i);
+      scheduler.submit(new Job(jobId, Integer.toString(i), null, 200) {
+        private Object r;
+
+        @Override
+        public Object getReturn() {
+          return r;
+        }
+
+        @Override
+        public void setResult(Object results) {
+          this.r = results;
+        }
+
+        @Override
+        public int progress() {
+          return 0;
+        }
+
+        @Override
+        public Map<String, Object> info() {
+          return null;
+        }
+
+        @Override
+        protected Object jobRun() throws Throwable {
+          InterpreterResult ret = intpA.interpret(getJobName(), new InterpreterContext(
+              "note",
+              jobId,
+              null,
+              "title",
+              "text",
+              new AuthenticationInfo(),
+              new HashMap<String, Object>(),
+              new GUI(),
+              new AngularObjectRegistry(intpGroup.getId(), null),
+              new LocalResourcePool("pool1"),
+              new LinkedList<InterpreterContextRunner>(), null));
+
+          synchronized (results) {
+            results.addAll(ret.message());
+            results.notify();
+          }
+          return null;
+        }
+
+        @Override
+        protected boolean jobAbort() {
+          return false;
+        }
+
+      });
+    }
+
+    // wait for job finished
+    synchronized (results) {
+      while (results.size() != concurrency) {
+        results.wait(300);
+      }
+    }
+
+    int i = 0;
+    for (InterpreterResultMessage result : results) {
+      assertEquals(Integer.toString(i++), result.getData());
+    }
+    assertEquals(concurrency, i);
+
+    intpA.close();
+  }
+
+
+  @Test
+  public void testRunParallel() throws InterruptedException {
+    Properties p = new Properties();
+    p.put("parallel", "true");
+    intpGroup.put("note", new LinkedList<Interpreter>());
+
+    final RemoteInterpreter intpA = createMockInterpreterA(p);
+
+    intpGroup.get("note").add(intpA);
+    intpA.setInterpreterGroup(intpGroup);
+
+    intpA.open();
+
+    int concurrency = 4;
+    final int timeToSleep = 1000;
+    final List<InterpreterResultMessage> results = new LinkedList<>();
+    long start = System.currentTimeMillis();
+
+    Scheduler scheduler = intpA.getScheduler();
+    for (int i = 0; i < concurrency; i++) {
+      final String jobId = Integer.toString(i);
+      scheduler.submit(new Job(jobId, Integer.toString(i), null, 300) {
+        private Object r;
+
+        @Override
+        public Object getReturn() {
+          return r;
+        }
+
+        @Override
+        public void setResult(Object results) {
+          this.r = results;
+        }
+
+        @Override
+        public int progress() {
+          return 0;
+        }
+
+        @Override
+        public Map<String, Object> info() {
+          return null;
+        }
+
+        @Override
+        protected Object jobRun() throws Throwable {
+          String stmt = Integer.toString(timeToSleep);
+          InterpreterResult ret = intpA.interpret(stmt, new InterpreterContext(
+              "note",
+              jobId,
+              null,
+              "title",
+              "text",
+              new AuthenticationInfo(),
+              new HashMap<String, Object>(),
+              new GUI(),
+              new AngularObjectRegistry(intpGroup.getId(), null),
+              new LocalResourcePool("pool1"),
+              new LinkedList<InterpreterContextRunner>(), null));
+
+          synchronized (results) {
+            results.addAll(ret.message());
+            results.notify();
+          }
+          return stmt;
+        }
+
+        @Override
+        protected boolean jobAbort() {
+          return false;
+        }
+
+      });
+    }
+
+    // wait for job finished
+    synchronized (results) {
+      while (results.size() != concurrency) {
+        results.wait(300);
+      }
+    }
+
+    long end = System.currentTimeMillis();
+
+    assertTrue(end - start < timeToSleep * concurrency);
+
+    intpA.close();
+  }
+
+  @Test
+  public void testInterpreterGroupResetBeforeProcessStarts() {
+    Properties p = new Properties();
+
+    RemoteInterpreter intpA = createMockInterpreterA(p);
+
+    intpA.setInterpreterGroup(intpGroup);
+    RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
+
+    intpA.setInterpreterGroup(new InterpreterGroup(intpA.getInterpreterGroup().getId()));
+    RemoteInterpreterProcess processB = intpA.getInterpreterProcess();
+
+    assertNotSame(processA.hashCode(), processB.hashCode());
+  }
+
+  @Test
+  public void testInterpreterGroupResetAfterProcessFinished() {
+    Properties p = new Properties();
+    intpGroup.put("note", new LinkedList<Interpreter>());
+
+    RemoteInterpreter intpA = createMockInterpreterA(p);
+
+    intpA.setInterpreterGroup(intpGroup);
+    RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
+    intpA.open();
+
+    processA.dereference();    // intpA.close();
+
+    intpA.setInterpreterGroup(new InterpreterGroup(intpA.getInterpreterGroup().getId()));
+    RemoteInterpreterProcess processB = intpA.getInterpreterProcess();
+
+    assertNotSame(processA.hashCode(), processB.hashCode());
+  }
+
+  @Test
+  public void testInterpreterGroupResetDuringProcessRunning() throws InterruptedException {
+    Properties p = new Properties();
+    intpGroup.put("note", new LinkedList<Interpreter>());
+
+    final RemoteInterpreter intpA = createMockInterpreterA(p);
+
+    intpGroup.get("note").add(intpA);
+    intpA.setInterpreterGroup(intpGroup);
+
+    intpA.open();
+
+    Job jobA = new Job("jobA", null) {
+      private Object r;
+
+      @Override
+      public Object getReturn() {
+        return r;
+      }
+
+      @Override
+      public void setResult(Object results) {
+        this.r = results;
+      }
+
+      @Override
+      public int progress() {
+        return 0;
+      }
+
+      @Override
+      public Map<String, Object> info() {
+        return null;
+      }
+
+      @Override
+      protected Object jobRun() throws Throwable {
+        return intpA.interpret("2000",
+            new InterpreterContext(
+                "note",
+                "jobA",
+                null,
+                "title",
+                "text",
+                new AuthenticationInfo(),
+                new HashMap<String, Object>(),
+                new GUI(),
+                new AngularObjectRegistry(intpGroup.getId(), null),
+                new LocalResourcePool("pool1"),
+                new LinkedList<InterpreterContextRunner>(), null));
+      }
+
+      @Override
+      protected boolean jobAbort() {
+        return false;
+      }
+
+    };
+    intpA.getScheduler().submit(jobA);
+
+    // wait for job started
+    while (intpA.getScheduler().getJobsRunning().size() == 0) {
+      Thread.sleep(100);
+    }
+
+    // restart interpreter
+    RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
+    intpA.close();
+
+    InterpreterGroup newInterpreterGroup =
+        new InterpreterGroup(intpA.getInterpreterGroup().getId());
+    newInterpreterGroup.put("note", new LinkedList<Interpreter>());
+
+    intpA.setInterpreterGroup(newInterpreterGroup);
+    intpA.open();
+    RemoteInterpreterProcess processB = intpA.getInterpreterProcess();
+
+    assertNotSame(processA.hashCode(), processB.hashCode());
+
+  }
+
+  @Test
+  public void testRemoteInterpreterSharesTheSameSchedulerInstanceInTheSameGroup() {
+    Properties p = new Properties();
+    intpGroup.put("note", new LinkedList<Interpreter>());
+
+    RemoteInterpreter intpA = createMockInterpreterA(p);
+
+    intpGroup.get("note").add(intpA);
+    intpA.setInterpreterGroup(intpGroup);
+
+    RemoteInterpreter intpB = createMockInterpreterB(p);
+
+    intpGroup.get("note").add(intpB);
+    intpB.setInterpreterGroup(intpGroup);
+
+    intpA.open();
+    intpB.open();
+
+    assertEquals(intpA.getScheduler(), intpB.getScheduler());
+  }
+
+  @Test
+  public void testMultiInterpreterSession() {
+    Properties p = new Properties();
+    intpGroup.put("sessionA", new LinkedList<Interpreter>());
+    intpGroup.put("sessionB", new LinkedList<Interpreter>());
+
+    RemoteInterpreter intpAsessionA = createMockInterpreterA(p, "sessionA");
+    intpGroup.get("sessionA").add(intpAsessionA);
+    intpAsessionA.setInterpreterGroup(intpGroup);
+
+    RemoteInterpreter intpBsessionA = createMockInterpreterB(p, "sessionA");
+    intpGroup.get("sessionA").add(intpBsessionA);
+    intpBsessionA.setInterpreterGroup(intpGroup);
+
+    intpAsessionA.open();
+    intpBsessionA.open();
+
+    assertEquals(intpAsessionA.getScheduler(), intpBsessionA.getScheduler());
+
+    RemoteInterpreter intpAsessionB = createMockInterpreterA(p, "sessionB");
+    intpGroup.get("sessionB").add(intpAsessionB);
+    intpAsessionB.setInterpreterGroup(intpGroup);
+
+    RemoteInterpreter intpBsessionB = createMockInterpreterB(p, "sessionB");
+    intpGroup.get("sessionB").add(intpBsessionB);
+    intpBsessionB.setInterpreterGroup(intpGroup);
+
+    intpAsessionB.open();
+    intpBsessionB.open();
+
+    assertEquals(intpAsessionB.getScheduler(), intpBsessionB.getScheduler());
+    assertNotEquals(intpAsessionA.getScheduler(), intpAsessionB.getScheduler());
+  }
+
+  @Test
+  public void should_push_local_angular_repo_to_remote() throws Exception {
+    //Given
+    final Client client = Mockito.mock(Client.class);
+    final RemoteInterpreter intr = new RemoteInterpreter(new Properties(), "noteId",
+        MockInterpreterA.class.getName(), "runner", "path", "localRepo", env, 10 * 1000, null,
+        null, "anonymous", false);
+    final AngularObjectRegistry registry = new AngularObjectRegistry("spark", null);
+    registry.add("name", "DuyHai DOAN", "nodeId", "paragraphId");
+    final InterpreterGroup interpreterGroup = new InterpreterGroup("groupId");
+    interpreterGroup.setAngularObjectRegistry(registry);
+    intr.setInterpreterGroup(interpreterGroup);
+
+    final java.lang.reflect.Type registryType = new TypeToken<Map<String,
+                Map<String, AngularObject>>>() {}.getType();
+    final Gson gson = new Gson();
+    final String expected = gson.toJson(registry.getRegistry(), registryType);
+
+    //When
+    intr.pushAngularObjectRegistryToRemote(client);
+
+    //Then
+    Mockito.verify(client).angularRegistryPush(expected);
+  }
+
+  @Test
+  public void testEnvStringPattern() {
+    assertFalse(RemoteInterpreterUtils.isEnvString(null));
+    assertFalse(RemoteInterpreterUtils.isEnvString(""));
+    assertFalse(RemoteInterpreterUtils.isEnvString("abcDEF"));
+    assertFalse(RemoteInterpreterUtils.isEnvString("ABC-DEF"));
+    assertTrue(RemoteInterpreterUtils.isEnvString("ABCDEF"));
+    assertTrue(RemoteInterpreterUtils.isEnvString("ABC_DEF"));
+    assertTrue(RemoteInterpreterUtils.isEnvString("ABC_DEF123"));
+  }
+
+  @Test
+  public void testEnvronmentAndPropertySet() {
+    Properties p = new Properties();
+    p.setProperty("MY_ENV1", "env value 1");
+    p.setProperty("my.property.1", "property value 1");
+
+    RemoteInterpreter intp = new RemoteInterpreter(
+        p,
+        "note",
+        MockInterpreterEnv.class.getName(),
+        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+        "fake",
+        "fakeRepo",
+        env,
+        10 * 1000,
+        null,
+        null,
+        "anonymous",
+        false);
+
+    intpGroup.put("note", new LinkedList<Interpreter>());
+    intpGroup.get("note").add(intp);
+    intp.setInterpreterGroup(intpGroup);
+
+    intp.open();
+
+    InterpreterContext context = new InterpreterContext(
+        "noteId",
+        "id",
+        null,
+        "title",
+        "text",
+        new AuthenticationInfo(),
+        new HashMap<String, Object>(),
+        new GUI(),
+        new AngularObjectRegistry(intpGroup.getId(), null),
+        new LocalResourcePool("pool1"),
+        new LinkedList<InterpreterContextRunner>(), null);
+
+
+    assertEquals("env value 1", intp.interpret("getEnv MY_ENV1", context).message().get(0).getData());
+    assertEquals(Code.ERROR, intp.interpret("getProperty MY_ENV1", context).code());
+    assertEquals(Code.ERROR, intp.interpret("getEnv my.property.1", context).code());
+    assertEquals("property value 1", intp.interpret("getProperty my.property.1", context).message().get(0).getData());
+
+    intp.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
new file mode 100644
index 0000000..975d6ea
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.junit.Test;
+
+public class RemoteInterpreterUtilsTest {
+
+  @Test
+  public void testFindRandomAvailablePortOnAllLocalInterfaces() throws IOException {
+    assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() > 0);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java
new file mode 100644
index 0000000..81a9164
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+
+public class MockInterpreterA extends Interpreter {
+
+  private String lastSt;
+
+  public MockInterpreterA(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+    //new RuntimeException().printStackTrace();
+  }
+
+  @Override
+  public void close() {
+  }
+
+  public String getLastStatement() {
+    return lastSt;
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    try {
+      Thread.sleep(Long.parseLong(st));
+      this.lastSt = st;
+    } catch (NumberFormatException | InterruptedException e) {
+      throw new InterpreterException(e);
+    }
+    return new InterpreterResult(Code.SUCCESS, st);
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public List<InterpreterCompletion> completion(String buf, int cursor,
+      InterpreterContext interpreterContext) {
+    return null;
+  }
+
+  @Override
+  public Scheduler getScheduler() {
+    if (getProperty("parallel") != null && getProperty("parallel").equals("true")) {
+      return SchedulerFactory.singleton().createOrGetParallelScheduler("interpreter_" + this.hashCode(), 10);
+    } else {
+      return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
new file mode 100644
index 0000000..d4b26ad
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectWatcher;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+
+public class MockInterpreterAngular extends Interpreter {
+
+  AtomicInteger numWatch = new AtomicInteger(0);
+
+  public MockInterpreterAngular(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    String[] stmt = st.split(" ");
+    String cmd = stmt[0];
+    String name = null;
+    if (stmt.length >= 2) {
+      name = stmt[1];
+    }
+    String value = null;
+    if (stmt.length == 3) {
+      value = stmt[2];
+    }
+
+    AngularObjectRegistry registry = context.getAngularObjectRegistry();
+
+    if (cmd.equals("add")) {
+      registry.add(name, value, context.getNoteId(), null);
+      registry.get(name, context.getNoteId(), null).addWatcher(new AngularObjectWatcher
+              (null) {
+
+        @Override
+        public void watch(Object oldObject, Object newObject,
+            InterpreterContext context) {
+          numWatch.incrementAndGet();
+        }
+
+      });
+    } else if (cmd.equalsIgnoreCase("update")) {
+      registry.get(name, context.getNoteId(), null).set(value);
+    } else if (cmd.equals("remove")) {
+      registry.remove(name, context.getNoteId(), null);
+    }
+
+    try {
+      Thread.sleep(500); // wait for watcher executed
+    } catch (InterruptedException e) {
+      logger.error("Exception in MockInterpreterAngular while interpret Thread.sleep", e);
+    }
+
+    String msg = registry.getAll(context.getNoteId(), null).size() + " " + Integer.toString(numWatch
+            .get());
+    return new InterpreterResult(Code.SUCCESS, msg);
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public List<InterpreterCompletion> completion(String buf, int cursor,
+      InterpreterContext interpreterContext) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
new file mode 100644
index 0000000..7103335
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.WrappedInterpreter;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.scheduler.Scheduler;
+
+public class MockInterpreterB extends Interpreter {
+
+  public MockInterpreterB(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+    //new RuntimeException().printStackTrace();
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    MockInterpreterA intpA = getInterpreterA();
+    String intpASt = intpA.getLastStatement();
+    long timeToSleep = Long.parseLong(st);
+    if (intpASt != null) {
+      timeToSleep += Long.parseLong(intpASt);
+    }
+    try {
+      Thread.sleep(timeToSleep);
+    } catch (NumberFormatException | InterruptedException e) {
+      throw new InterpreterException(e);
+    }
+    return new InterpreterResult(Code.SUCCESS, Long.toString(timeToSleep));
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public List<InterpreterCompletion> completion(String buf, int cursor,
+      InterpreterContext interpreterContext) {
+    return null;
+  }
+
+  public MockInterpreterA getInterpreterA() {
+    InterpreterGroup interpreterGroup = getInterpreterGroup();
+    synchronized (interpreterGroup) {
+      for (List<Interpreter> interpreters : interpreterGroup.values()) {
+        boolean belongsToSameNoteGroup = false;
+        MockInterpreterA a = null;
+        for (Interpreter intp : interpreters) {
+          if (intp.getClassName().equals(MockInterpreterA.class.getName())) {
+            Interpreter p = intp;
+            while (p instanceof WrappedInterpreter) {
+              p = ((WrappedInterpreter) p).getInnerInterpreter();
+            }
+            a = (MockInterpreterA) p;
+          }
+
+          Interpreter p = intp;
+          while (p instanceof WrappedInterpreter) {
+            p = ((WrappedInterpreter) p).getInnerInterpreter();
+          }
+          if (this == p) {
+            belongsToSameNoteGroup = true;
+          }
+        }
+        if (belongsToSameNoteGroup) {
+          return a;
+        }
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public Scheduler getScheduler() {
+    MockInterpreterA intpA = getInterpreterA();
+    if (intpA != null) {
+      return intpA.getScheduler();
+    }
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java
new file mode 100644
index 0000000..12e11f7
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+
+import java.util.List;
+import java.util.Properties;
+
+
+public class MockInterpreterEnv extends Interpreter {
+
+  public MockInterpreterEnv(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    String[] cmd = st.split(" ");
+    if (cmd[0].equals("getEnv")) {
+      return new InterpreterResult(InterpreterResult.Code.SUCCESS, System.getenv(cmd[1]));
+    } else if (cmd[0].equals("getProperty")){
+      return new InterpreterResult(InterpreterResult.Code.SUCCESS, System.getProperty(cmd[1]));
+    } else {
+      return new InterpreterResult(InterpreterResult.Code.ERROR, cmd[0]);
+    }
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public List<InterpreterCompletion> completion(String buf, int cursor,
+      InterpreterContext interpreterContext) {
+    return null;
+  }
+
+  @Override
+  public Scheduler getScheduler() {
+    return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode());
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
new file mode 100644
index 0000000..349315c
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * MockInterpreter to test outputstream
+ */
+public class MockInterpreterOutputStream extends Interpreter {
+  private String lastSt;
+
+  public MockInterpreterOutputStream(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+    //new RuntimeException().printStackTrace();
+  }
+
+  @Override
+  public void close() {
+  }
+
+  public String getLastStatement() {
+    return lastSt;
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    String[] ret = st.split(":");
+    try {
+      if (ret[1] != null) {
+        context.out.write(ret[1]);
+      }
+    } catch (IOException e) {
+      throw new InterpreterException(e);
+    }
+    return new InterpreterResult(InterpreterResult.Code.valueOf(ret[0]), (ret.length > 2) ?
+            ret[2] : "");
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public List<InterpreterCompletion> completion(String buf, int cursor,
+      InterpreterContext interpreterContext) {
+    return null;
+  }
+
+  @Override
+  public Scheduler getScheduler() {
+    return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode());
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
new file mode 100644
index 0000000..c4ff6ab
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.gson.Gson;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectWatcher;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.resource.Resource;
+import org.apache.zeppelin.resource.ResourcePool;
+
+public class MockInterpreterResourcePool extends Interpreter {
+
+  AtomicInteger numWatch = new AtomicInteger(0);
+
+  public MockInterpreterResourcePool(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    String[] stmt = st.split(" ");
+    String cmd = stmt[0];
+    String noteId = null;
+    String paragraphId = null;
+    String name = null;
+    if (stmt.length >= 2) {
+      String[] npn = stmt[1].split(":");
+      if (npn.length >= 3) {
+        noteId = npn[0];
+        paragraphId = npn[1];
+        name = npn[2];
+      } else {
+        name = stmt[1];
+      }
+    }
+    String value = null;
+    if (stmt.length >= 3) {
+      value = stmt[2];
+    }
+
+    ResourcePool resourcePool = context.getResourcePool();
+    Object ret = null;
+    if (cmd.equals("put")) {
+      resourcePool.put(noteId, paragraphId, name, value);
+    } else if (cmd.equalsIgnoreCase("get")) {
+      Resource resource = resourcePool.get(noteId, paragraphId, name);
+      if (resource != null) {
+        ret = resourcePool.get(noteId, paragraphId, name).get();
+      } else {
+        ret = "";
+      }
+    } else if (cmd.equals("remove")) {
+      ret = resourcePool.remove(noteId, paragraphId, name);
+    } else if (cmd.equals("getAll")) {
+      ret = resourcePool.getAll();
+    } else if (cmd.equals("invoke")) {
+      Resource resource = resourcePool.get(noteId, paragraphId, name);
+      if (stmt.length >=4) {
+        Resource res = resource.invokeMethod(value, null, null, stmt[3]);
+        ret = res.get();
+      } else {
+        ret = resource.invokeMethod(value, null, null);
+      }
+    }
+
+    try {
+      Thread.sleep(500); // wait for watcher executed
+    } catch (InterruptedException e) {
+    }
+
+    Gson gson = new Gson();
+    return new InterpreterResult(Code.SUCCESS, gson.toJson(ret));
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public List<InterpreterCompletion> completion(String buf, int cursor,
+      InterpreterContext interpreterContext) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
new file mode 100644
index 0000000..363ccf6
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+import com.google.gson.Gson;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterResourcePool;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unittest for DistributedResourcePool
+ */
+public class DistributedResourcePoolTest {
+  private static final String INTERPRETER_SCRIPT =
+          System.getProperty("os.name").startsWith("Windows") ?
+                  "../bin/interpreter.cmd" :
+                  "../bin/interpreter.sh";
+  private InterpreterGroup intpGroup1;
+  private InterpreterGroup intpGroup2;
+  private HashMap<String, String> env;
+  private RemoteInterpreter intp1;
+  private RemoteInterpreter intp2;
+  private InterpreterContext context;
+  private RemoteInterpreterEventPoller eventPoller1;
+  private RemoteInterpreterEventPoller eventPoller2;
+
+
+  @Before
+  public void setUp() throws Exception {
+    env = new HashMap<>();
+    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
+
+    Properties p = new Properties();
+
+    intp1 = new RemoteInterpreter(
+        p,
+        "note",
+        MockInterpreterResourcePool.class.getName(),
+        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+        "fake",
+        "fakeRepo",
+        env,
+        10 * 1000,
+        null,
+        null,
+        "anonymous",
+        false
+    );
+
+    intpGroup1 = new InterpreterGroup("intpGroup1");
+    intpGroup1.put("note", new LinkedList<Interpreter>());
+    intpGroup1.get("note").add(intp1);
+    intp1.setInterpreterGroup(intpGroup1);
+
+    intp2 = new RemoteInterpreter(
+        p,
+        "note",
+        MockInterpreterResourcePool.class.getName(),
+        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+        "fake",
+        "fakeRepo",        
+        env,
+        10 * 1000,
+        null,
+        null,
+        "anonymous",
+        false
+    );
+
+    intpGroup2 = new InterpreterGroup("intpGroup2");
+    intpGroup2.put("note", new LinkedList<Interpreter>());
+    intpGroup2.get("note").add(intp2);
+    intp2.setInterpreterGroup(intpGroup2);
+
+    context = new InterpreterContext(
+        "note",
+        "id",
+        null,
+        "title",
+        "text",
+        new AuthenticationInfo(),
+        new HashMap<String, Object>(),
+        new GUI(),
+        null,
+        null,
+        new LinkedList<InterpreterContextRunner>(),
+        null);
+
+    intp1.open();
+    intp2.open();
+
+    eventPoller1 = new RemoteInterpreterEventPoller(null, null);
+    eventPoller1.setInterpreterGroup(intpGroup1);
+    eventPoller1.setInterpreterProcess(intpGroup1.getRemoteInterpreterProcess());
+
+    eventPoller2 = new RemoteInterpreterEventPoller(null, null);
+    eventPoller2.setInterpreterGroup(intpGroup2);
+    eventPoller2.setInterpreterProcess(intpGroup2.getRemoteInterpreterProcess());
+
+    eventPoller1.start();
+    eventPoller2.start();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    eventPoller1.shutdown();
+    intp1.close();
+    intpGroup1.close();
+    eventPoller2.shutdown();
+    intp2.close();
+    intpGroup2.close();
+  }
+
+  @Test
+  public void testRemoteDistributedResourcePool() {
+    Gson gson = new Gson();
+    InterpreterResult ret;
+    intp1.interpret("put key1 value1", context);
+    intp2.interpret("put key2 value2", context);
+
+    ret = intp1.interpret("getAll", context);
+    assertEquals(2, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size());
+
+    ret = intp2.interpret("getAll", context);
+    assertEquals(2, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size());
+
+    ret = intp1.interpret("get key1", context);
+    assertEquals("value1", gson.fromJson(ret.message().get(0).getData(), String.class));
+
+    ret = intp1.interpret("get key2", context);
+    assertEquals("value2", gson.fromJson(ret.message().get(0).getData(), String.class));
+  }
+
+  @Test
+  public void testDistributedResourcePool() {
+    final LocalResourcePool pool2 = new LocalResourcePool("pool2");
+    final LocalResourcePool pool3 = new LocalResourcePool("pool3");
+
+    DistributedResourcePool pool1 = new DistributedResourcePool("pool1", new ResourcePoolConnector() {
+      @Override
+      public ResourceSet getAllResources() {
+        ResourceSet set = pool2.getAll();
+        set.addAll(pool3.getAll());
+
+        ResourceSet remoteSet = new ResourceSet();
+        Gson gson = new Gson();
+        for (Resource s : set) {
+          RemoteResource remoteResource = gson.fromJson(gson.toJson(s), RemoteResource.class);
+          remoteResource.setResourcePoolConnector(this);
+          remoteSet.add(remoteResource);
+        }
+        return remoteSet;
+      }
+
+      @Override
+      public Object readResource(ResourceId id) {
+        if (id.getResourcePoolId().equals(pool2.id())) {
+          return pool2.get(id.getName()).get();
+        }
+        if (id.getResourcePoolId().equals(pool3.id())) {
+          return pool3.get(id.getName()).get();
+        }
+        return null;
+      }
+
+      @Override
+      public Object invokeMethod(ResourceId id, String methodName, Class[] paramTypes, Object[] params) {
+        return null;
+      }
+
+      @Override
+      public Resource invokeMethod(ResourceId id, String methodName, Class[] paramTypes, Object[]
+          params, String returnResourceName) {
+        return null;
+      }
+    });
+
+    assertEquals(0, pool1.getAll().size());
+
+
+    // test get() can get from pool
+    pool2.put("object1", "value2");
+    assertEquals(1, pool1.getAll().size());
+    assertTrue(pool1.get("object1").isRemote());
+    assertEquals("value2", pool1.get("object1").get());
+
+    // test get() is locality aware
+    pool1.put("object1", "value1");
+    assertEquals(1, pool2.getAll().size());
+    assertEquals("value1", pool1.get("object1").get());
+
+    // test getAll() is locality aware
+    assertEquals("value1", pool1.getAll().get(0).get());
+    assertEquals("value2", pool1.getAll().get(1).get());
+  }
+
+  @Test
+  public void testResourcePoolUtils() {
+    Gson gson = new Gson();
+    InterpreterResult ret;
+
+    // when create some resources
+    intp1.interpret("put note1:paragraph1:key1 value1", context);
+    intp1.interpret("put note1:paragraph2:key1 value2", context);
+    intp2.interpret("put note2:paragraph1:key1 value1", context);
+    intp2.interpret("put note2:paragraph2:key2 value2", context);
+
+
+    // then get all resources.
+    assertEquals(4, ResourcePoolUtils.getAllResources().size());
+
+    // when remove all resources from note1
+    ResourcePoolUtils.removeResourcesBelongsToNote("note1");
+
+    // then resources should be removed.
+    assertEquals(2, ResourcePoolUtils.getAllResources().size());
+    assertEquals("", gson.fromJson(
+        intp1.interpret("get note1:paragraph1:key1", context).message().get(0).getData(),
+        String.class));
+    assertEquals("", gson.fromJson(
+        intp1.interpret("get note1:paragraph2:key1", context).message().get(0).getData(),
+        String.class));
+
+
+    // when remove all resources from note2:paragraph1
+    ResourcePoolUtils.removeResourcesBelongsToParagraph("note2", "paragraph1");
+
+    // then 1
+    assertEquals(1, ResourcePoolUtils.getAllResources().size());
+    assertEquals("value2", gson.fromJson(
+        intp1.interpret("get note2:paragraph2:key2", context).message().get(0).getData(),
+        String.class));
+
+  }
+
+  @Test
+  public void testResourceInvokeMethod() {
+    Gson gson = new Gson();
+    InterpreterResult ret;
+    intp1.interpret("put key1 hey", context);
+    intp2.interpret("put key2 world", context);
+
+    // invoke method in local resource pool
+    ret = intp1.interpret("invoke key1 length", context);
+    assertEquals("3", ret.message().get(0).getData());
+
+    // invoke method in remote resource pool
+    ret = intp1.interpret("invoke key2 length", context);
+    assertEquals("5", ret.message().get(0).getData());
+
+    // make sure no resources are automatically created
+    ret = intp1.interpret("getAll", context);
+    assertEquals(2, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size());
+
+    // invoke method in local resource pool and save result
+    ret = intp1.interpret("invoke key1 length ret1", context);
+    assertEquals("3", ret.message().get(0).getData());
+
+    ret = intp1.interpret("getAll", context);
+    assertEquals(3, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size());
+
+    ret = intp1.interpret("get ret1", context);
+    assertEquals("3", gson.fromJson(ret.message().get(0).getData(), String.class));
+
+    // invoke method in remote resource pool and save result
+    ret = intp1.interpret("invoke key2 length ret2", context);
+    assertEquals("5", ret.message().get(0).getData());
+
+    ret = intp1.interpret("getAll", context);
+    assertEquals(4, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size());
+
+    ret = intp1.interpret("get ret2", context);
+    assertEquals("5", gson.fromJson(ret.message().get(0).getData(), String.class));
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
new file mode 100644
index 0000000..ebb5100
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scheduler;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
+import org.apache.zeppelin.resource.LocalResourcePool;
+import org.apache.zeppelin.scheduler.Job.Status;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
+
+  private static final String INTERPRETER_SCRIPT =
+          System.getProperty("os.name").startsWith("Windows") ?
+                  "../bin/interpreter.cmd" :
+                  "../bin/interpreter.sh";
+  private SchedulerFactory schedulerSvc;
+  private static final int TICK_WAIT = 100;
+  private static final int MAX_WAIT_CYCLES = 100;
+
+  @Before
+  public void setUp() throws Exception{
+    schedulerSvc = new SchedulerFactory();
+  }
+
+  @After
+  public void tearDown(){
+
+  }
+
+  @Test
+  public void test() throws Exception {
+    Properties p = new Properties();
+    final InterpreterGroup intpGroup = new InterpreterGroup();
+    Map<String, String> env = new HashMap<>();
+    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
+
+    final RemoteInterpreter intpA = new RemoteInterpreter(
+        p,
+        "note",
+        MockInterpreterA.class.getName(),
+        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+        "fake",
+        "fakeRepo",
+        env,
+        10 * 1000,
+        this,
+        null,
+        "anonymous",
+        false);
+
+    intpGroup.put("note", new LinkedList<Interpreter>());
+    intpGroup.get("note").add(intpA);
+    intpA.setInterpreterGroup(intpGroup);
+
+    intpA.open();
+
+    Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note",
+        intpA.getInterpreterProcess(),
+        10);
+
+    Job job = new Job("jobId", "jobName", null, 200) {
+      Object results;
+      @Override
+      public Object getReturn() {
+        return results;
+      }
+
+      @Override
+      public int progress() {
+        return 0;
+      }
+
+      @Override
+      public Map<String, Object> info() {
+        return null;
+      }
+
+      @Override
+      protected Object jobRun() throws Throwable {
+        intpA.interpret("1000", new InterpreterContext(
+            "note",
+            "jobId",
+            null,
+            "title",
+            "text",
+            new AuthenticationInfo(),
+            new HashMap<String, Object>(),
+            new GUI(),
+            new AngularObjectRegistry(intpGroup.getId(), null),
+            new LocalResourcePool("pool1"),
+            new LinkedList<InterpreterContextRunner>(), null));
+        return "1000";
+      }
+
+      @Override
+      protected boolean jobAbort() {
+        return false;
+      }
+
+      @Override
+      public void setResult(Object results) {
+        this.results = results;
+      }
+    };
+    scheduler.submit(job);
+
+    int cycles = 0;
+    while (!job.isRunning() && cycles < MAX_WAIT_CYCLES) {
+      Thread.sleep(TICK_WAIT);
+      cycles++;
+    }
+    assertTrue(job.isRunning());
+
+    Thread.sleep(5*TICK_WAIT);
+    assertEquals(0, scheduler.getJobsWaiting().size());
+    assertEquals(1, scheduler.getJobsRunning().size());
+
+    cycles = 0;
+    while (!job.isTerminated() && cycles < MAX_WAIT_CYCLES) {
+      Thread.sleep(TICK_WAIT);
+      cycles++;
+    }
+
+    assertTrue(job.isTerminated());
+    assertEquals(0, scheduler.getJobsWaiting().size());
+    assertEquals(0, scheduler.getJobsRunning().size());
+
+    intpA.close();
+    schedulerSvc.removeScheduler("test");
+  }
+
+  @Test
+  public void testAbortOnPending() throws Exception {
+    Properties p = new Properties();
+    final InterpreterGroup intpGroup = new InterpreterGroup();
+    Map<String, String> env = new HashMap<>();
+    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
+
+    final RemoteInterpreter intpA = new RemoteInterpreter(
+        p,
+        "note",
+        MockInterpreterA.class.getName(),
+        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+        "fake",
+        "fakeRepo",
+        env,
+        10 * 1000,
+        this,
+        null,
+        "anonymous",
+        false);
+
+    intpGroup.put("note", new LinkedList<Interpreter>());
+    intpGroup.get("note").add(intpA);
+    intpA.setInterpreterGroup(intpGroup);
+
+    intpA.open();
+
+    Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note",
+        intpA.getInterpreterProcess(),
+        10);
+
+    Job job1 = new Job("jobId1", "jobName1", null, 200) {
+      Object results;
+      InterpreterContext context = new InterpreterContext(
+          "note",
+          "jobId1",
+          null,
+          "title",
+          "text",
+          new AuthenticationInfo(),
+          new HashMap<String, Object>(),
+          new GUI(),
+          new AngularObjectRegistry(intpGroup.getId(), null),
+          new LocalResourcePool("pool1"),
+          new LinkedList<InterpreterContextRunner>(), null);
+
+      @Override
+      public Object getReturn() {
+        return results;
+      }
+
+      @Override
+      public int progress() {
+        return 0;
+      }
+
+      @Override
+      public Map<String, Object> info() {
+        return null;
+      }
+
+      @Override
+      protected Object jobRun() throws Throwable {
+        intpA.interpret("1000", context);
+        return "1000";
+      }
+
+      @Override
+      protected boolean jobAbort() {
+        if (isRunning()) {
+          intpA.cancel(context);
+        }
+        return true;
+      }
+
+      @Override
+      public void setResult(Object results) {
+        this.results = results;
+      }
+    };
+
+    Job job2 = new Job("jobId2", "jobName2", null, 200) {
+      public Object results;
+      InterpreterContext context = new InterpreterContext(
+          "note",
+          "jobId2",
+          null,
+          "title",
+          "text",
+          new AuthenticationInfo(),
+          new HashMap<String, Object>(),
+          new GUI(),
+          new AngularObjectRegistry(intpGroup.getId(), null),
+          new LocalResourcePool("pool1"),
+          new LinkedList<InterpreterContextRunner>(), null);
+
+      @Override
+      public Object getReturn() {
+        return results;
+      }
+
+      @Override
+      public int progress() {
+        return 0;
+      }
+
+      @Override
+      public Map<String, Object> info() {
+        return null;
+      }
+
+      @Override
+      protected Object jobRun() throws Throwable {
+        intpA.interpret("1000", context);
+        return "1000";
+      }
+
+      @Override
+      protected boolean jobAbort() {
+        if (isRunning()) {
+          intpA.cancel(context);
+        }
+        return true;
+      }
+
+      @Override
+      public void setResult(Object results) {
+        this.results = results;
+      }
+    };
+
+    job2.setResult("result2");
+
+    scheduler.submit(job1);
+    scheduler.submit(job2);
+
+
+    int cycles = 0;
+    while (!job1.isRunning() && cycles < MAX_WAIT_CYCLES) {
+      Thread.sleep(TICK_WAIT);
+      cycles++;
+    }
+    assertTrue(job1.isRunning());
+    assertTrue(job2.getStatus() == Status.PENDING);
+
+    job2.abort();
+
+    cycles = 0;
+    while (!job1.isTerminated() && cycles < MAX_WAIT_CYCLES) {
+      Thread.sleep(TICK_WAIT);
+      cycles++;
+    }
+
+    assertNotNull(job1.getDateFinished());
+    assertTrue(job1.isTerminated());
+    assertNull(job2.getDateFinished());
+    assertTrue(job2.isTerminated());
+    assertEquals("result2", job2.getReturn());
+
+    intpA.close();
+    schedulerSvc.removeScheduler("test");
+  }
+
+  @Override
+  public void onOutputAppend(String noteId, String paragraphId, int index, String output) {
+
+  }
+
+  @Override
+  public void onOutputUpdated(String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) {
+
+  }
+
+  @Override
+  public void onOutputClear(String noteId, String paragraphId) {
+
+  }
+
+  @Override
+  public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
+
+  }
+
+  @Override
+  public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) {
+    if (callback != null) {
+      callback.onFinished(new LinkedList<>());
+    }
+  }
+
+  @Override
+  public void onRemoteRunParagraph(String noteId, String PsaragraphID) throws Exception {
+  }
+
+  @Override
+  public void onParaInfosReceived(String noteId, String paragraphId, 
+      String interpreterSettingId, Map<String, String> metaInfos) {
+  }
+}


[2/4] zeppelin git commit: [MINOR] Move remoteinterpreter into zengine

Posted by jo...@apache.org.
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
new file mode 100644
index 0000000..ed8982b
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import java.util.*;
+
+import org.apache.thrift.TException;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.display.Input;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+/**
+ * Proxy for Interpreter instance that runs on separate process
+ */
+public class RemoteInterpreter extends Interpreter {
+  private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class);
+
+  private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
+  private final ApplicationEventListener applicationEventListener;
+  private Gson gson = new Gson();
+  private String interpreterRunner;
+  private String interpreterPath;
+  private String localRepoPath;
+  private String className;
+  private String sessionKey;
+  private FormType formType;
+  private boolean initialized;
+  private Map<String, String> env;
+  private int connectTimeout;
+  private int maxPoolSize;
+  private String host;
+  private int port;
+  private String userName;
+  private Boolean isUserImpersonate;
+  private int outputLimit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
+  private String interpreterGroupName;
+
+  /**
+   * Remote interpreter and manage interpreter process
+   */
+  public RemoteInterpreter(Properties property, String sessionKey, String className,
+      String interpreterRunner, String interpreterPath, String localRepoPath, int connectTimeout,
+      int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener,
+      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
+      int outputLimit, String interpreterGroupName) {
+    super(property);
+    this.sessionKey = sessionKey;
+    this.className = className;
+    initialized = false;
+    this.interpreterRunner = interpreterRunner;
+    this.interpreterPath = interpreterPath;
+    this.localRepoPath = localRepoPath;
+    env = getEnvFromInterpreterProperty(property);
+    this.connectTimeout = connectTimeout;
+    this.maxPoolSize = maxPoolSize;
+    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
+    this.applicationEventListener = appListener;
+    this.userName = userName;
+    this.isUserImpersonate = isUserImpersonate;
+    this.outputLimit = outputLimit;
+    this.interpreterGroupName = interpreterGroupName;
+  }
+
+
+  /**
+   * Connect to existing process
+   */
+  public RemoteInterpreter(Properties property, String sessionKey, String className, String host,
+      int port, String localRepoPath, int connectTimeout, int maxPoolSize,
+      RemoteInterpreterProcessListener remoteInterpreterProcessListener,
+      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
+      int outputLimit) {
+    super(property);
+    this.sessionKey = sessionKey;
+    this.className = className;
+    initialized = false;
+    this.host = host;
+    this.port = port;
+    this.localRepoPath = localRepoPath;
+    this.connectTimeout = connectTimeout;
+    this.maxPoolSize = maxPoolSize;
+    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
+    this.applicationEventListener = appListener;
+    this.userName = userName;
+    this.isUserImpersonate = isUserImpersonate;
+    this.outputLimit = outputLimit;
+  }
+
+
+  // VisibleForTesting
+  public RemoteInterpreter(Properties property, String sessionKey, String className,
+      String interpreterRunner, String interpreterPath, String localRepoPath,
+      Map<String, String> env, int connectTimeout,
+      RemoteInterpreterProcessListener remoteInterpreterProcessListener,
+      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) {
+    super(property);
+    this.className = className;
+    this.sessionKey = sessionKey;
+    this.interpreterRunner = interpreterRunner;
+    this.interpreterPath = interpreterPath;
+    this.localRepoPath = localRepoPath;
+    env.putAll(getEnvFromInterpreterProperty(property));
+    this.env = env;
+    this.connectTimeout = connectTimeout;
+    this.maxPoolSize = 10;
+    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
+    this.applicationEventListener = appListener;
+    this.userName = userName;
+    this.isUserImpersonate = isUserImpersonate;
+  }
+
+  private Map<String, String> getEnvFromInterpreterProperty(Properties property) {
+    Map<String, String> env = new HashMap<>();
+    for (Object key : property.keySet()) {
+      if (RemoteInterpreterUtils.isEnvString((String) key)) {
+        env.put((String) key, property.getProperty((String) key));
+      }
+    }
+    return env;
+  }
+
+  @Override
+  public String getClassName() {
+    return className;
+  }
+
+  private boolean connectToExistingProcess() {
+    return host != null && port > 0;
+  }
+
+  public RemoteInterpreterProcess getInterpreterProcess() {
+    InterpreterGroup intpGroup = getInterpreterGroup();
+    if (intpGroup == null) {
+      return null;
+    }
+
+    synchronized (intpGroup) {
+      if (intpGroup.getRemoteInterpreterProcess() == null) {
+        RemoteInterpreterProcess remoteProcess;
+        if (connectToExistingProcess()) {
+          remoteProcess = new RemoteInterpreterRunningProcess(
+              connectTimeout,
+              remoteInterpreterProcessListener,
+              applicationEventListener,
+              host,
+              port);
+        } else {
+          // create new remote process
+          remoteProcess = new RemoteInterpreterManagedProcess(
+              interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout,
+              remoteInterpreterProcessListener, applicationEventListener, interpreterGroupName);
+        }
+
+        intpGroup.setRemoteInterpreterProcess(remoteProcess);
+      }
+
+      return intpGroup.getRemoteInterpreterProcess();
+    }
+  }
+
+  public synchronized void init() {
+    if (initialized == true) {
+      return;
+    }
+
+    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+
+    final InterpreterGroup interpreterGroup = getInterpreterGroup();
+
+    interpreterProcess.setMaxPoolSize(
+        Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
+    String groupId = interpreterGroup.getId();
+
+    synchronized (interpreterProcess) {
+      Client client = null;
+      try {
+        client = interpreterProcess.getClient();
+      } catch (Exception e1) {
+        throw new InterpreterException(e1);
+      }
+
+      boolean broken = false;
+      try {
+        logger.info("Create remote interpreter {}", getClassName());
+        if (localRepoPath != null) {
+          property.put("zeppelin.interpreter.localRepo", localRepoPath);
+        }
+
+        property.put("zeppelin.interpreter.output.limit", Integer.toString(outputLimit));
+        client.createInterpreter(groupId, sessionKey,
+            getClassName(), (Map) property, userName);
+        // Push angular object loaded from JSON file to remote interpreter
+        if (!interpreterGroup.isAngularRegistryPushed()) {
+          pushAngularObjectRegistryToRemote(client);
+          interpreterGroup.setAngularRegistryPushed(true);
+        }
+
+      } catch (TException e) {
+        logger.error("Failed to create interpreter: {}", getClassName());
+        throw new InterpreterException(e);
+      } finally {
+        // TODO(jongyoul): Fixed it when not all of interpreter in same interpreter group are broken
+        interpreterProcess.releaseClient(client, broken);
+      }
+    }
+    initialized = true;
+  }
+
+
+  @Override
+  public void open() {
+    InterpreterGroup interpreterGroup = getInterpreterGroup();
+
+    synchronized (interpreterGroup) {
+      // initialize all interpreters in this interpreter group
+      List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
+      // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however,
+      // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it
+      // doesn't call open method, it's not open. It causes problem while running intp.close()
+      // In case of Spark, this method initializes all of interpreters and init() method increases
+      // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all
+      // other interpreters doesn't do anything because those LazyInterpreters aren't open.
+      // But for now, we have to initialise all of interpreters for some reasons.
+      // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
+      RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+      if (!initialized) {
+        // reference per session
+        interpreterProcess.reference(interpreterGroup, userName, isUserImpersonate);
+      }
+      for (Interpreter intp : new ArrayList<>(interpreters)) {
+        Interpreter p = intp;
+        while (p instanceof WrappedInterpreter) {
+          p = ((WrappedInterpreter) p).getInnerInterpreter();
+        }
+        try {
+          ((RemoteInterpreter) p).init();
+        } catch (InterpreterException e) {
+          logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
+              p.getClassName());
+          interpreters.remove(p);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    InterpreterGroup interpreterGroup = getInterpreterGroup();
+    synchronized (interpreterGroup) {
+      // close all interpreters in this session
+      List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
+      // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however,
+      // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it
+      // doesn't call open method, it's not open. It causes problem while running intp.close()
+      // In case of Spark, this method initializes all of interpreters and init() method increases
+      // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all
+      // other interpreters doesn't do anything because those LazyInterpreters aren't open.
+      // But for now, we have to initialise all of interpreters for some reasons.
+      // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
+      if (initialized) {
+        // dereference per session
+        getInterpreterProcess().dereference();
+      }
+      for (Interpreter intp : new ArrayList<>(interpreters)) {
+        Interpreter p = intp;
+        while (p instanceof WrappedInterpreter) {
+          p = ((WrappedInterpreter) p).getInnerInterpreter();
+        }
+        try {
+          ((RemoteInterpreter) p).closeInterpreter();
+        } catch (InterpreterException e) {
+          logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
+              p.getClassName());
+          interpreters.remove(p);
+        }
+      }
+    }
+  }
+
+  public void closeInterpreter() {
+    if (this.initialized == false) {
+      return;
+    }
+    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+    Client client = null;
+    boolean broken = false;
+    try {
+      client = interpreterProcess.getClient();
+      if (client != null) {
+        client.close(sessionKey, className);
+      }
+    } catch (TException e) {
+      broken = true;
+      throw new InterpreterException(e);
+    } catch (Exception e1) {
+      throw new InterpreterException(e1);
+    } finally {
+      if (client != null) {
+        interpreterProcess.releaseClient(client, broken);
+      }
+      this.initialized = false;
+    }
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("st:\n{}", st);
+    }
+
+    FormType form = getFormType();
+    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+    Client client = null;
+    try {
+      client = interpreterProcess.getClient();
+    } catch (Exception e1) {
+      throw new InterpreterException(e1);
+    }
+
+    InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess
+        .getInterpreterContextRunnerPool();
+
+    List<InterpreterContextRunner> runners = context.getRunners();
+    if (runners != null && runners.size() != 0) {
+      // assume all runners in this InterpreterContext have the same note id
+      String noteId = runners.get(0).getNoteId();
+
+      interpreterContextRunnerPool.clear(noteId);
+      interpreterContextRunnerPool.addAll(noteId, runners);
+    }
+
+    boolean broken = false;
+    try {
+
+      final GUI currentGUI = context.getGui();
+      RemoteInterpreterResult remoteResult = client.interpret(
+          sessionKey, className, st, convert(context));
+
+      Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson(
+          remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
+          }.getType());
+      context.getConfig().clear();
+      context.getConfig().putAll(remoteConfig);
+
+      if (form == FormType.NATIVE) {
+        GUI remoteGui = GUI.fromJson(remoteResult.getGui());
+        currentGUI.clear();
+        currentGUI.setParams(remoteGui.getParams());
+        currentGUI.setForms(remoteGui.getForms());
+      } else if (form == FormType.SIMPLE) {
+        final Map<String, Input> currentForms = currentGUI.getForms();
+        final Map<String, Object> currentParams = currentGUI.getParams();
+        final GUI remoteGUI = GUI.fromJson(remoteResult.getGui());
+        final Map<String, Input> remoteForms = remoteGUI.getForms();
+        final Map<String, Object> remoteParams = remoteGUI.getParams();
+        currentForms.putAll(remoteForms);
+        currentParams.putAll(remoteParams);
+      }
+
+      InterpreterResult result = convert(remoteResult);
+      return result;
+    } catch (TException e) {
+      broken = true;
+      throw new InterpreterException(e);
+    } finally {
+      interpreterProcess.releaseClient(client, broken);
+    }
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+    Client client = null;
+    try {
+      client = interpreterProcess.getClient();
+    } catch (Exception e1) {
+      throw new InterpreterException(e1);
+    }
+
+    boolean broken = false;
+    try {
+      client.cancel(sessionKey, className, convert(context));
+    } catch (TException e) {
+      broken = true;
+      throw new InterpreterException(e);
+    } finally {
+      interpreterProcess.releaseClient(client, broken);
+    }
+  }
+
+  @Override
+  public FormType getFormType() {
+    open();
+
+    if (formType != null) {
+      return formType;
+    }
+
+    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+    Client client = null;
+    try {
+      client = interpreterProcess.getClient();
+    } catch (Exception e1) {
+      throw new InterpreterException(e1);
+    }
+
+    boolean broken = false;
+    try {
+      formType = FormType.valueOf(client.getFormType(sessionKey, className));
+      return formType;
+    } catch (TException e) {
+      broken = true;
+      throw new InterpreterException(e);
+    } finally {
+      interpreterProcess.releaseClient(client, broken);
+    }
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+    if (interpreterProcess == null || !interpreterProcess.isRunning()) {
+      return 0;
+    }
+
+    Client client = null;
+    try {
+      client = interpreterProcess.getClient();
+    } catch (Exception e1) {
+      throw new InterpreterException(e1);
+    }
+
+    boolean broken = false;
+    try {
+      return client.getProgress(sessionKey, className, convert(context));
+    } catch (TException e) {
+      broken = true;
+      throw new InterpreterException(e);
+    } finally {
+      interpreterProcess.releaseClient(client, broken);
+    }
+  }
+
+
+  @Override
+  public List<InterpreterCompletion> completion(String buf, int cursor,
+      InterpreterContext interpreterContext) {
+    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+    Client client = null;
+    try {
+      client = interpreterProcess.getClient();
+    } catch (Exception e1) {
+      throw new InterpreterException(e1);
+    }
+
+    boolean broken = false;
+    try {
+      List completion = client.completion(sessionKey, className, buf, cursor,
+          convert(interpreterContext));
+      return completion;
+    } catch (TException e) {
+      broken = true;
+      throw new InterpreterException(e);
+    } finally {
+      interpreterProcess.releaseClient(client, broken);
+    }
+  }
+
+  @Override
+  public Scheduler getScheduler() {
+    int maxConcurrency = maxPoolSize;
+    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+    if (interpreterProcess == null) {
+      return null;
+    } else {
+      return SchedulerFactory.singleton().createOrGetRemoteScheduler(
+          RemoteInterpreter.class.getName() + sessionKey + interpreterProcess.hashCode(),
+          sessionKey, interpreterProcess, maxConcurrency);
+    }
+  }
+
+  private String getInterpreterGroupKey(InterpreterGroup interpreterGroup) {
+    return interpreterGroup.getId();
+  }
+
+  private RemoteInterpreterContext convert(InterpreterContext ic) {
+    return new RemoteInterpreterContext(ic.getNoteId(), ic.getParagraphId(), ic.getReplName(),
+        ic.getParagraphTitle(), ic.getParagraphText(), gson.toJson(ic.getAuthenticationInfo()),
+        gson.toJson(ic.getConfig()), gson.toJson(ic.getGui()), gson.toJson(ic.getRunners()));
+  }
+
+  private InterpreterResult convert(RemoteInterpreterResult result) {
+    InterpreterResult r = new InterpreterResult(
+        InterpreterResult.Code.valueOf(result.getCode()));
+
+    for (RemoteInterpreterResultMessage m : result.getMsg()) {
+      r.add(InterpreterResult.Type.valueOf(m.getType()), m.getData());
+    }
+
+    return r;
+  }
+
+  /**
+   * Push local angular object registry to
+   * remote interpreter. This method should be
+   * call ONLY inside the init() method
+   */
+  void pushAngularObjectRegistryToRemote(Client client) throws TException {
+    final AngularObjectRegistry angularObjectRegistry = this.getInterpreterGroup()
+        .getAngularObjectRegistry();
+
+    if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() != null) {
+      final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry
+          .getRegistry();
+
+      logger.info("Push local angular object registry from ZeppelinServer to" +
+          " remote interpreter group {}", this.getInterpreterGroup().getId());
+
+      final java.lang.reflect.Type registryType = new TypeToken<Map<String,
+          Map<String, AngularObject>>>() {
+      }.getType();
+
+      Gson gson = new Gson();
+      client.angularRegistryPush(gson.toJson(registry, registryType));
+    }
+  }
+
+  public Map<String, String> getEnv() {
+    return env;
+  }
+
+  public void setEnv(Map<String, String> env) {
+    this.env = env;
+  }
+
+  public void addEnv(Map<String, String> env) {
+    if (this.env == null) {
+      this.env = new HashMap<>();
+    }
+    this.env.putAll(env);
+  }
+
+  //Only for test
+  public String getInterpreterRunner() {
+    return interpreterRunner;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
new file mode 100644
index 0000000..1fb9b90
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.commons.exec.*;
+import org.apache.commons.exec.environment.EnvironmentUtils;
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * This class manages start / stop of remote interpreter process
+ */
+public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
+    implements ExecuteResultHandler {
+  private static final Logger logger = LoggerFactory.getLogger(
+      RemoteInterpreterManagedProcess.class);
+  private final String interpreterRunner;
+
+  private DefaultExecutor executor;
+  private ExecuteWatchdog watchdog;
+  boolean running = false;
+  private int port = -1;
+  private final String interpreterDir;
+  private final String localRepoDir;
+  private final String interpreterGroupName;
+
+  private Map<String, String> env;
+
+  public RemoteInterpreterManagedProcess(
+      String intpRunner,
+      String intpDir,
+      String localRepoDir,
+      Map<String, String> env,
+      int connectTimeout,
+      RemoteInterpreterProcessListener listener,
+      ApplicationEventListener appListener,
+      String interpreterGroupName) {
+    super(new RemoteInterpreterEventPoller(listener, appListener),
+        connectTimeout);
+    this.interpreterRunner = intpRunner;
+    this.env = env;
+    this.interpreterDir = intpDir;
+    this.localRepoDir = localRepoDir;
+    this.interpreterGroupName = interpreterGroupName;
+  }
+
+  RemoteInterpreterManagedProcess(String intpRunner,
+                                  String intpDir,
+                                  String localRepoDir,
+                                  Map<String, String> env,
+                                  RemoteInterpreterEventPoller remoteInterpreterEventPoller,
+                                  int connectTimeout,
+                                  String interpreterGroupName) {
+    super(remoteInterpreterEventPoller,
+        connectTimeout);
+    this.interpreterRunner = intpRunner;
+    this.env = env;
+    this.interpreterDir = intpDir;
+    this.localRepoDir = localRepoDir;
+    this.interpreterGroupName = interpreterGroupName;
+  }
+
+  @Override
+  public String getHost() {
+    return "localhost";
+  }
+
+  @Override
+  public int getPort() {
+    return port;
+  }
+
+  @Override
+  public void start(String userName, Boolean isUserImpersonate) {
+    // start server process
+    try {
+      port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
+    } catch (IOException e1) {
+      throw new InterpreterException(e1);
+    }
+
+    CommandLine cmdLine = CommandLine.parse(interpreterRunner);
+    cmdLine.addArgument("-d", false);
+    cmdLine.addArgument(interpreterDir, false);
+    cmdLine.addArgument("-p", false);
+    cmdLine.addArgument(Integer.toString(port), false);
+    if (isUserImpersonate && !userName.equals("anonymous")) {
+      cmdLine.addArgument("-u", false);
+      cmdLine.addArgument(userName, false);
+    }
+    cmdLine.addArgument("-l", false);
+    cmdLine.addArgument(localRepoDir, false);
+    cmdLine.addArgument("-g", false);
+    cmdLine.addArgument(interpreterGroupName, false);
+
+    executor = new DefaultExecutor();
+
+    ByteArrayOutputStream cmdOut = new ByteArrayOutputStream();
+    ProcessLogOutputStream processOutput = new ProcessLogOutputStream(logger);
+    processOutput.setOutputStream(cmdOut);
+
+    executor.setStreamHandler(new PumpStreamHandler(processOutput));
+    watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
+    executor.setWatchdog(watchdog);
+
+    try {
+      Map procEnv = EnvironmentUtils.getProcEnvironment();
+      procEnv.putAll(env);
+
+      logger.info("Run interpreter process {}", cmdLine);
+      executor.execute(cmdLine, procEnv, this);
+      running = true;
+    } catch (IOException e) {
+      running = false;
+      throw new InterpreterException(e);
+    }
+
+
+    long startTime = System.currentTimeMillis();
+    while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
+      if (!running) {
+        try {
+          cmdOut.flush();
+        } catch (IOException e) {
+          // nothing to do
+        }
+        throw new InterpreterException(new String(cmdOut.toByteArray()));
+      }
+
+      try {
+        if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) {
+          break;
+        } else {
+          try {
+            Thread.sleep(500);
+          } catch (InterruptedException e) {
+            logger.error("Exception in RemoteInterpreterProcess while synchronized reference " +
+                    "Thread.sleep", e);
+          }
+        }
+      } catch (Exception e) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Remote interpreter not yet accessible at localhost:" + port);
+        }
+      }
+    }
+    processOutput.setOutputStream(null);
+  }
+
+  public void stop() {
+    if (isRunning()) {
+      logger.info("kill interpreter process");
+      watchdog.destroyProcess();
+    }
+
+    executor = null;
+    watchdog = null;
+    running = false;
+    logger.info("Remote process terminated");
+  }
+
+  @Override
+  public void onProcessComplete(int exitValue) {
+    logger.info("Interpreter process exited {}", exitValue);
+    running = false;
+
+  }
+
+  @Override
+  public void onProcessFailed(ExecuteException e) {
+    logger.info("Interpreter process failed {}", e);
+    running = false;
+  }
+
+  public boolean isRunning() {
+    return running;
+  }
+
+  private static class ProcessLogOutputStream extends LogOutputStream {
+
+    private Logger logger;
+    OutputStream out;
+
+    public ProcessLogOutputStream(Logger logger) {
+      this.logger = logger;
+    }
+
+    @Override
+    protected void processLine(String s, int i) {
+      this.logger.debug(s);
+    }
+
+    @Override
+    public void write(byte [] b) throws IOException {
+      super.write(b);
+
+      if (out != null) {
+        synchronized (this) {
+          if (out != null) {
+            out.write(b);
+          }
+        }
+      }
+    }
+
+    @Override
+    public void write(byte [] b, int offset, int len) throws IOException {
+      super.write(b, offset, len);
+
+      if (out != null) {
+        synchronized (this) {
+          if (out != null) {
+            out.write(b, offset, len);
+          }
+        }
+      }
+    }
+
+    public void setOutputStream(OutputStream out) {
+      synchronized (this) {
+        this.out = out;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
new file mode 100644
index 0000000..bb176be
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class connects to existing process
+ */
+public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
+  private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class);
+  private final String host;
+  private final int port;
+
+  public RemoteInterpreterRunningProcess(
+      int connectTimeout,
+      RemoteInterpreterProcessListener listener,
+      ApplicationEventListener appListener,
+      String host,
+      int port
+  ) {
+    super(connectTimeout, listener, appListener);
+    this.host = host;
+    this.port = port;
+  }
+
+  @Override
+  public String getHost() {
+    return host;
+  }
+
+  @Override
+  public int getPort() {
+    return port;
+  }
+
+  @Override
+  public void start(String userName, Boolean isUserImpersonate) {
+    // assume process is externally managed. nothing to do
+  }
+
+  @Override
+  public void stop() {
+    // assume process is externally managed. nothing to do
+  }
+
+  @Override
+  public boolean isRunning() {
+    return RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort());
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
new file mode 100644
index 0000000..c8c64ea
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.After;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class AppendOutputRunnerTest {
+
+  private static final int NUM_EVENTS = 10000;
+  private static final int NUM_CLUBBED_EVENTS = 100;
+  private static final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
+  private static ScheduledFuture<?> future = null;
+  /* It is being accessed by multiple threads.
+   * While loop for 'loopForBufferCompletion' could
+   * run for-ever.
+   */
+  private volatile static int numInvocations = 0;
+
+  @After
+  public void afterEach() {
+    if (future != null) {
+      future.cancel(true);
+    }
+  }
+
+  @Test
+  public void testSingleEvent() throws InterruptedException {
+    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+    String[][] buffer = {{"note", "para", "data\n"}};
+
+    loopForCompletingEvents(listener, 1, buffer);
+    verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
+    verify(listener, times(1)).onOutputAppend("note", "para", 0, "data\n");
+  }
+
+  @Test
+  public void testMultipleEventsOfSameParagraph() throws InterruptedException {
+    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+    String note1 = "note1";
+    String para1 = "para1";
+    String[][] buffer = {
+        {note1, para1, "data1\n"},
+        {note1, para1, "data2\n"},
+        {note1, para1, "data3\n"}
+    };
+
+    loopForCompletingEvents(listener, 1, buffer);
+    verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
+    verify(listener, times(1)).onOutputAppend(note1, para1, 0, "data1\ndata2\ndata3\n");
+  }
+
+  @Test
+  public void testMultipleEventsOfDifferentParagraphs() throws InterruptedException {
+    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+    String note1 = "note1";
+    String note2 = "note2";
+    String para1 = "para1";
+    String para2 = "para2";
+    String[][] buffer = {
+        {note1, para1, "data1\n"},
+        {note1, para2, "data2\n"},
+        {note2, para1, "data3\n"},
+        {note2, para2, "data4\n"}
+    };
+    loopForCompletingEvents(listener, 4, buffer);
+
+    verify(listener, times(4)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
+    verify(listener, times(1)).onOutputAppend(note1, para1, 0, "data1\n");
+    verify(listener, times(1)).onOutputAppend(note1, para2, 0, "data2\n");
+    verify(listener, times(1)).onOutputAppend(note2, para1, 0, "data3\n");
+    verify(listener, times(1)).onOutputAppend(note2, para2, 0, "data4\n");
+  }
+
+  @Test
+  public void testClubbedData() throws InterruptedException {
+    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+    AppendOutputRunner runner = new AppendOutputRunner(listener);
+    future = service.scheduleWithFixedDelay(runner, 0,
+        AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
+    Thread thread = new Thread(new BombardEvents(runner));
+    thread.start();
+    thread.join();
+    Thread.sleep(1000);
+
+    /* NUM_CLUBBED_EVENTS is a heuristic number.
+     * It has been observed that for 10,000 continuos event
+     * calls, 30-40 Web-socket calls are made. Keeping
+     * the unit-test to a pessimistic 100 web-socket calls.
+     */
+    verify(listener, atMost(NUM_CLUBBED_EVENTS)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
+  }
+
+  @Test
+  public void testWarnLoggerForLargeData() throws InterruptedException {
+    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+    AppendOutputRunner runner = new AppendOutputRunner(listener);
+    String data = "data\n";
+    int numEvents = 100000;
+
+    for (int i=0; i<numEvents; i++) {
+      runner.appendBuffer("noteId", "paraId", 0, data);
+    }
+
+    TestAppender appender = new TestAppender();
+    Logger logger = Logger.getRootLogger();
+    logger.addAppender(appender);
+    Logger.getLogger(RemoteInterpreterEventPoller.class);
+
+    runner.run();
+    List<LoggingEvent> log;
+
+    int warnLogCounter;
+    LoggingEvent sizeWarnLogEntry = null;
+    do {
+      warnLogCounter = 0;
+      log = appender.getLog();
+      for (LoggingEvent logEntry: log) {
+        if (Level.WARN.equals(logEntry.getLevel())) {
+          sizeWarnLogEntry = logEntry;
+          warnLogCounter += 1;
+        }
+      }
+    } while(warnLogCounter != 2);
+
+    String loggerString = "Processing size for buffered append-output is high: " +
+        (data.length() * numEvents) + " characters.";
+    assertTrue(loggerString.equals(sizeWarnLogEntry.getMessage()));
+  }
+
+  private class BombardEvents implements Runnable {
+
+    private final AppendOutputRunner runner;
+
+    private BombardEvents(AppendOutputRunner runner) {
+      this.runner = runner;
+    }
+
+    @Override
+    public void run() {
+      String noteId = "noteId";
+      String paraId = "paraId";
+      for (int i=0; i<NUM_EVENTS; i++) {
+        runner.appendBuffer(noteId, paraId, 0, "data\n");
+      }
+    }
+  }
+
+  private class TestAppender extends AppenderSkeleton {
+    private final List<LoggingEvent> log = new ArrayList<>();
+
+    @Override
+    public boolean requiresLayout() {
+        return false;
+    }
+
+    @Override
+    protected void append(final LoggingEvent loggingEvent) {
+        log.add(loggingEvent);
+    }
+
+    @Override
+    public void close() {
+    }
+
+    public List<LoggingEvent> getLog() {
+        return new ArrayList<>(log);
+    }
+  }
+
+  private void prepareInvocationCounts(RemoteInterpreterProcessListener listener) {
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        numInvocations += 1;
+        return null;
+      }
+    }).when(listener).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
+  }
+
+  private void loopForCompletingEvents(RemoteInterpreterProcessListener listener,
+      int numTimes, String[][] buffer) {
+    numInvocations = 0;
+    prepareInvocationCounts(listener);
+    AppendOutputRunner runner = new AppendOutputRunner(listener);
+    for (String[] bufferElement: buffer) {
+      runner.appendBuffer(bufferElement[0], bufferElement[1], 0, bufferElement[2]);
+    }
+    future = service.scheduleWithFixedDelay(runner, 0,
+        AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
+    long startTimeMs = System.currentTimeMillis();
+    while(numInvocations != numTimes) {
+      if (System.currentTimeMillis() - startTimeMs > 2000) {
+        fail("Buffered events were not sent for 2 seconds");
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
new file mode 100644
index 0000000..f7404e3
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.zeppelin.display.*;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterAngular;
+import org.apache.zeppelin.resource.LocalResourcePool;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
+  private static final String INTERPRETER_SCRIPT =
+          System.getProperty("os.name").startsWith("Windows") ?
+                  "../bin/interpreter.cmd" :
+                  "../bin/interpreter.sh";
+
+  private InterpreterGroup intpGroup;
+  private HashMap<String, String> env;
+  private RemoteInterpreter intp;
+  private InterpreterContext context;
+  private RemoteAngularObjectRegistry localRegistry;
+
+  private AtomicInteger onAdd;
+  private AtomicInteger onUpdate;
+  private AtomicInteger onRemove;
+
+  @Before
+  public void setUp() throws Exception {
+    onAdd = new AtomicInteger(0);
+    onUpdate = new AtomicInteger(0);
+    onRemove = new AtomicInteger(0);
+
+    intpGroup = new InterpreterGroup("intpId");
+    localRegistry = new RemoteAngularObjectRegistry("intpId", this, intpGroup);
+    intpGroup.setAngularObjectRegistry(localRegistry);
+    env = new HashMap<>();
+    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
+
+    Properties p = new Properties();
+
+    intp = new RemoteInterpreter(
+        p,
+        "note",
+        MockInterpreterAngular.class.getName(),
+        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+        "fake",
+        "fakeRepo",
+        env,
+        10 * 1000,
+        null,
+        null,
+        "anonymous",
+        false
+    );
+
+    intpGroup.put("note", new LinkedList<Interpreter>());
+    intpGroup.get("note").add(intp);
+    intp.setInterpreterGroup(intpGroup);
+
+    context = new InterpreterContext(
+        "note",
+        "id",
+        null,
+        "title",
+        "text",
+        new AuthenticationInfo(),
+        new HashMap<String, Object>(),
+        new GUI(),
+        new AngularObjectRegistry(intpGroup.getId(), null),
+        new LocalResourcePool("pool1"),
+        new LinkedList<InterpreterContextRunner>(), null);
+
+    intp.open();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    intp.close();
+    intpGroup.close();
+  }
+
+  @Test
+  public void testAngularObjectInterpreterSideCRUD() throws InterruptedException {
+    InterpreterResult ret = intp.interpret("get", context);
+    Thread.sleep(500); // waitFor eventpoller pool event
+    String[] result = ret.message().get(0).getData().split(" ");
+    assertEquals("0", result[0]); // size of registry
+    assertEquals("0", result[1]); // num watcher called
+
+    // create object
+    ret = intp.interpret("add n1 v1", context);
+    Thread.sleep(500);
+    result = ret.message().get(0).getData().split(" ");
+    assertEquals("1", result[0]); // size of registry
+    assertEquals("0", result[1]); // num watcher called
+    assertEquals("v1", localRegistry.get("n1", "note", null).get());
+
+    // update object
+    ret = intp.interpret("update n1 v11", context);
+    result = ret.message().get(0).getData().split(" ");
+    Thread.sleep(500);
+    assertEquals("1", result[0]); // size of registry
+    assertEquals("1", result[1]); // num watcher called
+    assertEquals("v11", localRegistry.get("n1", "note", null).get());
+
+    // remove object
+    ret = intp.interpret("remove n1", context);
+    result = ret.message().get(0).getData().split(" ");
+    Thread.sleep(500);
+    assertEquals("0", result[0]); // size of registry
+    assertEquals("1", result[1]); // num watcher called
+    assertEquals(null, localRegistry.get("n1", "note", null));
+  }
+
+  @Test
+  public void testAngularObjectRemovalOnZeppelinServerSide() throws InterruptedException {
+    // test if angularobject removal from server side propagate to interpreter process's registry.
+    // will happen when notebook is removed.
+
+    InterpreterResult ret = intp.interpret("get", context);
+    Thread.sleep(500); // waitFor eventpoller pool event
+    String[] result = ret.message().get(0).getData().split(" ");
+    assertEquals("0", result[0]); // size of registry
+    
+    // create object
+    ret = intp.interpret("add n1 v1", context);
+    Thread.sleep(500);
+    result = ret.message().get(0).getData().split(" ");
+    assertEquals("1", result[0]); // size of registry
+    assertEquals("v1", localRegistry.get("n1", "note", null).get());
+
+    // remove object in local registry.
+    localRegistry.removeAndNotifyRemoteProcess("n1", "note", null);
+    ret = intp.interpret("get", context);
+    Thread.sleep(500); // waitFor eventpoller pool event
+    result = ret.message().get(0).getData().split(" ");
+    assertEquals("0", result[0]); // size of registry
+  }
+
+  @Test
+  public void testAngularObjectAddOnZeppelinServerSide() throws InterruptedException {
+    // test if angularobject add from server side propagate to interpreter process's registry.
+    // will happen when zeppelin server loads notebook and restore the object into registry
+
+    InterpreterResult ret = intp.interpret("get", context);
+    Thread.sleep(500); // waitFor eventpoller pool event
+    String[] result = ret.message().get(0).getData().split(" ");
+    assertEquals("0", result[0]); // size of registry
+    
+    // create object
+    localRegistry.addAndNotifyRemoteProcess("n1", "v1", "note", null);
+    
+    // get from remote registry 
+    ret = intp.interpret("get", context);
+    Thread.sleep(500); // waitFor eventpoller pool event
+    result = ret.message().get(0).getData().split(" ");
+    assertEquals("1", result[0]); // size of registry
+  }
+
+  @Override
+  public void onAdd(String interpreterGroupId, AngularObject object) {
+    onAdd.incrementAndGet();
+  }
+
+  @Override
+  public void onUpdate(String interpreterGroupId, AngularObject object) {
+    onUpdate.incrementAndGet();
+  }
+
+  @Override
+  public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) {
+    onRemove.incrementAndGet();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
new file mode 100644
index 0000000..49aa7aa
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
+import org.junit.Test;
+
+import static org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType.NO_OP;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class RemoteInterpreterEventPollerTest {
+
+	@Test
+	public void shouldClearUnreadEventsOnShutdown() throws Exception {
+		RemoteInterpreterProcess interpreterProc = getMockEventsInterpreterProcess();
+		RemoteInterpreterEventPoller eventPoller = new RemoteInterpreterEventPoller(null, null);
+
+		eventPoller.setInterpreterProcess(interpreterProc);
+		eventPoller.shutdown();
+		eventPoller.start();
+		eventPoller.join();
+
+		assertEquals(NO_OP, interpreterProc.getClient().getEvent().getType());
+	}
+
+	private RemoteInterpreterProcess getMockEventsInterpreterProcess() throws Exception {
+		RemoteInterpreterEvent fakeEvent = new RemoteInterpreterEvent();
+		RemoteInterpreterEvent noMoreEvents = new RemoteInterpreterEvent(NO_OP, "");
+		RemoteInterpreterService.Client client = mock(RemoteInterpreterService.Client.class);
+		RemoteInterpreterProcess intProc = mock(RemoteInterpreterProcess.class);
+
+		when(client.getEvent()).thenReturn(fakeEvent, fakeEvent, noMoreEvents);
+		when(intProc.getClient()).thenReturn(client);
+
+		return intProc;
+	}
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
new file mode 100644
index 0000000..3f865cb
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterOutputStream;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Test for remote interpreter output stream
+ */
+public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProcessListener {
+  private static final String INTERPRETER_SCRIPT =
+          System.getProperty("os.name").startsWith("Windows") ?
+                  "../bin/interpreter.cmd" :
+                  "../bin/interpreter.sh";
+  private InterpreterGroup intpGroup;
+  private HashMap<String, String> env;
+
+  @Before
+  public void setUp() throws Exception {
+    intpGroup = new InterpreterGroup();
+    intpGroup.put("note", new LinkedList<Interpreter>());
+
+    env = new HashMap<>();
+    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    intpGroup.close();
+  }
+
+  private RemoteInterpreter createMockInterpreter() {
+    RemoteInterpreter intp = new RemoteInterpreter(
+        new Properties(),
+        "note",
+        MockInterpreterOutputStream.class.getName(),
+        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+        "fake",
+        "fakeRepo",
+        env,
+        10 * 1000,
+        this,
+        null,
+        "anonymous",
+        false);
+
+    intpGroup.get("note").add(intp);
+    intp.setInterpreterGroup(intpGroup);
+    return intp;
+  }
+
+  private InterpreterContext createInterpreterContext() {
+    return new InterpreterContext(
+        "noteId",
+        "id",
+        null,
+        "title",
+        "text",
+        new AuthenticationInfo(),
+        new HashMap<String, Object>(),
+        new GUI(),
+        new AngularObjectRegistry(intpGroup.getId(), null),
+        null,
+        new LinkedList<InterpreterContextRunner>(), null);
+  }
+
+  @Test
+  public void testInterpreterResultOnly() {
+    RemoteInterpreter intp = createMockInterpreter();
+    InterpreterResult ret = intp.interpret("SUCCESS::staticresult", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals("staticresult", ret.message().get(0).getData());
+
+    ret = intp.interpret("SUCCESS::staticresult2", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals("staticresult2", ret.message().get(0).getData());
+
+    ret = intp.interpret("ERROR::staticresult3", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.ERROR, ret.code());
+    assertEquals("staticresult3", ret.message().get(0).getData());
+  }
+
+  @Test
+  public void testInterpreterOutputStreamOnly() {
+    RemoteInterpreter intp = createMockInterpreter();
+    InterpreterResult ret = intp.interpret("SUCCESS:streamresult:", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals("streamresult", ret.message().get(0).getData());
+
+    ret = intp.interpret("ERROR:streamresult2:", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.ERROR, ret.code());
+    assertEquals("streamresult2", ret.message().get(0).getData());
+  }
+
+  @Test
+  public void testInterpreterResultOutputStreamMixed() {
+    RemoteInterpreter intp = createMockInterpreter();
+    InterpreterResult ret = intp.interpret("SUCCESS:stream:static", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals("stream", ret.message().get(0).getData());
+    assertEquals("static", ret.message().get(1).getData());
+  }
+
+  @Test
+  public void testOutputType() {
+    RemoteInterpreter intp = createMockInterpreter();
+
+    InterpreterResult ret = intp.interpret("SUCCESS:%html hello:", createInterpreterContext());
+    assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType());
+    assertEquals("hello", ret.message().get(0).getData());
+
+    ret = intp.interpret("SUCCESS:%html\nhello:", createInterpreterContext());
+    assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType());
+    assertEquals("hello", ret.message().get(0).getData());
+
+    ret = intp.interpret("SUCCESS:%html hello:%angular world", createInterpreterContext());
+    assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType());
+    assertEquals("hello", ret.message().get(0).getData());
+    assertEquals(InterpreterResult.Type.ANGULAR, ret.message().get(1).getType());
+    assertEquals("world", ret.message().get(1).getData());
+  }
+
+  @Override
+  public void onOutputAppend(String noteId, String paragraphId, int index, String output) {
+
+  }
+
+  @Override
+  public void onOutputUpdated(String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) {
+
+  }
+
+  @Override
+  public void onOutputClear(String noteId, String paragraphId) {
+
+  }
+
+  @Override
+  public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
+
+  }
+
+  @Override
+  public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) {
+    if (callback != null) {
+      callback.onFinished(new LinkedList<>());
+    }
+  }
+
+  @Override
+  public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception {
+
+  }
+
+  @Override
+  public void onParaInfosReceived(String noteId, String paragraphId,
+      String interpreterSettingId, Map<String, String> metaInfos) {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
new file mode 100644
index 0000000..b85d7ef
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.*;
+
+import java.util.HashMap;
+import java.util.Properties;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.zeppelin.interpreter.Constants;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.junit.Test;
+
+public class RemoteInterpreterProcessTest {
+  private static final String INTERPRETER_SCRIPT =
+          System.getProperty("os.name").startsWith("Windows") ?
+                  "../bin/interpreter.cmd" :
+                  "../bin/interpreter.sh";
+  private static final int DUMMY_PORT=3678;
+
+  @Test
+  public void testStartStop() {
+    InterpreterGroup intpGroup = new InterpreterGroup();
+    RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
+        INTERPRETER_SCRIPT, "nonexists", "fakeRepo", new HashMap<String, String>(),
+        10 * 1000, null, null,"fakeName");
+    assertFalse(rip.isRunning());
+    assertEquals(0, rip.referenceCount());
+    assertEquals(1, rip.reference(intpGroup, "anonymous", false));
+    assertEquals(2, rip.reference(intpGroup, "anonymous", false));
+    assertEquals(true, rip.isRunning());
+    assertEquals(1, rip.dereference());
+    assertEquals(true, rip.isRunning());
+    assertEquals(0, rip.dereference());
+    assertEquals(false, rip.isRunning());
+  }
+
+  @Test
+  public void testClientFactory() throws Exception {
+    InterpreterGroup intpGroup = new InterpreterGroup();
+    RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
+        INTERPRETER_SCRIPT, "nonexists", "fakeRepo", new HashMap<String, String>(),
+        mock(RemoteInterpreterEventPoller.class), 10 * 1000, "fakeName");
+    rip.reference(intpGroup, "anonymous", false);
+    assertEquals(0, rip.getNumActiveClient());
+    assertEquals(0, rip.getNumIdleClient());
+
+    Client client = rip.getClient();
+    assertEquals(1, rip.getNumActiveClient());
+    assertEquals(0, rip.getNumIdleClient());
+
+    rip.releaseClient(client);
+    assertEquals(0, rip.getNumActiveClient());
+    assertEquals(1, rip.getNumIdleClient());
+
+    rip.dereference();
+  }
+
+  @Test
+  public void testStartStopRemoteInterpreter() throws TException, InterruptedException {
+    RemoteInterpreterServer server = new RemoteInterpreterServer(3678);
+    server.start();
+    boolean running = false;
+    long startTime = System.currentTimeMillis();
+    while (System.currentTimeMillis() - startTime < 10 * 1000) {
+      if (server.isRunning()) {
+        running = true;
+        break;
+      } else {
+        Thread.sleep(200);
+      }
+    }
+    Properties properties = new Properties();
+    properties.setProperty(Constants.ZEPPELIN_INTERPRETER_PORT, "3678");
+    properties.setProperty(Constants.ZEPPELIN_INTERPRETER_HOST, "localhost");
+    InterpreterGroup intpGroup = mock(InterpreterGroup.class);
+    when(intpGroup.getProperty()).thenReturn(properties);
+    when(intpGroup.containsKey(Constants.EXISTING_PROCESS)).thenReturn(true);
+
+    RemoteInterpreterProcess rip = new RemoteInterpreterManagedProcess(
+        INTERPRETER_SCRIPT,
+        "nonexists",
+        "fakeRepo",
+        new HashMap<String, String>(),
+        mock(RemoteInterpreterEventPoller.class)
+        , 10 * 1000,
+        "fakeName");
+    assertFalse(rip.isRunning());
+    assertEquals(0, rip.referenceCount());
+    assertEquals(1, rip.reference(intpGroup, "anonymous", false));
+    assertEquals(true, rip.isRunning());
+  }
+
+
+  @Test
+  public void testPropagateError() throws TException, InterruptedException {
+    InterpreterGroup intpGroup = new InterpreterGroup();
+    RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
+        "echo hello_world", "nonexists", "fakeRepo", new HashMap<String, String>(),
+        10 * 1000, null, null, "fakeName");
+    assertFalse(rip.isRunning());
+    assertEquals(0, rip.referenceCount());
+    try {
+      assertEquals(1, rip.reference(intpGroup, "anonymous", false));
+    } catch (InterpreterException e) {
+      e.getMessage().contains("hello_world");
+    }
+    assertEquals(0, rip.referenceCount());
+  }
+}


[4/4] zeppelin git commit: [MINOR] Move remoteinterpreter into zengine

Posted by jo...@apache.org.
[MINOR] Move remoteinterpreter into zengine

### What is this PR for?
RemoteInterpreter is only used in the server side then zeppelin-interpreter doesn't have to include this class. Moving this class helps to reduce interpreter binary size and change RemoteInterpreter without adding more dependencies if we want

### What type of PR is it?
[Refactoring]

### Todos
* [x] - Move RemoteInterpreter and related files out of zeppelin-interpreter module

### What is the Jira issue?
N/A

### How should this be tested?
N/A

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No

Author: Jongyoul Lee <jo...@gmail.com>

Closes #2320 from jongyoul/minor/move-remoteinterpreter-into-zengine and squashes the following commits:

80979913c [Jongyoul Lee] Removed author tag
e1425dfa8 [Jongyoul Lee] Adopted DummyInterpreter
99c093229 [Jongyoul Lee] Made DummyInterpreter
5ac8dfbbd [Jongyoul Lee] Moved RemoteInterpreterServer to zeppelin-interpreter
0a881c1b3 [Jongyoul Lee] Removed unused package imported Removed unnecessary classes imported
b7e0b9436 [Jongyoul Lee] moved some files related remote interpreter and fix some minor things
7e8721592 [Jongyoul Lee] move some files of remote packages from zeppelin-interpreter to zeppelin-zengine


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/d9c4a5f0
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/d9c4a5f0
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/d9c4a5f0

Branch: refs/heads/master
Commit: d9c4a5f0b6c3355753b50f466199cbe551cbd89a
Parents: 8e96d8b
Author: Jongyoul Lee <jo...@gmail.com>
Authored: Sat May 6 02:28:31 2017 +0900
Committer: Jongyoul Lee <jo...@apache.org>
Committed: Sat May 6 22:46:36 2017 +0900

----------------------------------------------------------------------
 .../remote/RemoteAngularObjectRegistry.java     | 133 ---
 .../interpreter/remote/RemoteInterpreter.java   | 585 ------------
 .../remote/RemoteInterpreterManagedProcess.java | 247 -----
 .../remote/RemoteInterpreterRunningProcess.java |  67 --
 .../remote/RemoteInterpreterServer.java         |   2 +-
 .../remote/RemoteInterpreterUtils.java          |   9 +-
 .../zeppelin/resource/ResourcePoolUtils.java    |   1 -
 .../zeppelin/scheduler/RemoteScheduler.java     |   1 -
 .../zeppelin/interpreter/DummyInterpreter.java  |  43 +
 .../zeppelin/interpreter/InterpreterTest.java   |   7 +-
 .../remote/AppendOutputRunnerTest.java          | 236 -----
 .../remote/RemoteAngularObjectTest.java         | 201 ----
 .../RemoteInterpreterEventPollerTest.java       |  55 --
 .../RemoteInterpreterOutputTestStream.java      | 191 ----
 .../remote/RemoteInterpreterProcessTest.java    | 131 ---
 .../remote/RemoteInterpreterTest.java           | 914 -------------------
 .../remote/RemoteInterpreterUtilsTest.java      |  34 -
 .../remote/mock/MockInterpreterA.java           |  94 --
 .../remote/mock/MockInterpreterAngular.java     | 113 ---
 .../remote/mock/MockInterpreterB.java           | 126 ---
 .../remote/mock/MockInterpreterEnv.java         |  80 --
 .../mock/MockInterpreterOutputStream.java       |  90 --
 .../mock/MockInterpreterResourcePool.java       | 128 ---
 .../resource/DistributedResourcePoolTest.java   | 303 ------
 .../zeppelin/scheduler/RemoteSchedulerTest.java | 364 --------
 .../remote/RemoteAngularObjectRegistry.java     | 133 +++
 .../interpreter/remote/RemoteInterpreter.java   | 577 ++++++++++++
 .../remote/RemoteInterpreterManagedProcess.java | 247 +++++
 .../remote/RemoteInterpreterRunningProcess.java |  67 ++
 .../remote/AppendOutputRunnerTest.java          | 236 +++++
 .../remote/RemoteAngularObjectTest.java         | 201 ++++
 .../RemoteInterpreterEventPollerTest.java       |  55 ++
 .../RemoteInterpreterOutputTestStream.java      | 191 ++++
 .../remote/RemoteInterpreterProcessTest.java    | 131 +++
 .../remote/RemoteInterpreterTest.java           | 914 +++++++++++++++++++
 .../remote/RemoteInterpreterUtilsTest.java      |  34 +
 .../remote/mock/MockInterpreterA.java           |  94 ++
 .../remote/mock/MockInterpreterAngular.java     | 113 +++
 .../remote/mock/MockInterpreterB.java           | 126 +++
 .../remote/mock/MockInterpreterEnv.java         |  80 ++
 .../mock/MockInterpreterOutputStream.java       |  90 ++
 .../mock/MockInterpreterResourcePool.java       | 128 +++
 .../resource/DistributedResourcePoolTest.java   | 303 ++++++
 .../zeppelin/scheduler/RemoteSchedulerTest.java | 364 ++++++++
 44 files changed, 4139 insertions(+), 4100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
deleted file mode 100644
index 0ac7116..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import java.util.List;
-
-import org.apache.thrift.TException;
-import org.apache.zeppelin.display.AngularObject;
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.AngularObjectRegistryListener;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.gson.Gson;
-
-/**
- * Proxy for AngularObjectRegistry that exists in remote interpreter process
- */
-public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
-  Logger logger = LoggerFactory.getLogger(RemoteAngularObjectRegistry.class);
-  private InterpreterGroup interpreterGroup;
-
-  public RemoteAngularObjectRegistry(String interpreterId,
-      AngularObjectRegistryListener listener,
-      InterpreterGroup interpreterGroup) {
-    super(interpreterId, listener);
-    this.interpreterGroup = interpreterGroup;
-  }
-
-  private RemoteInterpreterProcess getRemoteInterpreterProcess() {
-    return interpreterGroup.getRemoteInterpreterProcess();
-  }
-
-  /**
-   * When ZeppelinServer side code want to add angularObject to the registry,
-   * this method should be used instead of add()
-   * @param name
-   * @param o
-   * @param noteId
-   * @return
-   */
-  public AngularObject addAndNotifyRemoteProcess(String name, Object o, String noteId, String
-          paragraphId) {
-    Gson gson = new Gson();
-    RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
-    if (!remoteInterpreterProcess.isRunning()) {
-      return super.add(name, o, noteId, paragraphId, true);
-    }
-
-    Client client = null;
-    boolean broken = false;
-    try {
-      client = remoteInterpreterProcess.getClient();
-      client.angularObjectAdd(name, noteId, paragraphId, gson.toJson(o));
-      return super.add(name, o, noteId, paragraphId, true);
-    } catch (TException e) {
-      broken = true;
-      logger.error("Error", e);
-    } catch (Exception e) {
-      logger.error("Error", e);
-    } finally {
-      if (client != null) {
-        remoteInterpreterProcess.releaseClient(client, broken);
-      }
-    }
-    return null;
-  }
-
-  /**
-   * When ZeppelinServer side code want to remove angularObject from the registry,
-   * this method should be used instead of remove()
-   * @param name
-   * @param noteId
-   * @param paragraphId
-   * @return
-   */
-  public AngularObject removeAndNotifyRemoteProcess(String name, String noteId, String
-          paragraphId) {
-    RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
-    if (remoteInterpreterProcess == null || !remoteInterpreterProcess.isRunning()) {
-      return super.remove(name, noteId, paragraphId);
-    }
-
-    Client client = null;
-    boolean broken = false;
-    try {
-      client = remoteInterpreterProcess.getClient();
-      client.angularObjectRemove(name, noteId, paragraphId);
-      return super.remove(name, noteId, paragraphId);
-    } catch (TException e) {
-      broken = true;
-      logger.error("Error", e);
-    } catch (Exception e) {
-      logger.error("Error", e);
-    } finally {
-      if (client != null) {
-        remoteInterpreterProcess.releaseClient(client, broken);
-      }
-    }
-    return null;
-  }
-  
-  public void removeAllAndNotifyRemoteProcess(String noteId, String paragraphId) {
-    List<AngularObject> all = getAll(noteId, paragraphId);
-    for (AngularObject ao : all) {
-      removeAndNotifyRemoteProcess(ao.getName(), noteId, paragraphId);
-    }
-  }
-
-  @Override
-  protected AngularObject createNewAngularObject(String name, Object o, String noteId, String
-          paragraphId) {
-    return new RemoteAngularObject(name, o, noteId, paragraphId, interpreterGroup,
-        getAngularObjectListener());
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
deleted file mode 100644
index 123ad75..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ /dev/null
@@ -1,585 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import java.util.*;
-
-import org.apache.thrift.TException;
-import org.apache.zeppelin.display.AngularObject;
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.helium.ApplicationEventListener;
-import org.apache.zeppelin.display.Input;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-
-/**
- * Proxy for Interpreter instance that runs on separate process
- */
-public class RemoteInterpreter extends Interpreter {
-  private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class);
-
-  private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
-  private final ApplicationEventListener applicationEventListener;
-  private Gson gson = new Gson();
-  private String interpreterRunner;
-  private String interpreterPath;
-  private String localRepoPath;
-  private String className;
-  private String sessionKey;
-  private FormType formType;
-  private boolean initialized;
-  private Map<String, String> env;
-  private int connectTimeout;
-  private int maxPoolSize;
-  private String host;
-  private int port;
-  private String userName;
-  private Boolean isUserImpersonate;
-  private int outputLimit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
-  private String interpreterGroupName;
-
-  /**
-   * Remote interpreter and manage interpreter process
-   */
-  public RemoteInterpreter(Properties property, String sessionKey, String className,
-      String interpreterRunner, String interpreterPath, String localRepoPath, int connectTimeout,
-      int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener,
-      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
-      int outputLimit, String interpreterGroupName) {
-    super(property);
-    this.sessionKey = sessionKey;
-    this.className = className;
-    initialized = false;
-    this.interpreterRunner = interpreterRunner;
-    this.interpreterPath = interpreterPath;
-    this.localRepoPath = localRepoPath;
-    env = getEnvFromInterpreterProperty(property);
-    this.connectTimeout = connectTimeout;
-    this.maxPoolSize = maxPoolSize;
-    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
-    this.applicationEventListener = appListener;
-    this.userName = userName;
-    this.isUserImpersonate = isUserImpersonate;
-    this.outputLimit = outputLimit;
-    this.interpreterGroupName = interpreterGroupName;
-  }
-
-
-  /**
-   * Connect to existing process
-   */
-  public RemoteInterpreter(Properties property, String sessionKey, String className, String host,
-      int port, String localRepoPath, int connectTimeout, int maxPoolSize,
-      RemoteInterpreterProcessListener remoteInterpreterProcessListener,
-      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
-      int outputLimit) {
-    super(property);
-    this.sessionKey = sessionKey;
-    this.className = className;
-    initialized = false;
-    this.host = host;
-    this.port = port;
-    this.localRepoPath = localRepoPath;
-    this.connectTimeout = connectTimeout;
-    this.maxPoolSize = maxPoolSize;
-    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
-    this.applicationEventListener = appListener;
-    this.userName = userName;
-    this.isUserImpersonate = isUserImpersonate;
-    this.outputLimit = outputLimit;
-  }
-
-
-  // VisibleForTesting
-  public RemoteInterpreter(Properties property, String sessionKey, String className,
-      String interpreterRunner, String interpreterPath, String localRepoPath,
-      Map<String, String> env, int connectTimeout,
-      RemoteInterpreterProcessListener remoteInterpreterProcessListener,
-      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) {
-    super(property);
-    this.className = className;
-    this.sessionKey = sessionKey;
-    this.interpreterRunner = interpreterRunner;
-    this.interpreterPath = interpreterPath;
-    this.localRepoPath = localRepoPath;
-    env.putAll(getEnvFromInterpreterProperty(property));
-    this.env = env;
-    this.connectTimeout = connectTimeout;
-    this.maxPoolSize = 10;
-    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
-    this.applicationEventListener = appListener;
-    this.userName = userName;
-    this.isUserImpersonate = isUserImpersonate;
-  }
-
-  private Map<String, String> getEnvFromInterpreterProperty(Properties property) {
-    Map<String, String> env = new HashMap<>();
-    for (Object key : property.keySet()) {
-      if (isEnvString((String) key)) {
-        env.put((String) key, property.getProperty((String) key));
-      }
-    }
-    return env;
-  }
-
-  static boolean isEnvString(String key) {
-    if (key == null || key.length() == 0) {
-      return false;
-    }
-
-    return key.matches("^[A-Z_0-9]*");
-  }
-
-  @Override
-  public String getClassName() {
-    return className;
-  }
-
-  private boolean connectToExistingProcess() {
-    return host != null && port > 0;
-  }
-
-  public RemoteInterpreterProcess getInterpreterProcess() {
-    InterpreterGroup intpGroup = getInterpreterGroup();
-    if (intpGroup == null) {
-      return null;
-    }
-
-    synchronized (intpGroup) {
-      if (intpGroup.getRemoteInterpreterProcess() == null) {
-        RemoteInterpreterProcess remoteProcess;
-        if (connectToExistingProcess()) {
-          remoteProcess = new RemoteInterpreterRunningProcess(
-              connectTimeout,
-              remoteInterpreterProcessListener,
-              applicationEventListener,
-              host,
-              port);
-        } else {
-          // create new remote process
-          remoteProcess = new RemoteInterpreterManagedProcess(
-              interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout,
-              remoteInterpreterProcessListener, applicationEventListener, interpreterGroupName);
-        }
-
-        intpGroup.setRemoteInterpreterProcess(remoteProcess);
-      }
-
-      return intpGroup.getRemoteInterpreterProcess();
-    }
-  }
-
-  public synchronized void init() {
-    if (initialized == true) {
-      return;
-    }
-
-    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-
-    final InterpreterGroup interpreterGroup = getInterpreterGroup();
-
-    interpreterProcess.setMaxPoolSize(
-        Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
-    String groupId = interpreterGroup.getId();
-
-    synchronized (interpreterProcess) {
-      Client client = null;
-      try {
-        client = interpreterProcess.getClient();
-      } catch (Exception e1) {
-        throw new InterpreterException(e1);
-      }
-
-      boolean broken = false;
-      try {
-        logger.info("Create remote interpreter {}", getClassName());
-        if (localRepoPath != null) {
-          property.put("zeppelin.interpreter.localRepo", localRepoPath);
-        }
-
-        property.put("zeppelin.interpreter.output.limit", Integer.toString(outputLimit));
-        client.createInterpreter(groupId, sessionKey,
-            getClassName(), (Map) property, userName);
-        // Push angular object loaded from JSON file to remote interpreter
-        if (!interpreterGroup.isAngularRegistryPushed()) {
-          pushAngularObjectRegistryToRemote(client);
-          interpreterGroup.setAngularRegistryPushed(true);
-        }
-
-      } catch (TException e) {
-        logger.error("Failed to create interpreter: {}", getClassName());
-        throw new InterpreterException(e);
-      } finally {
-        // TODO(jongyoul): Fixed it when not all of interpreter in same interpreter group are broken
-        interpreterProcess.releaseClient(client, broken);
-      }
-    }
-    initialized = true;
-  }
-
-
-  @Override
-  public void open() {
-    InterpreterGroup interpreterGroup = getInterpreterGroup();
-
-    synchronized (interpreterGroup) {
-      // initialize all interpreters in this interpreter group
-      List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
-      // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however,
-      // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it
-      // doesn't call open method, it's not open. It causes problem while running intp.close()
-      // In case of Spark, this method initializes all of interpreters and init() method increases
-      // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all
-      // other interpreters doesn't do anything because those LazyInterpreters aren't open.
-      // But for now, we have to initialise all of interpreters for some reasons.
-      // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
-      RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-      if (!initialized) {
-        // reference per session
-        interpreterProcess.reference(interpreterGroup, userName, isUserImpersonate);
-      }
-      for (Interpreter intp : new ArrayList<>(interpreters)) {
-        Interpreter p = intp;
-        while (p instanceof WrappedInterpreter) {
-          p = ((WrappedInterpreter) p).getInnerInterpreter();
-        }
-        try {
-          ((RemoteInterpreter) p).init();
-        } catch (InterpreterException e) {
-          logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
-              p.getClassName());
-          interpreters.remove(p);
-        }
-      }
-    }
-  }
-
-  @Override
-  public void close() {
-    InterpreterGroup interpreterGroup = getInterpreterGroup();
-    synchronized (interpreterGroup) {
-      // close all interpreters in this session
-      List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
-      // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however,
-      // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it
-      // doesn't call open method, it's not open. It causes problem while running intp.close()
-      // In case of Spark, this method initializes all of interpreters and init() method increases
-      // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all
-      // other interpreters doesn't do anything because those LazyInterpreters aren't open.
-      // But for now, we have to initialise all of interpreters for some reasons.
-      // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
-      if (initialized) {
-        // dereference per session
-        getInterpreterProcess().dereference();
-      }
-      for (Interpreter intp : new ArrayList<>(interpreters)) {
-        Interpreter p = intp;
-        while (p instanceof WrappedInterpreter) {
-          p = ((WrappedInterpreter) p).getInnerInterpreter();
-        }
-        try {
-          ((RemoteInterpreter) p).closeInterpreter();
-        } catch (InterpreterException e) {
-          logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
-              p.getClassName());
-          interpreters.remove(p);
-        }
-      }
-    }
-  }
-
-  public void closeInterpreter() {
-    if (this.initialized == false) {
-      return;
-    }
-    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-    Client client = null;
-    boolean broken = false;
-    try {
-      client = interpreterProcess.getClient();
-      if (client != null) {
-        client.close(sessionKey, className);
-      }
-    } catch (TException e) {
-      broken = true;
-      throw new InterpreterException(e);
-    } catch (Exception e1) {
-      throw new InterpreterException(e1);
-    } finally {
-      if (client != null) {
-        interpreterProcess.releaseClient(client, broken);
-      }
-      this.initialized = false;
-    }
-  }
-
-  @Override
-  public InterpreterResult interpret(String st, InterpreterContext context) {
-    if (logger.isDebugEnabled()) {
-      logger.debug("st:\n{}", st);
-    }
-
-    FormType form = getFormType();
-    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-    Client client = null;
-    try {
-      client = interpreterProcess.getClient();
-    } catch (Exception e1) {
-      throw new InterpreterException(e1);
-    }
-
-    InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess
-        .getInterpreterContextRunnerPool();
-
-    List<InterpreterContextRunner> runners = context.getRunners();
-    if (runners != null && runners.size() != 0) {
-      // assume all runners in this InterpreterContext have the same note id
-      String noteId = runners.get(0).getNoteId();
-
-      interpreterContextRunnerPool.clear(noteId);
-      interpreterContextRunnerPool.addAll(noteId, runners);
-    }
-
-    boolean broken = false;
-    try {
-
-      final GUI currentGUI = context.getGui();
-      RemoteInterpreterResult remoteResult = client.interpret(
-          sessionKey, className, st, convert(context));
-
-      Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson(
-          remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
-          }.getType());
-      context.getConfig().clear();
-      context.getConfig().putAll(remoteConfig);
-
-      if (form == FormType.NATIVE) {
-        GUI remoteGui = GUI.fromJson(remoteResult.getGui());
-        currentGUI.clear();
-        currentGUI.setParams(remoteGui.getParams());
-        currentGUI.setForms(remoteGui.getForms());
-      } else if (form == FormType.SIMPLE) {
-        final Map<String, Input> currentForms = currentGUI.getForms();
-        final Map<String, Object> currentParams = currentGUI.getParams();
-        final GUI remoteGUI = GUI.fromJson(remoteResult.getGui());
-        final Map<String, Input> remoteForms = remoteGUI.getForms();
-        final Map<String, Object> remoteParams = remoteGUI.getParams();
-        currentForms.putAll(remoteForms);
-        currentParams.putAll(remoteParams);
-      }
-
-      InterpreterResult result = convert(remoteResult);
-      return result;
-    } catch (TException e) {
-      broken = true;
-      throw new InterpreterException(e);
-    } finally {
-      interpreterProcess.releaseClient(client, broken);
-    }
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) {
-    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-    Client client = null;
-    try {
-      client = interpreterProcess.getClient();
-    } catch (Exception e1) {
-      throw new InterpreterException(e1);
-    }
-
-    boolean broken = false;
-    try {
-      client.cancel(sessionKey, className, convert(context));
-    } catch (TException e) {
-      broken = true;
-      throw new InterpreterException(e);
-    } finally {
-      interpreterProcess.releaseClient(client, broken);
-    }
-  }
-
-  @Override
-  public FormType getFormType() {
-    open();
-
-    if (formType != null) {
-      return formType;
-    }
-
-    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-    Client client = null;
-    try {
-      client = interpreterProcess.getClient();
-    } catch (Exception e1) {
-      throw new InterpreterException(e1);
-    }
-
-    boolean broken = false;
-    try {
-      formType = FormType.valueOf(client.getFormType(sessionKey, className));
-      return formType;
-    } catch (TException e) {
-      broken = true;
-      throw new InterpreterException(e);
-    } finally {
-      interpreterProcess.releaseClient(client, broken);
-    }
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-    if (interpreterProcess == null || !interpreterProcess.isRunning()) {
-      return 0;
-    }
-
-    Client client = null;
-    try {
-      client = interpreterProcess.getClient();
-    } catch (Exception e1) {
-      throw new InterpreterException(e1);
-    }
-
-    boolean broken = false;
-    try {
-      return client.getProgress(sessionKey, className, convert(context));
-    } catch (TException e) {
-      broken = true;
-      throw new InterpreterException(e);
-    } finally {
-      interpreterProcess.releaseClient(client, broken);
-    }
-  }
-
-
-  @Override
-  public List<InterpreterCompletion> completion(String buf, int cursor,
-      InterpreterContext interpreterContext) {
-    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-    Client client = null;
-    try {
-      client = interpreterProcess.getClient();
-    } catch (Exception e1) {
-      throw new InterpreterException(e1);
-    }
-
-    boolean broken = false;
-    try {
-      List completion = client.completion(sessionKey, className, buf, cursor,
-          convert(interpreterContext));
-      return completion;
-    } catch (TException e) {
-      broken = true;
-      throw new InterpreterException(e);
-    } finally {
-      interpreterProcess.releaseClient(client, broken);
-    }
-  }
-
-  @Override
-  public Scheduler getScheduler() {
-    int maxConcurrency = maxPoolSize;
-    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-    if (interpreterProcess == null) {
-      return null;
-    } else {
-      return SchedulerFactory.singleton().createOrGetRemoteScheduler(
-          RemoteInterpreter.class.getName() + sessionKey + interpreterProcess.hashCode(),
-          sessionKey, interpreterProcess, maxConcurrency);
-    }
-  }
-
-  private String getInterpreterGroupKey(InterpreterGroup interpreterGroup) {
-    return interpreterGroup.getId();
-  }
-
-  private RemoteInterpreterContext convert(InterpreterContext ic) {
-    return new RemoteInterpreterContext(ic.getNoteId(), ic.getParagraphId(), ic.getReplName(),
-        ic.getParagraphTitle(), ic.getParagraphText(), gson.toJson(ic.getAuthenticationInfo()),
-        gson.toJson(ic.getConfig()), gson.toJson(ic.getGui()), gson.toJson(ic.getRunners()));
-  }
-
-  private InterpreterResult convert(RemoteInterpreterResult result) {
-    InterpreterResult r = new InterpreterResult(
-        InterpreterResult.Code.valueOf(result.getCode()));
-
-    for (RemoteInterpreterResultMessage m : result.getMsg()) {
-      r.add(InterpreterResult.Type.valueOf(m.getType()), m.getData());
-    }
-
-    return r;
-  }
-
-  /**
-   * Push local angular object registry to
-   * remote interpreter. This method should be
-   * call ONLY inside the init() method
-   */
-  void pushAngularObjectRegistryToRemote(Client client) throws TException {
-    final AngularObjectRegistry angularObjectRegistry = this.getInterpreterGroup()
-        .getAngularObjectRegistry();
-
-    if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() != null) {
-      final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry
-          .getRegistry();
-
-      logger.info("Push local angular object registry from ZeppelinServer to" +
-          " remote interpreter group {}", this.getInterpreterGroup().getId());
-
-      final java.lang.reflect.Type registryType = new TypeToken<Map<String,
-          Map<String, AngularObject>>>() {
-      }.getType();
-
-      Gson gson = new Gson();
-      client.angularRegistryPush(gson.toJson(registry, registryType));
-    }
-  }
-
-  public Map<String, String> getEnv() {
-    return env;
-  }
-
-  public void setEnv(Map<String, String> env) {
-    this.env = env;
-  }
-
-  public void addEnv(Map<String, String> env) {
-    if (this.env == null) {
-      this.env = new HashMap<>();
-    }
-    this.env.putAll(env);
-  }
-
-  //Only for test
-  public String getInterpreterRunner() {
-    return interpreterRunner;
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
deleted file mode 100644
index 1fb9b90..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import org.apache.commons.exec.*;
-import org.apache.commons.exec.environment.EnvironmentUtils;
-import org.apache.zeppelin.helium.ApplicationEventListener;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Map;
-
-/**
- * This class manages start / stop of remote interpreter process
- */
-public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
-    implements ExecuteResultHandler {
-  private static final Logger logger = LoggerFactory.getLogger(
-      RemoteInterpreterManagedProcess.class);
-  private final String interpreterRunner;
-
-  private DefaultExecutor executor;
-  private ExecuteWatchdog watchdog;
-  boolean running = false;
-  private int port = -1;
-  private final String interpreterDir;
-  private final String localRepoDir;
-  private final String interpreterGroupName;
-
-  private Map<String, String> env;
-
-  public RemoteInterpreterManagedProcess(
-      String intpRunner,
-      String intpDir,
-      String localRepoDir,
-      Map<String, String> env,
-      int connectTimeout,
-      RemoteInterpreterProcessListener listener,
-      ApplicationEventListener appListener,
-      String interpreterGroupName) {
-    super(new RemoteInterpreterEventPoller(listener, appListener),
-        connectTimeout);
-    this.interpreterRunner = intpRunner;
-    this.env = env;
-    this.interpreterDir = intpDir;
-    this.localRepoDir = localRepoDir;
-    this.interpreterGroupName = interpreterGroupName;
-  }
-
-  RemoteInterpreterManagedProcess(String intpRunner,
-                                  String intpDir,
-                                  String localRepoDir,
-                                  Map<String, String> env,
-                                  RemoteInterpreterEventPoller remoteInterpreterEventPoller,
-                                  int connectTimeout,
-                                  String interpreterGroupName) {
-    super(remoteInterpreterEventPoller,
-        connectTimeout);
-    this.interpreterRunner = intpRunner;
-    this.env = env;
-    this.interpreterDir = intpDir;
-    this.localRepoDir = localRepoDir;
-    this.interpreterGroupName = interpreterGroupName;
-  }
-
-  @Override
-  public String getHost() {
-    return "localhost";
-  }
-
-  @Override
-  public int getPort() {
-    return port;
-  }
-
-  @Override
-  public void start(String userName, Boolean isUserImpersonate) {
-    // start server process
-    try {
-      port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
-    } catch (IOException e1) {
-      throw new InterpreterException(e1);
-    }
-
-    CommandLine cmdLine = CommandLine.parse(interpreterRunner);
-    cmdLine.addArgument("-d", false);
-    cmdLine.addArgument(interpreterDir, false);
-    cmdLine.addArgument("-p", false);
-    cmdLine.addArgument(Integer.toString(port), false);
-    if (isUserImpersonate && !userName.equals("anonymous")) {
-      cmdLine.addArgument("-u", false);
-      cmdLine.addArgument(userName, false);
-    }
-    cmdLine.addArgument("-l", false);
-    cmdLine.addArgument(localRepoDir, false);
-    cmdLine.addArgument("-g", false);
-    cmdLine.addArgument(interpreterGroupName, false);
-
-    executor = new DefaultExecutor();
-
-    ByteArrayOutputStream cmdOut = new ByteArrayOutputStream();
-    ProcessLogOutputStream processOutput = new ProcessLogOutputStream(logger);
-    processOutput.setOutputStream(cmdOut);
-
-    executor.setStreamHandler(new PumpStreamHandler(processOutput));
-    watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
-    executor.setWatchdog(watchdog);
-
-    try {
-      Map procEnv = EnvironmentUtils.getProcEnvironment();
-      procEnv.putAll(env);
-
-      logger.info("Run interpreter process {}", cmdLine);
-      executor.execute(cmdLine, procEnv, this);
-      running = true;
-    } catch (IOException e) {
-      running = false;
-      throw new InterpreterException(e);
-    }
-
-
-    long startTime = System.currentTimeMillis();
-    while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
-      if (!running) {
-        try {
-          cmdOut.flush();
-        } catch (IOException e) {
-          // nothing to do
-        }
-        throw new InterpreterException(new String(cmdOut.toByteArray()));
-      }
-
-      try {
-        if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) {
-          break;
-        } else {
-          try {
-            Thread.sleep(500);
-          } catch (InterruptedException e) {
-            logger.error("Exception in RemoteInterpreterProcess while synchronized reference " +
-                    "Thread.sleep", e);
-          }
-        }
-      } catch (Exception e) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("Remote interpreter not yet accessible at localhost:" + port);
-        }
-      }
-    }
-    processOutput.setOutputStream(null);
-  }
-
-  public void stop() {
-    if (isRunning()) {
-      logger.info("kill interpreter process");
-      watchdog.destroyProcess();
-    }
-
-    executor = null;
-    watchdog = null;
-    running = false;
-    logger.info("Remote process terminated");
-  }
-
-  @Override
-  public void onProcessComplete(int exitValue) {
-    logger.info("Interpreter process exited {}", exitValue);
-    running = false;
-
-  }
-
-  @Override
-  public void onProcessFailed(ExecuteException e) {
-    logger.info("Interpreter process failed {}", e);
-    running = false;
-  }
-
-  public boolean isRunning() {
-    return running;
-  }
-
-  private static class ProcessLogOutputStream extends LogOutputStream {
-
-    private Logger logger;
-    OutputStream out;
-
-    public ProcessLogOutputStream(Logger logger) {
-      this.logger = logger;
-    }
-
-    @Override
-    protected void processLine(String s, int i) {
-      this.logger.debug(s);
-    }
-
-    @Override
-    public void write(byte [] b) throws IOException {
-      super.write(b);
-
-      if (out != null) {
-        synchronized (this) {
-          if (out != null) {
-            out.write(b);
-          }
-        }
-      }
-    }
-
-    @Override
-    public void write(byte [] b, int offset, int len) throws IOException {
-      super.write(b, offset, len);
-
-      if (out != null) {
-        synchronized (this) {
-          if (out != null) {
-            out.write(b, offset, len);
-          }
-        }
-      }
-    }
-
-    public void setOutputStream(OutputStream out) {
-      synchronized (this) {
-        this.out = out;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
deleted file mode 100644
index bb176be..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.interpreter.remote;
-
-import org.apache.zeppelin.helium.ApplicationEventListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class connects to existing process
- */
-public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
-  private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class);
-  private final String host;
-  private final int port;
-
-  public RemoteInterpreterRunningProcess(
-      int connectTimeout,
-      RemoteInterpreterProcessListener listener,
-      ApplicationEventListener appListener,
-      String host,
-      int port
-  ) {
-    super(connectTimeout, listener, appListener);
-    this.host = host;
-    this.port = port;
-  }
-
-  @Override
-  public String getHost() {
-    return host;
-  }
-
-  @Override
-  public int getPort() {
-    return port;
-  }
-
-  @Override
-  public void start(String userName, Boolean isUserImpersonate) {
-    // assume process is externally managed. nothing to do
-  }
-
-  @Override
-  public void stop() {
-    // assume process is externally managed. nothing to do
-  }
-
-  @Override
-  public boolean isRunning() {
-    return RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort());
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 8f40ec4..50881ca 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -218,7 +218,7 @@ public class RemoteInterpreterServer
 
   private void setSystemProperty(Properties properties) {
     for (Object key : properties.keySet()) {
-      if (!RemoteInterpreter.isEnvString((String) key)) {
+      if (!RemoteInterpreterUtils.isEnvString((String) key)) {
         String value = properties.getProperty((String) key);
         if (value == null || value.isEmpty()) {
           System.clearProperty((String) key);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
index 8308222..4ee6690 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
@@ -17,7 +17,6 @@
 
 package org.apache.zeppelin.interpreter.remote;
 
-import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,4 +72,12 @@ public class RemoteInterpreterUtils {
     }
     return settingId;
   }
+
+  public static boolean isEnvString(String key) {
+    if (key == null || key.length() == 0) {
+      return false;
+    }
+
+    return key.matches("^[A-Z_0-9]*");
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java
index 1a7f606..a55cdf9 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java
@@ -19,7 +19,6 @@ package org.apache.zeppelin.resource;
 
 import com.google.gson.Gson;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
index a4ab00e..f9ddc4e 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
@@ -20,7 +20,6 @@ package org.apache.zeppelin.scheduler;
 import org.apache.thrift.TException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
 import org.apache.zeppelin.scheduler.Job.Status;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java
new file mode 100644
index 0000000..a7a6eb9
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java
@@ -0,0 +1,43 @@
+package org.apache.zeppelin.interpreter;
+
+import java.util.Properties;
+
+/**
+ *
+ */
+public class DummyInterpreter extends Interpreter {
+
+  public DummyInterpreter(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    return null;
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+
+  }
+
+  @Override
+  public FormType getFormType() {
+    return null;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
index a9ac1fc..4141e95 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
@@ -19,7 +19,6 @@ package org.apache.zeppelin.interpreter;
 
 import java.util.Properties;
 
-import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.junit.Test;
 
@@ -31,7 +30,7 @@ public class InterpreterTest {
   public void testDefaultProperty() {
     Properties p = new Properties();
     p.put("p1", "v1");
-    MockInterpreterA intp = new MockInterpreterA(p);
+    Interpreter intp = new DummyInterpreter(p);
 
     assertEquals(1, intp.getProperty().size());
     assertEquals("v1", intp.getProperty().get("p1"));
@@ -42,7 +41,7 @@ public class InterpreterTest {
   public void testOverriddenProperty() {
     Properties p = new Properties();
     p.put("p1", "v1");
-    MockInterpreterA intp = new MockInterpreterA(p);
+    Interpreter intp = new DummyInterpreter(p);
     Properties overriddenProperty = new Properties();
     overriddenProperty.put("p1", "v2");
     intp.setProperty(overriddenProperty);
@@ -74,7 +73,7 @@ public class InterpreterTest {
     Properties p = new Properties();
     p.put("p1", "replName #{noteId}, #{paragraphTitle}, #{paragraphId}, #{paragraphText}, #{replName}, #{noteId}, #{user}," +
         " #{authenticationInfo}");
-    MockInterpreterA intp = new MockInterpreterA(p);
+    Interpreter intp = new DummyInterpreter(p);
     intp.setUserName(user);
     String actual = intp.getProperty("p1");
     InterpreterContext.remove();

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
deleted file mode 100644
index c8c64ea..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Mockito.atMost;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.log4j.AppenderSkeleton;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.spi.LoggingEvent;
-import org.junit.After;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-public class AppendOutputRunnerTest {
-
-  private static final int NUM_EVENTS = 10000;
-  private static final int NUM_CLUBBED_EVENTS = 100;
-  private static final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
-  private static ScheduledFuture<?> future = null;
-  /* It is being accessed by multiple threads.
-   * While loop for 'loopForBufferCompletion' could
-   * run for-ever.
-   */
-  private volatile static int numInvocations = 0;
-
-  @After
-  public void afterEach() {
-    if (future != null) {
-      future.cancel(true);
-    }
-  }
-
-  @Test
-  public void testSingleEvent() throws InterruptedException {
-    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
-    String[][] buffer = {{"note", "para", "data\n"}};
-
-    loopForCompletingEvents(listener, 1, buffer);
-    verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
-    verify(listener, times(1)).onOutputAppend("note", "para", 0, "data\n");
-  }
-
-  @Test
-  public void testMultipleEventsOfSameParagraph() throws InterruptedException {
-    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
-    String note1 = "note1";
-    String para1 = "para1";
-    String[][] buffer = {
-        {note1, para1, "data1\n"},
-        {note1, para1, "data2\n"},
-        {note1, para1, "data3\n"}
-    };
-
-    loopForCompletingEvents(listener, 1, buffer);
-    verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
-    verify(listener, times(1)).onOutputAppend(note1, para1, 0, "data1\ndata2\ndata3\n");
-  }
-
-  @Test
-  public void testMultipleEventsOfDifferentParagraphs() throws InterruptedException {
-    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
-    String note1 = "note1";
-    String note2 = "note2";
-    String para1 = "para1";
-    String para2 = "para2";
-    String[][] buffer = {
-        {note1, para1, "data1\n"},
-        {note1, para2, "data2\n"},
-        {note2, para1, "data3\n"},
-        {note2, para2, "data4\n"}
-    };
-    loopForCompletingEvents(listener, 4, buffer);
-
-    verify(listener, times(4)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
-    verify(listener, times(1)).onOutputAppend(note1, para1, 0, "data1\n");
-    verify(listener, times(1)).onOutputAppend(note1, para2, 0, "data2\n");
-    verify(listener, times(1)).onOutputAppend(note2, para1, 0, "data3\n");
-    verify(listener, times(1)).onOutputAppend(note2, para2, 0, "data4\n");
-  }
-
-  @Test
-  public void testClubbedData() throws InterruptedException {
-    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
-    AppendOutputRunner runner = new AppendOutputRunner(listener);
-    future = service.scheduleWithFixedDelay(runner, 0,
-        AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
-    Thread thread = new Thread(new BombardEvents(runner));
-    thread.start();
-    thread.join();
-    Thread.sleep(1000);
-
-    /* NUM_CLUBBED_EVENTS is a heuristic number.
-     * It has been observed that for 10,000 continuos event
-     * calls, 30-40 Web-socket calls are made. Keeping
-     * the unit-test to a pessimistic 100 web-socket calls.
-     */
-    verify(listener, atMost(NUM_CLUBBED_EVENTS)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
-  }
-
-  @Test
-  public void testWarnLoggerForLargeData() throws InterruptedException {
-    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
-    AppendOutputRunner runner = new AppendOutputRunner(listener);
-    String data = "data\n";
-    int numEvents = 100000;
-
-    for (int i=0; i<numEvents; i++) {
-      runner.appendBuffer("noteId", "paraId", 0, data);
-    }
-
-    TestAppender appender = new TestAppender();
-    Logger logger = Logger.getRootLogger();
-    logger.addAppender(appender);
-    Logger.getLogger(RemoteInterpreterEventPoller.class);
-
-    runner.run();
-    List<LoggingEvent> log;
-
-    int warnLogCounter;
-    LoggingEvent sizeWarnLogEntry = null;
-    do {
-      warnLogCounter = 0;
-      log = appender.getLog();
-      for (LoggingEvent logEntry: log) {
-        if (Level.WARN.equals(logEntry.getLevel())) {
-          sizeWarnLogEntry = logEntry;
-          warnLogCounter += 1;
-        }
-      }
-    } while(warnLogCounter != 2);
-
-    String loggerString = "Processing size for buffered append-output is high: " +
-        (data.length() * numEvents) + " characters.";
-    assertTrue(loggerString.equals(sizeWarnLogEntry.getMessage()));
-  }
-
-  private class BombardEvents implements Runnable {
-
-    private final AppendOutputRunner runner;
-
-    private BombardEvents(AppendOutputRunner runner) {
-      this.runner = runner;
-    }
-
-    @Override
-    public void run() {
-      String noteId = "noteId";
-      String paraId = "paraId";
-      for (int i=0; i<NUM_EVENTS; i++) {
-        runner.appendBuffer(noteId, paraId, 0, "data\n");
-      }
-    }
-  }
-
-  private class TestAppender extends AppenderSkeleton {
-    private final List<LoggingEvent> log = new ArrayList<>();
-
-    @Override
-    public boolean requiresLayout() {
-        return false;
-    }
-
-    @Override
-    protected void append(final LoggingEvent loggingEvent) {
-        log.add(loggingEvent);
-    }
-
-    @Override
-    public void close() {
-    }
-
-    public List<LoggingEvent> getLog() {
-        return new ArrayList<>(log);
-    }
-  }
-
-  private void prepareInvocationCounts(RemoteInterpreterProcessListener listener) {
-    doAnswer(new Answer<Void>() {
-      @Override
-      public Void answer(InvocationOnMock invocation) throws Throwable {
-        numInvocations += 1;
-        return null;
-      }
-    }).when(listener).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
-  }
-
-  private void loopForCompletingEvents(RemoteInterpreterProcessListener listener,
-      int numTimes, String[][] buffer) {
-    numInvocations = 0;
-    prepareInvocationCounts(listener);
-    AppendOutputRunner runner = new AppendOutputRunner(listener);
-    for (String[] bufferElement: buffer) {
-      runner.appendBuffer(bufferElement[0], bufferElement[1], 0, bufferElement[2]);
-    }
-    future = service.scheduleWithFixedDelay(runner, 0,
-        AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
-    long startTimeMs = System.currentTimeMillis();
-    while(numInvocations != numTimes) {
-      if (System.currentTimeMillis() - startTimeMs > 2000) {
-        fail("Buffered events were not sent for 2 seconds");
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
deleted file mode 100644
index f7404e3..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.zeppelin.display.*;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterAngular;
-import org.apache.zeppelin.resource.LocalResourcePool;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
-  private static final String INTERPRETER_SCRIPT =
-          System.getProperty("os.name").startsWith("Windows") ?
-                  "../bin/interpreter.cmd" :
-                  "../bin/interpreter.sh";
-
-  private InterpreterGroup intpGroup;
-  private HashMap<String, String> env;
-  private RemoteInterpreter intp;
-  private InterpreterContext context;
-  private RemoteAngularObjectRegistry localRegistry;
-
-  private AtomicInteger onAdd;
-  private AtomicInteger onUpdate;
-  private AtomicInteger onRemove;
-
-  @Before
-  public void setUp() throws Exception {
-    onAdd = new AtomicInteger(0);
-    onUpdate = new AtomicInteger(0);
-    onRemove = new AtomicInteger(0);
-
-    intpGroup = new InterpreterGroup("intpId");
-    localRegistry = new RemoteAngularObjectRegistry("intpId", this, intpGroup);
-    intpGroup.setAngularObjectRegistry(localRegistry);
-    env = new HashMap<>();
-    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
-
-    Properties p = new Properties();
-
-    intp = new RemoteInterpreter(
-        p,
-        "note",
-        MockInterpreterAngular.class.getName(),
-        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
-        "fake",
-        "fakeRepo",
-        env,
-        10 * 1000,
-        null,
-        null,
-        "anonymous",
-        false
-    );
-
-    intpGroup.put("note", new LinkedList<Interpreter>());
-    intpGroup.get("note").add(intp);
-    intp.setInterpreterGroup(intpGroup);
-
-    context = new InterpreterContext(
-        "note",
-        "id",
-        null,
-        "title",
-        "text",
-        new AuthenticationInfo(),
-        new HashMap<String, Object>(),
-        new GUI(),
-        new AngularObjectRegistry(intpGroup.getId(), null),
-        new LocalResourcePool("pool1"),
-        new LinkedList<InterpreterContextRunner>(), null);
-
-    intp.open();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    intp.close();
-    intpGroup.close();
-  }
-
-  @Test
-  public void testAngularObjectInterpreterSideCRUD() throws InterruptedException {
-    InterpreterResult ret = intp.interpret("get", context);
-    Thread.sleep(500); // waitFor eventpoller pool event
-    String[] result = ret.message().get(0).getData().split(" ");
-    assertEquals("0", result[0]); // size of registry
-    assertEquals("0", result[1]); // num watcher called
-
-    // create object
-    ret = intp.interpret("add n1 v1", context);
-    Thread.sleep(500);
-    result = ret.message().get(0).getData().split(" ");
-    assertEquals("1", result[0]); // size of registry
-    assertEquals("0", result[1]); // num watcher called
-    assertEquals("v1", localRegistry.get("n1", "note", null).get());
-
-    // update object
-    ret = intp.interpret("update n1 v11", context);
-    result = ret.message().get(0).getData().split(" ");
-    Thread.sleep(500);
-    assertEquals("1", result[0]); // size of registry
-    assertEquals("1", result[1]); // num watcher called
-    assertEquals("v11", localRegistry.get("n1", "note", null).get());
-
-    // remove object
-    ret = intp.interpret("remove n1", context);
-    result = ret.message().get(0).getData().split(" ");
-    Thread.sleep(500);
-    assertEquals("0", result[0]); // size of registry
-    assertEquals("1", result[1]); // num watcher called
-    assertEquals(null, localRegistry.get("n1", "note", null));
-  }
-
-  @Test
-  public void testAngularObjectRemovalOnZeppelinServerSide() throws InterruptedException {
-    // test if angularobject removal from server side propagate to interpreter process's registry.
-    // will happen when notebook is removed.
-
-    InterpreterResult ret = intp.interpret("get", context);
-    Thread.sleep(500); // waitFor eventpoller pool event
-    String[] result = ret.message().get(0).getData().split(" ");
-    assertEquals("0", result[0]); // size of registry
-    
-    // create object
-    ret = intp.interpret("add n1 v1", context);
-    Thread.sleep(500);
-    result = ret.message().get(0).getData().split(" ");
-    assertEquals("1", result[0]); // size of registry
-    assertEquals("v1", localRegistry.get("n1", "note", null).get());
-
-    // remove object in local registry.
-    localRegistry.removeAndNotifyRemoteProcess("n1", "note", null);
-    ret = intp.interpret("get", context);
-    Thread.sleep(500); // waitFor eventpoller pool event
-    result = ret.message().get(0).getData().split(" ");
-    assertEquals("0", result[0]); // size of registry
-  }
-
-  @Test
-  public void testAngularObjectAddOnZeppelinServerSide() throws InterruptedException {
-    // test if angularobject add from server side propagate to interpreter process's registry.
-    // will happen when zeppelin server loads notebook and restore the object into registry
-
-    InterpreterResult ret = intp.interpret("get", context);
-    Thread.sleep(500); // waitFor eventpoller pool event
-    String[] result = ret.message().get(0).getData().split(" ");
-    assertEquals("0", result[0]); // size of registry
-    
-    // create object
-    localRegistry.addAndNotifyRemoteProcess("n1", "v1", "note", null);
-    
-    // get from remote registry 
-    ret = intp.interpret("get", context);
-    Thread.sleep(500); // waitFor eventpoller pool event
-    result = ret.message().get(0).getData().split(" ");
-    assertEquals("1", result[0]); // size of registry
-  }
-
-  @Override
-  public void onAdd(String interpreterGroupId, AngularObject object) {
-    onAdd.incrementAndGet();
-  }
-
-  @Override
-  public void onUpdate(String interpreterGroupId, AngularObject object) {
-    onUpdate.incrementAndGet();
-  }
-
-  @Override
-  public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) {
-    onRemove.incrementAndGet();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
deleted file mode 100644
index 49aa7aa..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
-import org.junit.Test;
-
-import static org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType.NO_OP;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class RemoteInterpreterEventPollerTest {
-
-	@Test
-	public void shouldClearUnreadEventsOnShutdown() throws Exception {
-		RemoteInterpreterProcess interpreterProc = getMockEventsInterpreterProcess();
-		RemoteInterpreterEventPoller eventPoller = new RemoteInterpreterEventPoller(null, null);
-
-		eventPoller.setInterpreterProcess(interpreterProc);
-		eventPoller.shutdown();
-		eventPoller.start();
-		eventPoller.join();
-
-		assertEquals(NO_OP, interpreterProc.getClient().getEvent().getType());
-	}
-
-	private RemoteInterpreterProcess getMockEventsInterpreterProcess() throws Exception {
-		RemoteInterpreterEvent fakeEvent = new RemoteInterpreterEvent();
-		RemoteInterpreterEvent noMoreEvents = new RemoteInterpreterEvent(NO_OP, "");
-		RemoteInterpreterService.Client client = mock(RemoteInterpreterService.Client.class);
-		RemoteInterpreterProcess intProc = mock(RemoteInterpreterProcess.class);
-
-		when(client.getEvent()).thenReturn(fakeEvent, fakeEvent, noMoreEvents);
-		when(intProc.getClient()).thenReturn(client);
-
-		return intProc;
-	}
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
deleted file mode 100644
index 3f865cb..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterOutputStream;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-
-
-/**
- * Test for remote interpreter output stream
- */
-public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProcessListener {
-  private static final String INTERPRETER_SCRIPT =
-          System.getProperty("os.name").startsWith("Windows") ?
-                  "../bin/interpreter.cmd" :
-                  "../bin/interpreter.sh";
-  private InterpreterGroup intpGroup;
-  private HashMap<String, String> env;
-
-  @Before
-  public void setUp() throws Exception {
-    intpGroup = new InterpreterGroup();
-    intpGroup.put("note", new LinkedList<Interpreter>());
-
-    env = new HashMap<>();
-    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    intpGroup.close();
-  }
-
-  private RemoteInterpreter createMockInterpreter() {
-    RemoteInterpreter intp = new RemoteInterpreter(
-        new Properties(),
-        "note",
-        MockInterpreterOutputStream.class.getName(),
-        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
-        "fake",
-        "fakeRepo",
-        env,
-        10 * 1000,
-        this,
-        null,
-        "anonymous",
-        false);
-
-    intpGroup.get("note").add(intp);
-    intp.setInterpreterGroup(intpGroup);
-    return intp;
-  }
-
-  private InterpreterContext createInterpreterContext() {
-    return new InterpreterContext(
-        "noteId",
-        "id",
-        null,
-        "title",
-        "text",
-        new AuthenticationInfo(),
-        new HashMap<String, Object>(),
-        new GUI(),
-        new AngularObjectRegistry(intpGroup.getId(), null),
-        null,
-        new LinkedList<InterpreterContextRunner>(), null);
-  }
-
-  @Test
-  public void testInterpreterResultOnly() {
-    RemoteInterpreter intp = createMockInterpreter();
-    InterpreterResult ret = intp.interpret("SUCCESS::staticresult", createInterpreterContext());
-    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals("staticresult", ret.message().get(0).getData());
-
-    ret = intp.interpret("SUCCESS::staticresult2", createInterpreterContext());
-    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals("staticresult2", ret.message().get(0).getData());
-
-    ret = intp.interpret("ERROR::staticresult3", createInterpreterContext());
-    assertEquals(InterpreterResult.Code.ERROR, ret.code());
-    assertEquals("staticresult3", ret.message().get(0).getData());
-  }
-
-  @Test
-  public void testInterpreterOutputStreamOnly() {
-    RemoteInterpreter intp = createMockInterpreter();
-    InterpreterResult ret = intp.interpret("SUCCESS:streamresult:", createInterpreterContext());
-    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals("streamresult", ret.message().get(0).getData());
-
-    ret = intp.interpret("ERROR:streamresult2:", createInterpreterContext());
-    assertEquals(InterpreterResult.Code.ERROR, ret.code());
-    assertEquals("streamresult2", ret.message().get(0).getData());
-  }
-
-  @Test
-  public void testInterpreterResultOutputStreamMixed() {
-    RemoteInterpreter intp = createMockInterpreter();
-    InterpreterResult ret = intp.interpret("SUCCESS:stream:static", createInterpreterContext());
-    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals("stream", ret.message().get(0).getData());
-    assertEquals("static", ret.message().get(1).getData());
-  }
-
-  @Test
-  public void testOutputType() {
-    RemoteInterpreter intp = createMockInterpreter();
-
-    InterpreterResult ret = intp.interpret("SUCCESS:%html hello:", createInterpreterContext());
-    assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType());
-    assertEquals("hello", ret.message().get(0).getData());
-
-    ret = intp.interpret("SUCCESS:%html\nhello:", createInterpreterContext());
-    assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType());
-    assertEquals("hello", ret.message().get(0).getData());
-
-    ret = intp.interpret("SUCCESS:%html hello:%angular world", createInterpreterContext());
-    assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType());
-    assertEquals("hello", ret.message().get(0).getData());
-    assertEquals(InterpreterResult.Type.ANGULAR, ret.message().get(1).getType());
-    assertEquals("world", ret.message().get(1).getData());
-  }
-
-  @Override
-  public void onOutputAppend(String noteId, String paragraphId, int index, String output) {
-
-  }
-
-  @Override
-  public void onOutputUpdated(String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) {
-
-  }
-
-  @Override
-  public void onOutputClear(String noteId, String paragraphId) {
-
-  }
-
-  @Override
-  public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
-
-  }
-
-  @Override
-  public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) {
-    if (callback != null) {
-      callback.onFinished(new LinkedList<>());
-    }
-  }
-
-  @Override
-  public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception {
-
-  }
-
-  @Override
-  public void onParaInfosReceived(String noteId, String paragraphId,
-      String interpreterSettingId, Map<String, String> metaInfos) {
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
deleted file mode 100644
index b85d7ef..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.mockito.Mockito.*;
-
-import java.util.HashMap;
-import java.util.Properties;
-
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.zeppelin.interpreter.Constants;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-import org.junit.Test;
-
-public class RemoteInterpreterProcessTest {
-  private static final String INTERPRETER_SCRIPT =
-          System.getProperty("os.name").startsWith("Windows") ?
-                  "../bin/interpreter.cmd" :
-                  "../bin/interpreter.sh";
-  private static final int DUMMY_PORT=3678;
-
-  @Test
-  public void testStartStop() {
-    InterpreterGroup intpGroup = new InterpreterGroup();
-    RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
-        INTERPRETER_SCRIPT, "nonexists", "fakeRepo", new HashMap<String, String>(),
-        10 * 1000, null, null,"fakeName");
-    assertFalse(rip.isRunning());
-    assertEquals(0, rip.referenceCount());
-    assertEquals(1, rip.reference(intpGroup, "anonymous", false));
-    assertEquals(2, rip.reference(intpGroup, "anonymous", false));
-    assertEquals(true, rip.isRunning());
-    assertEquals(1, rip.dereference());
-    assertEquals(true, rip.isRunning());
-    assertEquals(0, rip.dereference());
-    assertEquals(false, rip.isRunning());
-  }
-
-  @Test
-  public void testClientFactory() throws Exception {
-    InterpreterGroup intpGroup = new InterpreterGroup();
-    RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
-        INTERPRETER_SCRIPT, "nonexists", "fakeRepo", new HashMap<String, String>(),
-        mock(RemoteInterpreterEventPoller.class), 10 * 1000, "fakeName");
-    rip.reference(intpGroup, "anonymous", false);
-    assertEquals(0, rip.getNumActiveClient());
-    assertEquals(0, rip.getNumIdleClient());
-
-    Client client = rip.getClient();
-    assertEquals(1, rip.getNumActiveClient());
-    assertEquals(0, rip.getNumIdleClient());
-
-    rip.releaseClient(client);
-    assertEquals(0, rip.getNumActiveClient());
-    assertEquals(1, rip.getNumIdleClient());
-
-    rip.dereference();
-  }
-
-  @Test
-  public void testStartStopRemoteInterpreter() throws TException, InterruptedException {
-    RemoteInterpreterServer server = new RemoteInterpreterServer(3678);
-    server.start();
-    boolean running = false;
-    long startTime = System.currentTimeMillis();
-    while (System.currentTimeMillis() - startTime < 10 * 1000) {
-      if (server.isRunning()) {
-        running = true;
-        break;
-      } else {
-        Thread.sleep(200);
-      }
-    }
-    Properties properties = new Properties();
-    properties.setProperty(Constants.ZEPPELIN_INTERPRETER_PORT, "3678");
-    properties.setProperty(Constants.ZEPPELIN_INTERPRETER_HOST, "localhost");
-    InterpreterGroup intpGroup = mock(InterpreterGroup.class);
-    when(intpGroup.getProperty()).thenReturn(properties);
-    when(intpGroup.containsKey(Constants.EXISTING_PROCESS)).thenReturn(true);
-
-    RemoteInterpreterProcess rip = new RemoteInterpreterManagedProcess(
-        INTERPRETER_SCRIPT,
-        "nonexists",
-        "fakeRepo",
-        new HashMap<String, String>(),
-        mock(RemoteInterpreterEventPoller.class)
-        , 10 * 1000,
-        "fakeName");
-    assertFalse(rip.isRunning());
-    assertEquals(0, rip.referenceCount());
-    assertEquals(1, rip.reference(intpGroup, "anonymous", false));
-    assertEquals(true, rip.isRunning());
-  }
-
-
-  @Test
-  public void testPropagateError() throws TException, InterruptedException {
-    InterpreterGroup intpGroup = new InterpreterGroup();
-    RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
-        "echo hello_world", "nonexists", "fakeRepo", new HashMap<String, String>(),
-        10 * 1000, null, null, "fakeName");
-    assertFalse(rip.isRunning());
-    assertEquals(0, rip.referenceCount());
-    try {
-      assertEquals(1, rip.reference(intpGroup, "anonymous", false));
-    } catch (InterpreterException e) {
-      e.getMessage().contains("hello_world");
-    }
-    assertEquals(0, rip.referenceCount());
-  }
-}


[3/4] zeppelin git commit: [MINOR] Move remoteinterpreter into zengine

Posted by jo...@apache.org.
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
deleted file mode 100644
index ffcb8d5..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
+++ /dev/null
@@ -1,914 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.thrift.transport.TTransportException;
-import org.apache.zeppelin.display.AngularObject;
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterEnv;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
-import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterB;
-import org.apache.zeppelin.resource.LocalResourcePool;
-import org.apache.zeppelin.scheduler.Job;
-import org.apache.zeppelin.scheduler.Job.Status;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-
-public class RemoteInterpreterTest {
-
-
-  private static final String INTERPRETER_SCRIPT =
-          System.getProperty("os.name").startsWith("Windows") ?
-                  "../bin/interpreter.cmd" :
-                  "../bin/interpreter.sh";
-
-  private InterpreterGroup intpGroup;
-  private HashMap<String, String> env;
-
-  @Before
-  public void setUp() throws Exception {
-    intpGroup = new InterpreterGroup();
-    env = new HashMap<>();
-    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    intpGroup.close();
-  }
-
-  private RemoteInterpreter createMockInterpreterA(Properties p) {
-    return createMockInterpreterA(p, "note");
-  }
-
-  private RemoteInterpreter createMockInterpreterA(Properties p, String noteId) {
-    return new RemoteInterpreter(
-        p,
-        noteId,
-        MockInterpreterA.class.getName(),
-        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
-        "fake",
-        "fakeRepo",
-        env,
-        10 * 1000,
-        null,
-        null,
-        "anonymous",
-        false);
-  }
-
-  private RemoteInterpreter createMockInterpreterB(Properties p) {
-    return createMockInterpreterB(p, "note");
-  }
-
-  private RemoteInterpreter createMockInterpreterB(Properties p, String noteId) {
-    return new RemoteInterpreter(
-        p,
-        noteId,
-        MockInterpreterB.class.getName(),
-        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
-        "fake",
-        "fakeRepo",
-        env,
-        10 * 1000,
-        null,
-        null,
-        "anonymous",
-        false);
-  }
-
-  @Test
-  public void testRemoteInterperterCall() throws TTransportException, IOException {
-    Properties p = new Properties();
-    intpGroup.put("note", new LinkedList<Interpreter>());
-
-    RemoteInterpreter intpA = createMockInterpreterA(p);
-
-    intpGroup.get("note").add(intpA);
-
-    intpA.setInterpreterGroup(intpGroup);
-
-    RemoteInterpreter intpB = createMockInterpreterB(p);
-
-    intpGroup.get("note").add(intpB);
-    intpB.setInterpreterGroup(intpGroup);
-
-
-    RemoteInterpreterProcess process = intpA.getInterpreterProcess();
-    process.equals(intpB.getInterpreterProcess());
-
-    assertFalse(process.isRunning());
-    assertEquals(0, process.getNumIdleClient());
-    assertEquals(0, process.referenceCount());
-
-    intpA.open(); // initializa all interpreters in the same group
-    assertTrue(process.isRunning());
-    assertEquals(1, process.getNumIdleClient());
-    assertEquals(1, process.referenceCount());
-
-    intpA.interpret("1",
-        new InterpreterContext(
-            "note",
-            "id",
-            null,
-            "title",
-            "text",
-            new AuthenticationInfo(),
-            new HashMap<String, Object>(),
-            new GUI(),
-            new AngularObjectRegistry(intpGroup.getId(), null),
-            new LocalResourcePool("pool1"),
-            new LinkedList<InterpreterContextRunner>(), null));
-
-    intpB.open();
-    assertEquals(1, process.referenceCount());
-
-    intpA.close();
-    assertEquals(0, process.referenceCount());
-    intpB.close();
-    assertEquals(0, process.referenceCount());
-
-    assertFalse(process.isRunning());
-
-  }
-
-  @Test
-  public void testExecuteIncorrectPrecode() throws TTransportException, IOException {
-    Properties p = new Properties();
-    p.put("zeppelin.MockInterpreterA.precode", "fail test");
-    intpGroup.put("note", new LinkedList<Interpreter>());
-
-    RemoteInterpreter intpA = createMockInterpreterA(p);
-
-    intpGroup.get("note").add(intpA);
-
-    intpA.setInterpreterGroup(intpGroup);
-
-    RemoteInterpreterProcess process = intpA.getInterpreterProcess();
-
-    intpA.open();
-    
-    InterpreterResult result = intpA.interpret("1",
-        new InterpreterContext(
-            "note",
-            "id",
-            null,
-            "title",
-            "text",
-            new AuthenticationInfo(),
-            new HashMap<String, Object>(),
-            new GUI(),
-            new AngularObjectRegistry(intpGroup.getId(), null),
-            new LocalResourcePool("pool1"),
-            new LinkedList<InterpreterContextRunner>(), null));
-
-
-
-    intpA.close();
-    assertEquals(Code.ERROR, result.code());
-  }
-
-  @Test
-  public void testExecuteCorrectPrecode() throws TTransportException, IOException {
-    Properties p = new Properties();
-    p.put("zeppelin.MockInterpreterA.precode", "2");
-    intpGroup.put("note", new LinkedList<Interpreter>());
-
-    RemoteInterpreter intpA = createMockInterpreterA(p);
-
-    intpGroup.get("note").add(intpA);
-
-    intpA.setInterpreterGroup(intpGroup);
-
-    RemoteInterpreterProcess process = intpA.getInterpreterProcess();
-
-    intpA.open();
-
-    InterpreterResult result = intpA.interpret("1",
-        new InterpreterContext(
-            "note",
-            "id",
-            null,
-            "title",
-            "text",
-            new AuthenticationInfo(),
-            new HashMap<String, Object>(),
-            new GUI(),
-            new AngularObjectRegistry(intpGroup.getId(), null),
-            new LocalResourcePool("pool1"),
-            new LinkedList<InterpreterContextRunner>(), null));
-
-
-
-    intpA.close();
-    assertEquals(Code.SUCCESS, result.code());
-    assertEquals("1", result.message().get(0).getData());
-  }
-
-  @Test
-  public void testRemoteInterperterErrorStatus() throws TTransportException, IOException {
-    Properties p = new Properties();
-
-    RemoteInterpreter intpA = createMockInterpreterA(p);
-
-    intpGroup.put("note", new LinkedList<Interpreter>());
-    intpGroup.get("note").add(intpA);
-    intpA.setInterpreterGroup(intpGroup);
-
-    intpA.open();
-    InterpreterResult ret = intpA.interpret("non numeric value",
-        new InterpreterContext(
-            "noteId",
-            "id",
-            null,
-            "title",
-            "text",
-            new AuthenticationInfo(),
-            new HashMap<String, Object>(),
-            new GUI(),
-            new AngularObjectRegistry(intpGroup.getId(), null),
-            new LocalResourcePool("pool1"),
-            new LinkedList<InterpreterContextRunner>(), null));
-
-    assertEquals(Code.ERROR, ret.code());
-  }
-
-  @Test
-  public void testRemoteSchedulerSharing() throws TTransportException, IOException {
-    Properties p = new Properties();
-    intpGroup.put("note", new LinkedList<Interpreter>());
-
-    RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        "note",
-        MockInterpreterA.class.getName(),
-        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
-        "fake",
-        "fakeRepo",
-        env,
-        10 * 1000,
-        null,
-        null,
-        "anonymous",
-        false);
-
-    intpGroup.get("note").add(intpA);
-    intpA.setInterpreterGroup(intpGroup);
-
-    RemoteInterpreter intpB = new RemoteInterpreter(
-        p,
-        "note",
-        MockInterpreterB.class.getName(),
-        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
-        "fake",
-        "fakeRepo",
-        env,
-        10 * 1000,
-        null,
-        null,
-        "anonymous",
-        false);
-
-    intpGroup.get("note").add(intpB);
-    intpB.setInterpreterGroup(intpGroup);
-
-    intpA.open();
-    intpB.open();
-
-    long start = System.currentTimeMillis();
-    InterpreterResult ret = intpA.interpret("500",
-        new InterpreterContext(
-            "note",
-            "id",
-            null,
-            "title",
-            "text",
-            new AuthenticationInfo(),
-            new HashMap<String, Object>(),
-            new GUI(),
-            new AngularObjectRegistry(intpGroup.getId(), null),
-            new LocalResourcePool("pool1"),
-            new LinkedList<InterpreterContextRunner>(), null));
-    assertEquals("500", ret.message().get(0).getData());
-
-    ret = intpB.interpret("500",
-        new InterpreterContext(
-            "note",
-            "id",
-            null,
-            "title",
-            "text",
-            new AuthenticationInfo(),
-            new HashMap<String, Object>(),
-            new GUI(),
-            new AngularObjectRegistry(intpGroup.getId(), null),
-            new LocalResourcePool("pool1"),
-            new LinkedList<InterpreterContextRunner>(), null));
-    assertEquals("1000", ret.message().get(0).getData());
-    long end = System.currentTimeMillis();
-    assertTrue(end - start >= 1000);
-
-
-    intpA.close();
-    intpB.close();
-  }
-
-  @Test
-  public void testRemoteSchedulerSharingSubmit() throws TTransportException, IOException, InterruptedException {
-    Properties p = new Properties();
-    intpGroup.put("note", new LinkedList<Interpreter>());
-
-    final RemoteInterpreter intpA = createMockInterpreterA(p);
-
-    intpGroup.get("note").add(intpA);
-    intpA.setInterpreterGroup(intpGroup);
-
-    final RemoteInterpreter intpB = createMockInterpreterB(p);
-
-    intpGroup.get("note").add(intpB);
-    intpB.setInterpreterGroup(intpGroup);
-
-    intpA.open();
-    intpB.open();
-
-    long start = System.currentTimeMillis();
-    Job jobA = new Job("jobA", null) {
-      private Object r;
-
-      @Override
-      public Object getReturn() {
-        return r;
-      }
-
-      @Override
-      public void setResult(Object results) {
-        this.r = results;
-      }
-
-      @Override
-      public int progress() {
-        return 0;
-      }
-
-      @Override
-      public Map<String, Object> info() {
-        return null;
-      }
-
-      @Override
-      protected Object jobRun() throws Throwable {
-        return intpA.interpret("500",
-            new InterpreterContext(
-                "note",
-                "jobA",
-                null,
-                "title",
-                "text",
-                new AuthenticationInfo(),
-                new HashMap<String, Object>(),
-                new GUI(),
-                new AngularObjectRegistry(intpGroup.getId(), null),
-                new LocalResourcePool("pool1"),
-                new LinkedList<InterpreterContextRunner>(), null));
-      }
-
-      @Override
-      protected boolean jobAbort() {
-        return false;
-      }
-
-    };
-    intpA.getScheduler().submit(jobA);
-
-    Job jobB = new Job("jobB", null) {
-
-      private Object r;
-
-      @Override
-      public Object getReturn() {
-        return r;
-      }
-
-      @Override
-      public void setResult(Object results) {
-        this.r = results;
-      }
-
-      @Override
-      public int progress() {
-        return 0;
-      }
-
-      @Override
-      public Map<String, Object> info() {
-        return null;
-      }
-
-      @Override
-      protected Object jobRun() throws Throwable {
-        return intpB.interpret("500",
-            new InterpreterContext(
-                "note",
-                "jobB",
-                null,
-                "title",
-                "text",
-                new AuthenticationInfo(),
-                new HashMap<String, Object>(),
-                new GUI(),
-                new AngularObjectRegistry(intpGroup.getId(), null),
-                new LocalResourcePool("pool1"),
-                new LinkedList<InterpreterContextRunner>(), null));
-      }
-
-      @Override
-      protected boolean jobAbort() {
-        return false;
-      }
-
-    };
-    intpB.getScheduler().submit(jobB);
-    // wait until both job finished
-    while (jobA.getStatus() != Status.FINISHED ||
-           jobB.getStatus() != Status.FINISHED) {
-      Thread.sleep(100);
-    }
-    long end = System.currentTimeMillis();
-    assertTrue(end - start >= 1000);
-
-    assertEquals("1000", ((InterpreterResult) jobB.getReturn()).message().get(0).getData());
-
-    intpA.close();
-    intpB.close();
-  }
-
-  @Test
-  public void testRunOrderPreserved() throws InterruptedException {
-    Properties p = new Properties();
-    intpGroup.put("note", new LinkedList<Interpreter>());
-
-    final RemoteInterpreter intpA = createMockInterpreterA(p);
-
-    intpGroup.get("note").add(intpA);
-    intpA.setInterpreterGroup(intpGroup);
-
-    intpA.open();
-
-    int concurrency = 3;
-    final List<InterpreterResultMessage> results = new LinkedList<>();
-
-    Scheduler scheduler = intpA.getScheduler();
-    for (int i = 0; i < concurrency; i++) {
-      final String jobId = Integer.toString(i);
-      scheduler.submit(new Job(jobId, Integer.toString(i), null, 200) {
-        private Object r;
-
-        @Override
-        public Object getReturn() {
-          return r;
-        }
-
-        @Override
-        public void setResult(Object results) {
-          this.r = results;
-        }
-
-        @Override
-        public int progress() {
-          return 0;
-        }
-
-        @Override
-        public Map<String, Object> info() {
-          return null;
-        }
-
-        @Override
-        protected Object jobRun() throws Throwable {
-          InterpreterResult ret = intpA.interpret(getJobName(), new InterpreterContext(
-              "note",
-              jobId,
-              null,
-              "title",
-              "text",
-              new AuthenticationInfo(),
-              new HashMap<String, Object>(),
-              new GUI(),
-              new AngularObjectRegistry(intpGroup.getId(), null),
-              new LocalResourcePool("pool1"),
-              new LinkedList<InterpreterContextRunner>(), null));
-
-          synchronized (results) {
-            results.addAll(ret.message());
-            results.notify();
-          }
-          return null;
-        }
-
-        @Override
-        protected boolean jobAbort() {
-          return false;
-        }
-
-      });
-    }
-
-    // wait for job finished
-    synchronized (results) {
-      while (results.size() != concurrency) {
-        results.wait(300);
-      }
-    }
-
-    int i = 0;
-    for (InterpreterResultMessage result : results) {
-      assertEquals(Integer.toString(i++), result.getData());
-    }
-    assertEquals(concurrency, i);
-
-    intpA.close();
-  }
-
-
-  @Test
-  public void testRunParallel() throws InterruptedException {
-    Properties p = new Properties();
-    p.put("parallel", "true");
-    intpGroup.put("note", new LinkedList<Interpreter>());
-
-    final RemoteInterpreter intpA = createMockInterpreterA(p);
-
-    intpGroup.get("note").add(intpA);
-    intpA.setInterpreterGroup(intpGroup);
-
-    intpA.open();
-
-    int concurrency = 4;
-    final int timeToSleep = 1000;
-    final List<InterpreterResultMessage> results = new LinkedList<>();
-    long start = System.currentTimeMillis();
-
-    Scheduler scheduler = intpA.getScheduler();
-    for (int i = 0; i < concurrency; i++) {
-      final String jobId = Integer.toString(i);
-      scheduler.submit(new Job(jobId, Integer.toString(i), null, 300) {
-        private Object r;
-
-        @Override
-        public Object getReturn() {
-          return r;
-        }
-
-        @Override
-        public void setResult(Object results) {
-          this.r = results;
-        }
-
-        @Override
-        public int progress() {
-          return 0;
-        }
-
-        @Override
-        public Map<String, Object> info() {
-          return null;
-        }
-
-        @Override
-        protected Object jobRun() throws Throwable {
-          String stmt = Integer.toString(timeToSleep);
-          InterpreterResult ret = intpA.interpret(stmt, new InterpreterContext(
-              "note",
-              jobId,
-              null,
-              "title",
-              "text",
-              new AuthenticationInfo(),
-              new HashMap<String, Object>(),
-              new GUI(),
-              new AngularObjectRegistry(intpGroup.getId(), null),
-              new LocalResourcePool("pool1"),
-              new LinkedList<InterpreterContextRunner>(), null));
-
-          synchronized (results) {
-            results.addAll(ret.message());
-            results.notify();
-          }
-          return stmt;
-        }
-
-        @Override
-        protected boolean jobAbort() {
-          return false;
-        }
-
-      });
-    }
-
-    // wait for job finished
-    synchronized (results) {
-      while (results.size() != concurrency) {
-        results.wait(300);
-      }
-    }
-
-    long end = System.currentTimeMillis();
-
-    assertTrue(end - start < timeToSleep * concurrency);
-
-    intpA.close();
-  }
-
-  @Test
-  public void testInterpreterGroupResetBeforeProcessStarts() {
-    Properties p = new Properties();
-
-    RemoteInterpreter intpA = createMockInterpreterA(p);
-
-    intpA.setInterpreterGroup(intpGroup);
-    RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
-
-    intpA.setInterpreterGroup(new InterpreterGroup(intpA.getInterpreterGroup().getId()));
-    RemoteInterpreterProcess processB = intpA.getInterpreterProcess();
-
-    assertNotSame(processA.hashCode(), processB.hashCode());
-  }
-
-  @Test
-  public void testInterpreterGroupResetAfterProcessFinished() {
-    Properties p = new Properties();
-    intpGroup.put("note", new LinkedList<Interpreter>());
-
-    RemoteInterpreter intpA = createMockInterpreterA(p);
-
-    intpA.setInterpreterGroup(intpGroup);
-    RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
-    intpA.open();
-
-    processA.dereference();    // intpA.close();
-
-    intpA.setInterpreterGroup(new InterpreterGroup(intpA.getInterpreterGroup().getId()));
-    RemoteInterpreterProcess processB = intpA.getInterpreterProcess();
-
-    assertNotSame(processA.hashCode(), processB.hashCode());
-  }
-
-  @Test
-  public void testInterpreterGroupResetDuringProcessRunning() throws InterruptedException {
-    Properties p = new Properties();
-    intpGroup.put("note", new LinkedList<Interpreter>());
-
-    final RemoteInterpreter intpA = createMockInterpreterA(p);
-
-    intpGroup.get("note").add(intpA);
-    intpA.setInterpreterGroup(intpGroup);
-
-    intpA.open();
-
-    Job jobA = new Job("jobA", null) {
-      private Object r;
-
-      @Override
-      public Object getReturn() {
-        return r;
-      }
-
-      @Override
-      public void setResult(Object results) {
-        this.r = results;
-      }
-
-      @Override
-      public int progress() {
-        return 0;
-      }
-
-      @Override
-      public Map<String, Object> info() {
-        return null;
-      }
-
-      @Override
-      protected Object jobRun() throws Throwable {
-        return intpA.interpret("2000",
-            new InterpreterContext(
-                "note",
-                "jobA",
-                null,
-                "title",
-                "text",
-                new AuthenticationInfo(),
-                new HashMap<String, Object>(),
-                new GUI(),
-                new AngularObjectRegistry(intpGroup.getId(), null),
-                new LocalResourcePool("pool1"),
-                new LinkedList<InterpreterContextRunner>(), null));
-      }
-
-      @Override
-      protected boolean jobAbort() {
-        return false;
-      }
-
-    };
-    intpA.getScheduler().submit(jobA);
-
-    // wait for job started
-    while (intpA.getScheduler().getJobsRunning().size() == 0) {
-      Thread.sleep(100);
-    }
-
-    // restart interpreter
-    RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
-    intpA.close();
-
-    InterpreterGroup newInterpreterGroup =
-        new InterpreterGroup(intpA.getInterpreterGroup().getId());
-    newInterpreterGroup.put("note", new LinkedList<Interpreter>());
-
-    intpA.setInterpreterGroup(newInterpreterGroup);
-    intpA.open();
-    RemoteInterpreterProcess processB = intpA.getInterpreterProcess();
-
-    assertNotSame(processA.hashCode(), processB.hashCode());
-
-  }
-
-  @Test
-  public void testRemoteInterpreterSharesTheSameSchedulerInstanceInTheSameGroup() {
-    Properties p = new Properties();
-    intpGroup.put("note", new LinkedList<Interpreter>());
-
-    RemoteInterpreter intpA = createMockInterpreterA(p);
-
-    intpGroup.get("note").add(intpA);
-    intpA.setInterpreterGroup(intpGroup);
-
-    RemoteInterpreter intpB = createMockInterpreterB(p);
-
-    intpGroup.get("note").add(intpB);
-    intpB.setInterpreterGroup(intpGroup);
-
-    intpA.open();
-    intpB.open();
-
-    assertEquals(intpA.getScheduler(), intpB.getScheduler());
-  }
-
-  @Test
-  public void testMultiInterpreterSession() {
-    Properties p = new Properties();
-    intpGroup.put("sessionA", new LinkedList<Interpreter>());
-    intpGroup.put("sessionB", new LinkedList<Interpreter>());
-
-    RemoteInterpreter intpAsessionA = createMockInterpreterA(p, "sessionA");
-    intpGroup.get("sessionA").add(intpAsessionA);
-    intpAsessionA.setInterpreterGroup(intpGroup);
-
-    RemoteInterpreter intpBsessionA = createMockInterpreterB(p, "sessionA");
-    intpGroup.get("sessionA").add(intpBsessionA);
-    intpBsessionA.setInterpreterGroup(intpGroup);
-
-    intpAsessionA.open();
-    intpBsessionA.open();
-
-    assertEquals(intpAsessionA.getScheduler(), intpBsessionA.getScheduler());
-
-    RemoteInterpreter intpAsessionB = createMockInterpreterA(p, "sessionB");
-    intpGroup.get("sessionB").add(intpAsessionB);
-    intpAsessionB.setInterpreterGroup(intpGroup);
-
-    RemoteInterpreter intpBsessionB = createMockInterpreterB(p, "sessionB");
-    intpGroup.get("sessionB").add(intpBsessionB);
-    intpBsessionB.setInterpreterGroup(intpGroup);
-
-    intpAsessionB.open();
-    intpBsessionB.open();
-
-    assertEquals(intpAsessionB.getScheduler(), intpBsessionB.getScheduler());
-    assertNotEquals(intpAsessionA.getScheduler(), intpAsessionB.getScheduler());
-  }
-
-  @Test
-  public void should_push_local_angular_repo_to_remote() throws Exception {
-    //Given
-    final Client client = Mockito.mock(Client.class);
-    final RemoteInterpreter intr = new RemoteInterpreter(new Properties(), "noteId",
-        MockInterpreterA.class.getName(), "runner", "path", "localRepo", env, 10 * 1000, null,
-        null, "anonymous", false);
-    final AngularObjectRegistry registry = new AngularObjectRegistry("spark", null);
-    registry.add("name", "DuyHai DOAN", "nodeId", "paragraphId");
-    final InterpreterGroup interpreterGroup = new InterpreterGroup("groupId");
-    interpreterGroup.setAngularObjectRegistry(registry);
-    intr.setInterpreterGroup(interpreterGroup);
-
-    final java.lang.reflect.Type registryType = new TypeToken<Map<String,
-                Map<String, AngularObject>>>() {}.getType();
-    final Gson gson = new Gson();
-    final String expected = gson.toJson(registry.getRegistry(), registryType);
-
-    //When
-    intr.pushAngularObjectRegistryToRemote(client);
-
-    //Then
-    Mockito.verify(client).angularRegistryPush(expected);
-  }
-
-  @Test
-  public void testEnvStringPattern() {
-    assertFalse(RemoteInterpreter.isEnvString(null));
-    assertFalse(RemoteInterpreter.isEnvString(""));
-    assertFalse(RemoteInterpreter.isEnvString("abcDEF"));
-    assertFalse(RemoteInterpreter.isEnvString("ABC-DEF"));
-    assertTrue(RemoteInterpreter.isEnvString("ABCDEF"));
-    assertTrue(RemoteInterpreter.isEnvString("ABC_DEF"));
-    assertTrue(RemoteInterpreter.isEnvString("ABC_DEF123"));
-  }
-
-  @Test
-  public void testEnvronmentAndPropertySet() {
-    Properties p = new Properties();
-    p.setProperty("MY_ENV1", "env value 1");
-    p.setProperty("my.property.1", "property value 1");
-
-    RemoteInterpreter intp = new RemoteInterpreter(
-        p,
-        "note",
-        MockInterpreterEnv.class.getName(),
-        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
-        "fake",
-        "fakeRepo",
-        env,
-        10 * 1000,
-        null,
-        null,
-        "anonymous",
-        false);
-
-    intpGroup.put("note", new LinkedList<Interpreter>());
-    intpGroup.get("note").add(intp);
-    intp.setInterpreterGroup(intpGroup);
-
-    intp.open();
-
-    InterpreterContext context = new InterpreterContext(
-        "noteId",
-        "id",
-        null,
-        "title",
-        "text",
-        new AuthenticationInfo(),
-        new HashMap<String, Object>(),
-        new GUI(),
-        new AngularObjectRegistry(intpGroup.getId(), null),
-        new LocalResourcePool("pool1"),
-        new LinkedList<InterpreterContextRunner>(), null);
-
-
-    assertEquals("env value 1", intp.interpret("getEnv MY_ENV1", context).message().get(0).getData());
-    assertEquals(Code.ERROR, intp.interpret("getProperty MY_ENV1", context).code());
-    assertEquals(Code.ERROR, intp.interpret("getEnv my.property.1", context).code());
-    assertEquals("property value 1", intp.interpret("getProperty my.property.1", context).message().get(0).getData());
-
-    intp.close();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
deleted file mode 100644
index 975d6ea..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
-import org.junit.Test;
-
-public class RemoteInterpreterUtilsTest {
-
-  @Test
-  public void testFindRandomAvailablePortOnAllLocalInterfaces() throws IOException {
-    assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() > 0);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java
deleted file mode 100644
index 81a9164..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote.mock;
-
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-
-public class MockInterpreterA extends Interpreter {
-
-  private String lastSt;
-
-  public MockInterpreterA(Properties property) {
-    super(property);
-  }
-
-  @Override
-  public void open() {
-    //new RuntimeException().printStackTrace();
-  }
-
-  @Override
-  public void close() {
-  }
-
-  public String getLastStatement() {
-    return lastSt;
-  }
-
-  @Override
-  public InterpreterResult interpret(String st, InterpreterContext context) {
-    try {
-      Thread.sleep(Long.parseLong(st));
-      this.lastSt = st;
-    } catch (NumberFormatException | InterruptedException e) {
-      throw new InterpreterException(e);
-    }
-    return new InterpreterResult(Code.SUCCESS, st);
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) {
-
-  }
-
-  @Override
-  public FormType getFormType() {
-    return FormType.NATIVE;
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    return 0;
-  }
-
-  @Override
-  public List<InterpreterCompletion> completion(String buf, int cursor,
-      InterpreterContext interpreterContext) {
-    return null;
-  }
-
-  @Override
-  public Scheduler getScheduler() {
-    if (getProperty("parallel") != null && getProperty("parallel").equals("true")) {
-      return SchedulerFactory.singleton().createOrGetParallelScheduler("interpreter_" + this.hashCode(), 10);
-    } else {
-      return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
deleted file mode 100644
index d4b26ad..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote.mock;
-
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.AngularObjectWatcher;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-
-public class MockInterpreterAngular extends Interpreter {
-
-  AtomicInteger numWatch = new AtomicInteger(0);
-
-  public MockInterpreterAngular(Properties property) {
-    super(property);
-  }
-
-  @Override
-  public void open() {
-  }
-
-  @Override
-  public void close() {
-
-  }
-
-  @Override
-  public InterpreterResult interpret(String st, InterpreterContext context) {
-    String[] stmt = st.split(" ");
-    String cmd = stmt[0];
-    String name = null;
-    if (stmt.length >= 2) {
-      name = stmt[1];
-    }
-    String value = null;
-    if (stmt.length == 3) {
-      value = stmt[2];
-    }
-
-    AngularObjectRegistry registry = context.getAngularObjectRegistry();
-
-    if (cmd.equals("add")) {
-      registry.add(name, value, context.getNoteId(), null);
-      registry.get(name, context.getNoteId(), null).addWatcher(new AngularObjectWatcher
-              (null) {
-
-        @Override
-        public void watch(Object oldObject, Object newObject,
-            InterpreterContext context) {
-          numWatch.incrementAndGet();
-        }
-
-      });
-    } else if (cmd.equalsIgnoreCase("update")) {
-      registry.get(name, context.getNoteId(), null).set(value);
-    } else if (cmd.equals("remove")) {
-      registry.remove(name, context.getNoteId(), null);
-    }
-
-    try {
-      Thread.sleep(500); // wait for watcher executed
-    } catch (InterruptedException e) {
-      logger.error("Exception in MockInterpreterAngular while interpret Thread.sleep", e);
-    }
-
-    String msg = registry.getAll(context.getNoteId(), null).size() + " " + Integer.toString(numWatch
-            .get());
-    return new InterpreterResult(Code.SUCCESS, msg);
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) {
-  }
-
-  @Override
-  public FormType getFormType() {
-    return FormType.NATIVE;
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    return 0;
-  }
-
-  @Override
-  public List<InterpreterCompletion> completion(String buf, int cursor,
-      InterpreterContext interpreterContext) {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
deleted file mode 100644
index 7103335..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote.mock;
-
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.WrappedInterpreter;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-
-public class MockInterpreterB extends Interpreter {
-
-  public MockInterpreterB(Properties property) {
-    super(property);
-  }
-
-  @Override
-  public void open() {
-    //new RuntimeException().printStackTrace();
-  }
-
-  @Override
-  public void close() {
-  }
-
-  @Override
-  public InterpreterResult interpret(String st, InterpreterContext context) {
-    MockInterpreterA intpA = getInterpreterA();
-    String intpASt = intpA.getLastStatement();
-    long timeToSleep = Long.parseLong(st);
-    if (intpASt != null) {
-      timeToSleep += Long.parseLong(intpASt);
-    }
-    try {
-      Thread.sleep(timeToSleep);
-    } catch (NumberFormatException | InterruptedException e) {
-      throw new InterpreterException(e);
-    }
-    return new InterpreterResult(Code.SUCCESS, Long.toString(timeToSleep));
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) {
-
-  }
-
-  @Override
-  public FormType getFormType() {
-    return FormType.NATIVE;
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    return 0;
-  }
-
-  @Override
-  public List<InterpreterCompletion> completion(String buf, int cursor,
-      InterpreterContext interpreterContext) {
-    return null;
-  }
-
-  public MockInterpreterA getInterpreterA() {
-    InterpreterGroup interpreterGroup = getInterpreterGroup();
-    synchronized (interpreterGroup) {
-      for (List<Interpreter> interpreters : interpreterGroup.values()) {
-        boolean belongsToSameNoteGroup = false;
-        MockInterpreterA a = null;
-        for (Interpreter intp : interpreters) {
-          if (intp.getClassName().equals(MockInterpreterA.class.getName())) {
-            Interpreter p = intp;
-            while (p instanceof WrappedInterpreter) {
-              p = ((WrappedInterpreter) p).getInnerInterpreter();
-            }
-            a = (MockInterpreterA) p;
-          }
-
-          Interpreter p = intp;
-          while (p instanceof WrappedInterpreter) {
-            p = ((WrappedInterpreter) p).getInnerInterpreter();
-          }
-          if (this == p) {
-            belongsToSameNoteGroup = true;
-          }
-        }
-        if (belongsToSameNoteGroup) {
-          return a;
-        }
-      }
-    }
-    return null;
-  }
-
-  @Override
-  public Scheduler getScheduler() {
-    MockInterpreterA intpA = getInterpreterA();
-    if (intpA != null) {
-      return intpA.getScheduler();
-    }
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java
deleted file mode 100644
index 12e11f7..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.interpreter.remote.mock;
-
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-
-import java.util.List;
-import java.util.Properties;
-
-
-public class MockInterpreterEnv extends Interpreter {
-
-  public MockInterpreterEnv(Properties property) {
-    super(property);
-  }
-
-  @Override
-  public void open() {
-  }
-
-  @Override
-  public void close() {
-  }
-
-  @Override
-  public InterpreterResult interpret(String st, InterpreterContext context) {
-    String[] cmd = st.split(" ");
-    if (cmd[0].equals("getEnv")) {
-      return new InterpreterResult(InterpreterResult.Code.SUCCESS, System.getenv(cmd[1]));
-    } else if (cmd[0].equals("getProperty")){
-      return new InterpreterResult(InterpreterResult.Code.SUCCESS, System.getProperty(cmd[1]));
-    } else {
-      return new InterpreterResult(InterpreterResult.Code.ERROR, cmd[0]);
-    }
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) {
-
-  }
-
-  @Override
-  public FormType getFormType() {
-    return FormType.NATIVE;
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    return 0;
-  }
-
-  @Override
-  public List<InterpreterCompletion> completion(String buf, int cursor,
-      InterpreterContext interpreterContext) {
-    return null;
-  }
-
-  @Override
-  public Scheduler getScheduler() {
-    return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode());
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
deleted file mode 100644
index 349315c..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.interpreter.remote.mock;
-
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * MockInterpreter to test outputstream
- */
-public class MockInterpreterOutputStream extends Interpreter {
-  private String lastSt;
-
-  public MockInterpreterOutputStream(Properties property) {
-    super(property);
-  }
-
-  @Override
-  public void open() {
-    //new RuntimeException().printStackTrace();
-  }
-
-  @Override
-  public void close() {
-  }
-
-  public String getLastStatement() {
-    return lastSt;
-  }
-
-  @Override
-  public InterpreterResult interpret(String st, InterpreterContext context) {
-    String[] ret = st.split(":");
-    try {
-      if (ret[1] != null) {
-        context.out.write(ret[1]);
-      }
-    } catch (IOException e) {
-      throw new InterpreterException(e);
-    }
-    return new InterpreterResult(InterpreterResult.Code.valueOf(ret[0]), (ret.length > 2) ?
-            ret[2] : "");
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) {
-
-  }
-
-  @Override
-  public FormType getFormType() {
-    return FormType.NATIVE;
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    return 0;
-  }
-
-  @Override
-  public List<InterpreterCompletion> completion(String buf, int cursor,
-      InterpreterContext interpreterContext) {
-    return null;
-  }
-
-  @Override
-  public Scheduler getScheduler() {
-    return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode());
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
deleted file mode 100644
index c4ff6ab..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote.mock;
-
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.google.gson.Gson;
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.AngularObjectWatcher;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.resource.Resource;
-import org.apache.zeppelin.resource.ResourcePool;
-
-public class MockInterpreterResourcePool extends Interpreter {
-
-  AtomicInteger numWatch = new AtomicInteger(0);
-
-  public MockInterpreterResourcePool(Properties property) {
-    super(property);
-  }
-
-  @Override
-  public void open() {
-  }
-
-  @Override
-  public void close() {
-
-  }
-
-  @Override
-  public InterpreterResult interpret(String st, InterpreterContext context) {
-    String[] stmt = st.split(" ");
-    String cmd = stmt[0];
-    String noteId = null;
-    String paragraphId = null;
-    String name = null;
-    if (stmt.length >= 2) {
-      String[] npn = stmt[1].split(":");
-      if (npn.length >= 3) {
-        noteId = npn[0];
-        paragraphId = npn[1];
-        name = npn[2];
-      } else {
-        name = stmt[1];
-      }
-    }
-    String value = null;
-    if (stmt.length >= 3) {
-      value = stmt[2];
-    }
-
-    ResourcePool resourcePool = context.getResourcePool();
-    Object ret = null;
-    if (cmd.equals("put")) {
-      resourcePool.put(noteId, paragraphId, name, value);
-    } else if (cmd.equalsIgnoreCase("get")) {
-      Resource resource = resourcePool.get(noteId, paragraphId, name);
-      if (resource != null) {
-        ret = resourcePool.get(noteId, paragraphId, name).get();
-      } else {
-        ret = "";
-      }
-    } else if (cmd.equals("remove")) {
-      ret = resourcePool.remove(noteId, paragraphId, name);
-    } else if (cmd.equals("getAll")) {
-      ret = resourcePool.getAll();
-    } else if (cmd.equals("invoke")) {
-      Resource resource = resourcePool.get(noteId, paragraphId, name);
-      if (stmt.length >=4) {
-        Resource res = resource.invokeMethod(value, null, null, stmt[3]);
-        ret = res.get();
-      } else {
-        ret = resource.invokeMethod(value, null, null);
-      }
-    }
-
-    try {
-      Thread.sleep(500); // wait for watcher executed
-    } catch (InterruptedException e) {
-    }
-
-    Gson gson = new Gson();
-    return new InterpreterResult(Code.SUCCESS, gson.toJson(ret));
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) {
-  }
-
-  @Override
-  public FormType getFormType() {
-    return FormType.NATIVE;
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    return 0;
-  }
-
-  @Override
-  public List<InterpreterCompletion> completion(String buf, int cursor,
-      InterpreterContext interpreterContext) {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
deleted file mode 100644
index 363ccf6..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.resource;
-
-import com.google.gson.Gson;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller;
-import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterResourcePool;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Unittest for DistributedResourcePool
- */
-public class DistributedResourcePoolTest {
-  private static final String INTERPRETER_SCRIPT =
-          System.getProperty("os.name").startsWith("Windows") ?
-                  "../bin/interpreter.cmd" :
-                  "../bin/interpreter.sh";
-  private InterpreterGroup intpGroup1;
-  private InterpreterGroup intpGroup2;
-  private HashMap<String, String> env;
-  private RemoteInterpreter intp1;
-  private RemoteInterpreter intp2;
-  private InterpreterContext context;
-  private RemoteInterpreterEventPoller eventPoller1;
-  private RemoteInterpreterEventPoller eventPoller2;
-
-
-  @Before
-  public void setUp() throws Exception {
-    env = new HashMap<>();
-    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
-
-    Properties p = new Properties();
-
-    intp1 = new RemoteInterpreter(
-        p,
-        "note",
-        MockInterpreterResourcePool.class.getName(),
-        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
-        "fake",
-        "fakeRepo",
-        env,
-        10 * 1000,
-        null,
-        null,
-        "anonymous",
-        false
-    );
-
-    intpGroup1 = new InterpreterGroup("intpGroup1");
-    intpGroup1.put("note", new LinkedList<Interpreter>());
-    intpGroup1.get("note").add(intp1);
-    intp1.setInterpreterGroup(intpGroup1);
-
-    intp2 = new RemoteInterpreter(
-        p,
-        "note",
-        MockInterpreterResourcePool.class.getName(),
-        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
-        "fake",
-        "fakeRepo",        
-        env,
-        10 * 1000,
-        null,
-        null,
-        "anonymous",
-        false
-    );
-
-    intpGroup2 = new InterpreterGroup("intpGroup2");
-    intpGroup2.put("note", new LinkedList<Interpreter>());
-    intpGroup2.get("note").add(intp2);
-    intp2.setInterpreterGroup(intpGroup2);
-
-    context = new InterpreterContext(
-        "note",
-        "id",
-        null,
-        "title",
-        "text",
-        new AuthenticationInfo(),
-        new HashMap<String, Object>(),
-        new GUI(),
-        null,
-        null,
-        new LinkedList<InterpreterContextRunner>(),
-        null);
-
-    intp1.open();
-    intp2.open();
-
-    eventPoller1 = new RemoteInterpreterEventPoller(null, null);
-    eventPoller1.setInterpreterGroup(intpGroup1);
-    eventPoller1.setInterpreterProcess(intpGroup1.getRemoteInterpreterProcess());
-
-    eventPoller2 = new RemoteInterpreterEventPoller(null, null);
-    eventPoller2.setInterpreterGroup(intpGroup2);
-    eventPoller2.setInterpreterProcess(intpGroup2.getRemoteInterpreterProcess());
-
-    eventPoller1.start();
-    eventPoller2.start();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    eventPoller1.shutdown();
-    intp1.close();
-    intpGroup1.close();
-    eventPoller2.shutdown();
-    intp2.close();
-    intpGroup2.close();
-  }
-
-  @Test
-  public void testRemoteDistributedResourcePool() {
-    Gson gson = new Gson();
-    InterpreterResult ret;
-    intp1.interpret("put key1 value1", context);
-    intp2.interpret("put key2 value2", context);
-
-    ret = intp1.interpret("getAll", context);
-    assertEquals(2, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size());
-
-    ret = intp2.interpret("getAll", context);
-    assertEquals(2, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size());
-
-    ret = intp1.interpret("get key1", context);
-    assertEquals("value1", gson.fromJson(ret.message().get(0).getData(), String.class));
-
-    ret = intp1.interpret("get key2", context);
-    assertEquals("value2", gson.fromJson(ret.message().get(0).getData(), String.class));
-  }
-
-  @Test
-  public void testDistributedResourcePool() {
-    final LocalResourcePool pool2 = new LocalResourcePool("pool2");
-    final LocalResourcePool pool3 = new LocalResourcePool("pool3");
-
-    DistributedResourcePool pool1 = new DistributedResourcePool("pool1", new ResourcePoolConnector() {
-      @Override
-      public ResourceSet getAllResources() {
-        ResourceSet set = pool2.getAll();
-        set.addAll(pool3.getAll());
-
-        ResourceSet remoteSet = new ResourceSet();
-        Gson gson = new Gson();
-        for (Resource s : set) {
-          RemoteResource remoteResource = gson.fromJson(gson.toJson(s), RemoteResource.class);
-          remoteResource.setResourcePoolConnector(this);
-          remoteSet.add(remoteResource);
-        }
-        return remoteSet;
-      }
-
-      @Override
-      public Object readResource(ResourceId id) {
-        if (id.getResourcePoolId().equals(pool2.id())) {
-          return pool2.get(id.getName()).get();
-        }
-        if (id.getResourcePoolId().equals(pool3.id())) {
-          return pool3.get(id.getName()).get();
-        }
-        return null;
-      }
-
-      @Override
-      public Object invokeMethod(ResourceId id, String methodName, Class[] paramTypes, Object[] params) {
-        return null;
-      }
-
-      @Override
-      public Resource invokeMethod(ResourceId id, String methodName, Class[] paramTypes, Object[]
-          params, String returnResourceName) {
-        return null;
-      }
-    });
-
-    assertEquals(0, pool1.getAll().size());
-
-
-    // test get() can get from pool
-    pool2.put("object1", "value2");
-    assertEquals(1, pool1.getAll().size());
-    assertTrue(pool1.get("object1").isRemote());
-    assertEquals("value2", pool1.get("object1").get());
-
-    // test get() is locality aware
-    pool1.put("object1", "value1");
-    assertEquals(1, pool2.getAll().size());
-    assertEquals("value1", pool1.get("object1").get());
-
-    // test getAll() is locality aware
-    assertEquals("value1", pool1.getAll().get(0).get());
-    assertEquals("value2", pool1.getAll().get(1).get());
-  }
-
-  @Test
-  public void testResourcePoolUtils() {
-    Gson gson = new Gson();
-    InterpreterResult ret;
-
-    // when create some resources
-    intp1.interpret("put note1:paragraph1:key1 value1", context);
-    intp1.interpret("put note1:paragraph2:key1 value2", context);
-    intp2.interpret("put note2:paragraph1:key1 value1", context);
-    intp2.interpret("put note2:paragraph2:key2 value2", context);
-
-
-    // then get all resources.
-    assertEquals(4, ResourcePoolUtils.getAllResources().size());
-
-    // when remove all resources from note1
-    ResourcePoolUtils.removeResourcesBelongsToNote("note1");
-
-    // then resources should be removed.
-    assertEquals(2, ResourcePoolUtils.getAllResources().size());
-    assertEquals("", gson.fromJson(
-        intp1.interpret("get note1:paragraph1:key1", context).message().get(0).getData(),
-        String.class));
-    assertEquals("", gson.fromJson(
-        intp1.interpret("get note1:paragraph2:key1", context).message().get(0).getData(),
-        String.class));
-
-
-    // when remove all resources from note2:paragraph1
-    ResourcePoolUtils.removeResourcesBelongsToParagraph("note2", "paragraph1");
-
-    // then 1
-    assertEquals(1, ResourcePoolUtils.getAllResources().size());
-    assertEquals("value2", gson.fromJson(
-        intp1.interpret("get note2:paragraph2:key2", context).message().get(0).getData(),
-        String.class));
-
-  }
-
-  @Test
-  public void testResourceInvokeMethod() {
-    Gson gson = new Gson();
-    InterpreterResult ret;
-    intp1.interpret("put key1 hey", context);
-    intp2.interpret("put key2 world", context);
-
-    // invoke method in local resource pool
-    ret = intp1.interpret("invoke key1 length", context);
-    assertEquals("3", ret.message().get(0).getData());
-
-    // invoke method in remote resource pool
-    ret = intp1.interpret("invoke key2 length", context);
-    assertEquals("5", ret.message().get(0).getData());
-
-    // make sure no resources are automatically created
-    ret = intp1.interpret("getAll", context);
-    assertEquals(2, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size());
-
-    // invoke method in local resource pool and save result
-    ret = intp1.interpret("invoke key1 length ret1", context);
-    assertEquals("3", ret.message().get(0).getData());
-
-    ret = intp1.interpret("getAll", context);
-    assertEquals(3, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size());
-
-    ret = intp1.interpret("get ret1", context);
-    assertEquals("3", gson.fromJson(ret.message().get(0).getData(), String.class));
-
-    // invoke method in remote resource pool and save result
-    ret = intp1.interpret("invoke key2 length ret2", context);
-    assertEquals("5", ret.message().get(0).getData());
-
-    ret = intp1.interpret("getAll", context);
-    assertEquals(4, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size());
-
-    ret = intp1.interpret("get ret2", context);
-    assertEquals("5", gson.fromJson(ret.message().get(0).getData(), String.class));
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
deleted file mode 100644
index ebb5100..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.scheduler;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
-import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
-import org.apache.zeppelin.resource.LocalResourcePool;
-import org.apache.zeppelin.scheduler.Job.Status;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
-
-  private static final String INTERPRETER_SCRIPT =
-          System.getProperty("os.name").startsWith("Windows") ?
-                  "../bin/interpreter.cmd" :
-                  "../bin/interpreter.sh";
-  private SchedulerFactory schedulerSvc;
-  private static final int TICK_WAIT = 100;
-  private static final int MAX_WAIT_CYCLES = 100;
-
-  @Before
-  public void setUp() throws Exception{
-    schedulerSvc = new SchedulerFactory();
-  }
-
-  @After
-  public void tearDown(){
-
-  }
-
-  @Test
-  public void test() throws Exception {
-    Properties p = new Properties();
-    final InterpreterGroup intpGroup = new InterpreterGroup();
-    Map<String, String> env = new HashMap<>();
-    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
-
-    final RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        "note",
-        MockInterpreterA.class.getName(),
-        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
-        "fake",
-        "fakeRepo",
-        env,
-        10 * 1000,
-        this,
-        null,
-        "anonymous",
-        false);
-
-    intpGroup.put("note", new LinkedList<Interpreter>());
-    intpGroup.get("note").add(intpA);
-    intpA.setInterpreterGroup(intpGroup);
-
-    intpA.open();
-
-    Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note",
-        intpA.getInterpreterProcess(),
-        10);
-
-    Job job = new Job("jobId", "jobName", null, 200) {
-      Object results;
-      @Override
-      public Object getReturn() {
-        return results;
-      }
-
-      @Override
-      public int progress() {
-        return 0;
-      }
-
-      @Override
-      public Map<String, Object> info() {
-        return null;
-      }
-
-      @Override
-      protected Object jobRun() throws Throwable {
-        intpA.interpret("1000", new InterpreterContext(
-            "note",
-            "jobId",
-            null,
-            "title",
-            "text",
-            new AuthenticationInfo(),
-            new HashMap<String, Object>(),
-            new GUI(),
-            new AngularObjectRegistry(intpGroup.getId(), null),
-            new LocalResourcePool("pool1"),
-            new LinkedList<InterpreterContextRunner>(), null));
-        return "1000";
-      }
-
-      @Override
-      protected boolean jobAbort() {
-        return false;
-      }
-
-      @Override
-      public void setResult(Object results) {
-        this.results = results;
-      }
-    };
-    scheduler.submit(job);
-
-    int cycles = 0;
-    while (!job.isRunning() && cycles < MAX_WAIT_CYCLES) {
-      Thread.sleep(TICK_WAIT);
-      cycles++;
-    }
-    assertTrue(job.isRunning());
-
-    Thread.sleep(5*TICK_WAIT);
-    assertEquals(0, scheduler.getJobsWaiting().size());
-    assertEquals(1, scheduler.getJobsRunning().size());
-
-    cycles = 0;
-    while (!job.isTerminated() && cycles < MAX_WAIT_CYCLES) {
-      Thread.sleep(TICK_WAIT);
-      cycles++;
-    }
-
-    assertTrue(job.isTerminated());
-    assertEquals(0, scheduler.getJobsWaiting().size());
-    assertEquals(0, scheduler.getJobsRunning().size());
-
-    intpA.close();
-    schedulerSvc.removeScheduler("test");
-  }
-
-  @Test
-  public void testAbortOnPending() throws Exception {
-    Properties p = new Properties();
-    final InterpreterGroup intpGroup = new InterpreterGroup();
-    Map<String, String> env = new HashMap<>();
-    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
-
-    final RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        "note",
-        MockInterpreterA.class.getName(),
-        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
-        "fake",
-        "fakeRepo",
-        env,
-        10 * 1000,
-        this,
-        null,
-        "anonymous",
-        false);
-
-    intpGroup.put("note", new LinkedList<Interpreter>());
-    intpGroup.get("note").add(intpA);
-    intpA.setInterpreterGroup(intpGroup);
-
-    intpA.open();
-
-    Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note",
-        intpA.getInterpreterProcess(),
-        10);
-
-    Job job1 = new Job("jobId1", "jobName1", null, 200) {
-      Object results;
-      InterpreterContext context = new InterpreterContext(
-          "note",
-          "jobId1",
-          null,
-          "title",
-          "text",
-          new AuthenticationInfo(),
-          new HashMap<String, Object>(),
-          new GUI(),
-          new AngularObjectRegistry(intpGroup.getId(), null),
-          new LocalResourcePool("pool1"),
-          new LinkedList<InterpreterContextRunner>(), null);
-
-      @Override
-      public Object getReturn() {
-        return results;
-      }
-
-      @Override
-      public int progress() {
-        return 0;
-      }
-
-      @Override
-      public Map<String, Object> info() {
-        return null;
-      }
-
-      @Override
-      protected Object jobRun() throws Throwable {
-        intpA.interpret("1000", context);
-        return "1000";
-      }
-
-      @Override
-      protected boolean jobAbort() {
-        if (isRunning()) {
-          intpA.cancel(context);
-        }
-        return true;
-      }
-
-      @Override
-      public void setResult(Object results) {
-        this.results = results;
-      }
-    };
-
-    Job job2 = new Job("jobId2", "jobName2", null, 200) {
-      public Object results;
-      InterpreterContext context = new InterpreterContext(
-          "note",
-          "jobId2",
-          null,
-          "title",
-          "text",
-          new AuthenticationInfo(),
-          new HashMap<String, Object>(),
-          new GUI(),
-          new AngularObjectRegistry(intpGroup.getId(), null),
-          new LocalResourcePool("pool1"),
-          new LinkedList<InterpreterContextRunner>(), null);
-
-      @Override
-      public Object getReturn() {
-        return results;
-      }
-
-      @Override
-      public int progress() {
-        return 0;
-      }
-
-      @Override
-      public Map<String, Object> info() {
-        return null;
-      }
-
-      @Override
-      protected Object jobRun() throws Throwable {
-        intpA.interpret("1000", context);
-        return "1000";
-      }
-
-      @Override
-      protected boolean jobAbort() {
-        if (isRunning()) {
-          intpA.cancel(context);
-        }
-        return true;
-      }
-
-      @Override
-      public void setResult(Object results) {
-        this.results = results;
-      }
-    };
-
-    job2.setResult("result2");
-
-    scheduler.submit(job1);
-    scheduler.submit(job2);
-
-
-    int cycles = 0;
-    while (!job1.isRunning() && cycles < MAX_WAIT_CYCLES) {
-      Thread.sleep(TICK_WAIT);
-      cycles++;
-    }
-    assertTrue(job1.isRunning());
-    assertTrue(job2.getStatus() == Status.PENDING);
-
-    job2.abort();
-
-    cycles = 0;
-    while (!job1.isTerminated() && cycles < MAX_WAIT_CYCLES) {
-      Thread.sleep(TICK_WAIT);
-      cycles++;
-    }
-
-    assertNotNull(job1.getDateFinished());
-    assertTrue(job1.isTerminated());
-    assertNull(job2.getDateFinished());
-    assertTrue(job2.isTerminated());
-    assertEquals("result2", job2.getReturn());
-
-    intpA.close();
-    schedulerSvc.removeScheduler("test");
-  }
-
-  @Override
-  public void onOutputAppend(String noteId, String paragraphId, int index, String output) {
-
-  }
-
-  @Override
-  public void onOutputUpdated(String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) {
-
-  }
-
-  @Override
-  public void onOutputClear(String noteId, String paragraphId) {
-
-  }
-
-  @Override
-  public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
-
-  }
-
-  @Override
-  public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) {
-    if (callback != null) {
-      callback.onFinished(new LinkedList<>());
-    }
-  }
-
-  @Override
-  public void onRemoteRunParagraph(String noteId, String PsaragraphID) throws Exception {
-  }
-
-  @Override
-  public void onParaInfosReceived(String noteId, String paragraphId, 
-      String interpreterSettingId, Map<String, String> metaInfos) {
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
new file mode 100644
index 0000000..0ac7116
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import java.util.List;
+
+import org.apache.thrift.TException;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectRegistryListener;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+
+/**
+ * Proxy for AngularObjectRegistry that exists in remote interpreter process
+ */
+public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
+  Logger logger = LoggerFactory.getLogger(RemoteAngularObjectRegistry.class);
+  private InterpreterGroup interpreterGroup;
+
+  public RemoteAngularObjectRegistry(String interpreterId,
+      AngularObjectRegistryListener listener,
+      InterpreterGroup interpreterGroup) {
+    super(interpreterId, listener);
+    this.interpreterGroup = interpreterGroup;
+  }
+
+  private RemoteInterpreterProcess getRemoteInterpreterProcess() {
+    return interpreterGroup.getRemoteInterpreterProcess();
+  }
+
+  /**
+   * When ZeppelinServer side code want to add angularObject to the registry,
+   * this method should be used instead of add()
+   * @param name
+   * @param o
+   * @param noteId
+   * @return
+   */
+  public AngularObject addAndNotifyRemoteProcess(String name, Object o, String noteId, String
+          paragraphId) {
+    Gson gson = new Gson();
+    RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
+    if (!remoteInterpreterProcess.isRunning()) {
+      return super.add(name, o, noteId, paragraphId, true);
+    }
+
+    Client client = null;
+    boolean broken = false;
+    try {
+      client = remoteInterpreterProcess.getClient();
+      client.angularObjectAdd(name, noteId, paragraphId, gson.toJson(o));
+      return super.add(name, o, noteId, paragraphId, true);
+    } catch (TException e) {
+      broken = true;
+      logger.error("Error", e);
+    } catch (Exception e) {
+      logger.error("Error", e);
+    } finally {
+      if (client != null) {
+        remoteInterpreterProcess.releaseClient(client, broken);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * When ZeppelinServer side code want to remove angularObject from the registry,
+   * this method should be used instead of remove()
+   * @param name
+   * @param noteId
+   * @param paragraphId
+   * @return
+   */
+  public AngularObject removeAndNotifyRemoteProcess(String name, String noteId, String
+          paragraphId) {
+    RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
+    if (remoteInterpreterProcess == null || !remoteInterpreterProcess.isRunning()) {
+      return super.remove(name, noteId, paragraphId);
+    }
+
+    Client client = null;
+    boolean broken = false;
+    try {
+      client = remoteInterpreterProcess.getClient();
+      client.angularObjectRemove(name, noteId, paragraphId);
+      return super.remove(name, noteId, paragraphId);
+    } catch (TException e) {
+      broken = true;
+      logger.error("Error", e);
+    } catch (Exception e) {
+      logger.error("Error", e);
+    } finally {
+      if (client != null) {
+        remoteInterpreterProcess.releaseClient(client, broken);
+      }
+    }
+    return null;
+  }
+  
+  public void removeAllAndNotifyRemoteProcess(String noteId, String paragraphId) {
+    List<AngularObject> all = getAll(noteId, paragraphId);
+    for (AngularObject ao : all) {
+      removeAndNotifyRemoteProcess(ao.getName(), noteId, paragraphId);
+    }
+  }
+
+  @Override
+  protected AngularObject createNewAngularObject(String name, Object o, String noteId, String
+          paragraphId) {
+    return new RemoteAngularObject(name, o, noteId, paragraphId, interpreterGroup,
+        getAngularObjectListener());
+  }
+}