You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2015/04/06 06:05:58 UTC

[07/17] incubator-zeppelin git commit: Rename package/groupId to org.apache and apply rat plugin.

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
new file mode 100644
index 0000000..e7f950a
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scheduler;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.zeppelin.scheduler.Job.Status;
+
+/**
+ * Scheduler that runs submitted jobs one at a time, in submission order, on the given executor.
+ *
+ * @author Leemoonsoo
+ *
+ */
+public class FIFOScheduler implements Scheduler {
+  List<Job> queue = new LinkedList<Job>();
+  private ExecutorService executor;
+  private SchedulerListener listener;
+  boolean terminate = false;
+  Job runningJob = null;
+  private String name;
+
+  public FIFOScheduler(String name, ExecutorService executor, SchedulerListener listener) {
+    this.name = name;
+    this.executor = executor;
+    this.listener = listener;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public Collection<Job> getJobsWaiting() {
+    List<Job> ret = new LinkedList<Job>();
+    synchronized (queue) {
+      for (Job job : queue) {
+        ret.add(job);
+      }
+    }
+    return ret;
+  }
+
+  @Override
+  public Collection<Job> getJobsRunning() {
+    List<Job> ret = new LinkedList<Job>();
+    Job job = runningJob;
+
+    if (job != null) {
+      ret.add(job);
+    }
+
+    return ret;
+  }
+
+
+
+  @Override
+  public void submit(Job job) {
+    job.setStatus(Status.PENDING);
+    synchronized (queue) {
+      queue.add(job);
+      queue.notify();
+    }
+  }
+
+  @Override
+  public void run() {
+
+    synchronized (queue) {
+      while (terminate == false) {
+        if (runningJob != null || queue.isEmpty() == true) {
+          try {
+            queue.wait(500);
+          } catch (InterruptedException e) {
+          }
+          continue;
+        }
+
+        runningJob = queue.remove(0);
+
+        final Scheduler scheduler = this;
+        this.executor.execute(new Runnable() {
+          @Override
+          public void run() {
+            if (runningJob.isAborted()) {
+              runningJob.setStatus(Status.ABORT);
+              runningJob.aborted = false;
+              synchronized (queue) {
+                queue.notify();
+              }
+              return;
+            }
+
+            runningJob.setStatus(Status.RUNNING);
+            if (listener != null) {
+              listener.jobStarted(scheduler, runningJob);
+            }
+            runningJob.run();
+            if (runningJob.isAborted()) {
+              runningJob.setStatus(Status.ABORT);
+            } else {
+              if (runningJob.getException() != null) {
+                runningJob.setStatus(Status.ERROR);
+              } else {
+                runningJob.setStatus(Status.FINISHED);
+              }
+            }
+            if (listener != null) {
+              listener.jobFinished(scheduler, runningJob);
+            }
+            // reset aborted flag to allow retry
+            runningJob.aborted = false;
+            runningJob = null;
+            synchronized (queue) {
+              queue.notify();
+            }
+          }
+        });
+      }
+    }
+  }
+
+  @Override
+  public void stop() {
+    terminate = true;
+    synchronized (queue) {
+      queue.notify();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
new file mode 100644
index 0000000..9837ad2
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scheduler;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Skeletal implementation of the Job concept.
+ *  - designed for inheritance
+ *  - should be run on a separate thread
+ *  - maintains internal state: its status
+ *  - supports listeners who are updated on status change
+ *
+ *  Job class is serialized/deserialized and used for server<->client communication
+ *  and for saving/loading jobs from disk.
+ *  Changing, adding or deleting a non-transient field name must take that into account.
+ *
+ *  @author Leemoonsoo
+ */
+public abstract class Job {
+  /**
+   * Job status.
+   *
+   * READY - Job is not running and is ready to run.
+   * PENDING - Job is submitted to the scheduler but not running yet.
+   * RUNNING - Job is running.
+   * FINISHED - Job finished running successfully.
+   * ERROR - Job finished running with an error.
+   * ABORT - Job was ended by an abort.
+   *
+   */
+  public static enum Status {
+    READY,
+    PENDING,
+    RUNNING,
+    FINISHED,
+    ERROR,
+    ABORT;
+    boolean isReady() {
+      return this == READY;
+    }
+
+    boolean isRunning() {
+      return this == RUNNING;
+    }
+
+    boolean isPending() {
+      return this == PENDING;
+    }
+  }
+
+  private String jobName;
+  String id;
+  Object result;
+  Date dateCreated;
+  Date dateStarted;
+  Date dateFinished;
+  Status status;
+
+  transient boolean aborted = false;
+
+  String errorMessage;
+  private transient Throwable exception;
+  private transient JobListener listener;
+  private long progressUpdateIntervalMs;
+
+  public Job(String jobName, JobListener listener, long progressUpdateIntervalMs) {
+    this.jobName = jobName;
+    this.listener = listener;
+    this.progressUpdateIntervalMs = progressUpdateIntervalMs;
+
+    dateCreated = new Date();
+    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd-HHmmss");
+    id = dateFormat.format(dateCreated) + "_" + super.hashCode();
+
+    setStatus(Status.READY);
+  }
+
+  public Job(String jobName, JobListener listener) {
+    this(jobName, listener, JobProgressPoller.DEFAULT_INTERVAL_MSEC);
+  }
+
+  public Job(String jobId, String jobName, JobListener listener, long progressUpdateIntervalMs) {
+    this.jobName = jobName;
+    this.listener = listener;
+    this.progressUpdateIntervalMs = progressUpdateIntervalMs;
+
+    id = jobId;
+
+    setStatus(Status.READY);
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  @Override
+  public int hashCode() {
+    return id.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) { // equal iff same id; null or a non-Job is never equal
+    return o == this || (o instanceof Job && id.equals(((Job) o).id));
+  }
+
+  public Status getStatus() {
+    return status;
+  }
+
+  public void setStatus(Status status) {
+    if (this.status == status) {
+      return;
+    }
+    Status before = this.status;
+    Status after = status;
+    if (listener != null) {
+      listener.beforeStatusChange(this, before, after);
+    }
+    this.status = status;
+    if (listener != null) {
+      listener.afterStatusChange(this, before, after);
+    }
+  }
+
+  public void setListener(JobListener listener) {
+    this.listener = listener;
+  }
+
+  public JobListener getListener() {
+    return listener;
+  }
+
+  public boolean isTerminated() {
+    return !this.status.isReady() && !this.status.isRunning() && !this.status.isPending();
+  }
+
+  public boolean isRunning() {
+    return this.status.isRunning();
+  }
+
+  /**
+   * Executes {@link #jobRun()} while a progress poller runs alongside, recording the result
+   * and the start/finish timestamps.
+   *
+   * <p>Any {@link Throwable} from the job is logged and stored (see {@link #getException()})
+   * instead of being rethrown.
+   */
+  public void run() {
+    JobProgressPoller progressUpdator = null;
+    try {
+      progressUpdator = new JobProgressPoller(this, progressUpdateIntervalMs);
+      progressUpdator.start();
+      dateStarted = new Date();
+      result = jobRun();
+      this.exception = null;
+      errorMessage = null;
+    } catch (Throwable e) {
+      logger().error("Job failed", e);
+      this.exception = e;
+      result = e.getMessage();
+      errorMessage = getStack(e);
+    } finally {
+      dateFinished = new Date();
+      // Always stop the poller; it is still null only if creating it failed.
+      if (progressUpdator != null) {
+        progressUpdator.terminate();
+      }
+    }
+  }
+
+  /** Renders the stack frames of {@code e}, one per line; "" when e or its trace is null. */
+  public String getStack(Throwable e) {
+    StackTraceElement[] stacks = (e == null) ? null : e.getStackTrace();
+    if (stacks == null) {
+      return "";
+    }
+    StringBuilder ss = new StringBuilder(); // avoids re-copying the text for every frame
+    for (StackTraceElement s : stacks) {
+      ss.append(s.toString()).append("\n");
+    }
+    return ss.toString();
+  }
+
+  public Throwable getException() {
+    return exception;
+  }
+
+  protected void setException(Throwable t) {
+    exception = t;
+    errorMessage = getStack(t);
+  }
+
+  public Object getReturn() {
+    return result;
+  }
+
+  public String getJobName() {
+    return jobName;
+  }
+
+  public void setJobName(String jobName) {
+    this.jobName = jobName;
+  }
+
+  public abstract int progress();
+
+  public abstract Map<String, Object> info();
+
+  protected abstract Object jobRun() throws Throwable;
+
+  protected abstract boolean jobAbort();
+
+  public void abort() {
+    aborted = jobAbort();
+  }
+
+  public boolean isAborted() {
+    return aborted;
+  }
+
+  public Date getDateCreated() {
+    return dateCreated;
+  }
+
+  public Date getDateStarted() {
+    return dateStarted;
+  }
+
+  public Date getDateFinished() {
+    return dateFinished;
+  }
+
+  private Logger logger() {
+    return LoggerFactory.getLogger(Job.class);
+  }
+
+  protected void setResult(Object result) {
+    this.result = result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobListener.java
new file mode 100644
index 0000000..1ed551f
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobListener.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scheduler;
+
+/**
+ * Callback interface notified of a job's progress updates and status changes.
+ *
+ * @author Leemoonsoo
+ *
+ */
+public interface JobListener {
+  public void onProgressUpdate(Job job, int progress);
+
+  public void beforeStatusChange(Job job, Job.Status before, Job.Status after);
+
+  public void afterStatusChange(Job job, Job.Status before, Job.Status after);
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java
new file mode 100644
index 0000000..9de1325
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scheduler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Background thread that periodically reports a running job's progress to its listener.
+ *
+ * @author Leemoonsoo
+ *
+ */
+public class JobProgressPoller extends Thread {
+  public static final long DEFAULT_INTERVAL_MSEC = 500;
+  Logger logger = LoggerFactory.getLogger(JobProgressPoller.class);
+  private Job job;
+  private long intervalMs;
+  volatile boolean terminate = false; // set by terminate(), polled by the run() loop
+  /** Creates the poller; a negative interval disables polling, 0 selects the default. */
+  public JobProgressPoller(Job job, long intervalMs) {
+    this.job = job;
+    this.intervalMs = intervalMs;
+  }
+  /** Polls until {@link #terminate()} is called, sleeping {@code intervalMs} between polls. */
+  @Override
+  public void run() {
+    if (intervalMs < 0) {
+      return;
+    } else if (intervalMs == 0) {
+      intervalMs = DEFAULT_INTERVAL_MSEC;
+    }
+
+    while (terminate == false) {
+      JobListener listener = job.getListener();
+      if (listener != null) {
+        try {
+          if (job.isRunning()) {
+            listener.onProgressUpdate(job, job.progress());
+          }
+        } catch (Exception e) {
+          logger.error("Can not get or update progress", e);
+        }
+      }
+      try {
+        Thread.sleep(intervalMs);
+      } catch (InterruptedException e) { // woken early: fall through and re-check terminate
+      }
+    }
+  }
+  /** Asks the polling loop to exit; it stops once its current sleep finishes. */
+  public void terminate() {
+    terminate = true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
new file mode 100644
index 0000000..c8e8e04
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scheduler;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.zeppelin.scheduler.Job.Status;
+
+/**
+ * Scheduler that runs up to maxConcurrency submitted jobs concurrently on the given executor.
+ *
+ * @author Leemoonsoo
+ *
+ */
+public class ParallelScheduler implements Scheduler {
+  List<Job> queue = new LinkedList<Job>();
+  List<Job> running = new LinkedList<Job>();
+  private ExecutorService executor;
+  private SchedulerListener listener;
+  boolean terminate = false;
+  private String name;
+  private int maxConcurrency;
+
+  public ParallelScheduler(String name, ExecutorService executor, SchedulerListener listener,
+      int maxConcurrency) {
+    this.name = name;
+    this.executor = executor;
+    this.listener = listener;
+    this.maxConcurrency = maxConcurrency;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public Collection<Job> getJobsWaiting() {
+    List<Job> ret = new LinkedList<Job>();
+    synchronized (queue) {
+      for (Job job : queue) {
+        ret.add(job);
+      }
+    }
+    return ret;
+  }
+
+  @Override
+  public Collection<Job> getJobsRunning() {
+    List<Job> ret = new LinkedList<Job>();
+    synchronized (queue) {
+      for (Job job : running) {
+        ret.add(job);
+      }
+    }
+    return ret;
+  }
+
+
+
+  @Override
+  public void submit(Job job) {
+    job.setStatus(Status.PENDING);
+    synchronized (queue) {
+      queue.add(job);
+      queue.notify();
+    }
+  }
+
+  @Override
+  public void run() {
+
+    synchronized (queue) {
+      while (terminate == false) {
+        if (running.size() >= maxConcurrency || queue.isEmpty() == true) {
+          try {
+            queue.wait(500);
+          } catch (InterruptedException e) {
+          }
+          continue;
+        }
+
+        Job job = queue.remove(0);
+        running.add(job);
+        Scheduler scheduler = this;
+
+        executor.execute(new JobRunner(scheduler, job));
+      }
+
+
+    }
+  }
+
+  public void setMaxConcurrency(int maxConcurrency) {
+    this.maxConcurrency = maxConcurrency;
+    synchronized (queue) {
+      queue.notify();
+    }
+  }
+
+  private class JobRunner implements Runnable {
+    private Scheduler scheduler;
+    private Job job;
+
+    public JobRunner(Scheduler scheduler, Job job) {
+      this.scheduler = scheduler;
+      this.job = job;
+    }
+
+    @Override
+    public void run() {
+      if (job.isAborted()) {
+        job.setStatus(Status.ABORT);
+        job.aborted = false;
+
+        synchronized (queue) {
+          running.remove(job);
+          queue.notify();
+        }
+
+        return;
+      }
+
+      job.setStatus(Status.RUNNING);
+      if (listener != null) {
+        listener.jobStarted(scheduler, job);
+      }
+      job.run();
+      if (job.isAborted()) {
+        job.setStatus(Status.ABORT);
+      } else {
+        if (job.getException() != null) {
+          job.setStatus(Status.ERROR);
+        } else {
+          job.setStatus(Status.FINISHED);
+        }
+      }
+
+      if (listener != null) {
+        listener.jobFinished(scheduler, job);
+      }
+
+      // reset aborted flag to allow retry
+      job.aborted = false;
+      synchronized (queue) {
+        running.remove(job);
+        queue.notify();
+      }
+    }
+  }
+
+
+  @Override
+  public void stop() {
+    terminate = true;
+    synchronized (queue) {
+      queue.notify();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
new file mode 100644
index 0000000..15e4a3c
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scheduler;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.thrift.TException;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.apache.zeppelin.scheduler.Job.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scheduler that runs jobs against a remote interpreter process and polls it for job status.
+ */
+public class RemoteScheduler implements Scheduler {
+  Logger logger = LoggerFactory.getLogger(RemoteScheduler.class);
+
+  List<Job> queue = new LinkedList<Job>();
+  List<Job> running = new LinkedList<Job>();
+  private ExecutorService executor;
+  private SchedulerListener listener;
+  boolean terminate = false;
+  private String name;
+  private int maxConcurrency;
+  private RemoteInterpreterProcess interpreterProcess;
+
+  public RemoteScheduler(String name, ExecutorService executor,
+      RemoteInterpreterProcess interpreterProcess, SchedulerListener listener,
+      int maxConcurrency) {
+    this.name = name;
+    this.executor = executor;
+    this.listener = listener;
+    this.interpreterProcess = interpreterProcess;
+    this.maxConcurrency = maxConcurrency;
+  }
+
+  /** Dispatch loop: starts queued jobs, up to maxConcurrency at once, until stop() is called. */
+  @Override
+  public void run() {
+    while (terminate == false) {
+      Job job = null;
+
+      synchronized (queue) {
+        if (running.size() >= maxConcurrency || queue.isEmpty() == true) {
+          try {
+            queue.wait(500);
+          } catch (InterruptedException e) {
+          }
+          continue;
+        }
+
+        job = queue.remove(0);
+        running.add(job);
+      }
+
+      // run
+      JobRunner jobRunner = new JobRunner(this, job);
+      executor.execute(jobRunner);
+
+      // wait until it is submitted to the remote, but give up as soon as stop() is requested
+      while (terminate == false && !jobRunner.isJobSubmittedInRemote()) {
+        synchronized (queue) {
+          try {
+            queue.wait(500);
+          } catch (InterruptedException e) {
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public Collection<Job> getJobsWaiting() {
+    List<Job> ret = new LinkedList<Job>();
+    synchronized (queue) {
+      for (Job job : queue) {
+        ret.add(job);
+      }
+    }
+    return ret;
+  }
+
+  @Override
+  public Collection<Job> getJobsRunning() {
+    List<Job> ret = new LinkedList<Job>();
+    synchronized (queue) {
+      for (Job job : running) {
+        ret.add(job);
+      }
+    }
+    return ret;
+  }
+
+  @Override
+  public void submit(Job job) {
+    job.setStatus(Status.PENDING);
+
+    synchronized (queue) {
+      queue.add(job);
+      queue.notify();
+    }
+  }
+
+  public void setMaxConcurrency(int maxConcurrency) {
+    this.maxConcurrency = maxConcurrency;
+    synchronized (queue) {
+      queue.notify();
+    }
+  }
+
+  /**
+   * Polls the remote process for the job's status while it moves from PENDING to
+   * RUNNING status.
+   */
+  private class JobStatusPoller extends Thread {
+    private long initialPeriodMsec;
+    private long initialPeriodCheckIntervalMsec;
+    private long checkIntervalMsec;
+    private boolean terminate;
+    private JobListener listener;
+    private Job job;
+    Status lastStatus;
+
+    public JobStatusPoller(long initialPeriodMsec,
+        long initialPeriodCheckIntervalMsec, long checkIntervalMsec, Job job,
+        JobListener listener) {
+      this.initialPeriodMsec = initialPeriodMsec;
+      this.initialPeriodCheckIntervalMsec = initialPeriodCheckIntervalMsec;
+      this.checkIntervalMsec = checkIntervalMsec;
+      this.job = job;
+      this.listener = listener;
+      this.terminate = false;
+    }
+
+    @Override
+    public void run() {
+      long started = System.currentTimeMillis();
+      while (terminate == false) {
+        long current = System.currentTimeMillis();
+        long interval;
+        if (current - started < initialPeriodMsec) {
+          interval = initialPeriodCheckIntervalMsec;
+        } else {
+          interval = checkIntervalMsec;
+        }
+
+        synchronized (this) {
+          try {
+            this.wait(interval);
+          } catch (InterruptedException e) {
+          }
+        }
+
+
+        Status newStatus = getStatus();
+        if (newStatus == null) { // unknown
+          continue;
+        }
+
+        if (newStatus != Status.READY && newStatus != Status.PENDING) {
+          // we don't need more
+          continue;
+        }
+      }
+    }
+
+    public void shutdown() {
+      terminate = true;
+      synchronized (this) {
+        this.notify();
+      }
+    }
+
+
+    private Status getLastStatus() {
+      if (terminate == true) {
+        if (lastStatus != Status.FINISHED &&
+            lastStatus != Status.ERROR &&
+            lastStatus != Status.ABORT) {
+          return Status.FINISHED;
+        } else {
+          return (lastStatus == null) ? Status.FINISHED : lastStatus;
+        }
+      } else {
+        return (lastStatus == null) ? Status.FINISHED : lastStatus;
+      }
+    }
+
+    public synchronized Job.Status getStatus() {
+      if (interpreterProcess.referenceCount() <= 0) {
+        return getLastStatus();
+      }
+
+      Client client;
+      try {
+        client = interpreterProcess.getClient();
+      } catch (Exception e) {
+        logger.error("Can't get status information", e);
+        lastStatus = Status.ERROR;
+        return Status.ERROR;
+      }
+
+      try {
+        String statusStr = client.getStatus(job.getId());
+        if ("Unknown".equals(statusStr)) {
+          // not found this job in the remote schedulers.
+          // maybe not submitted, maybe already finished
+          Status status = getLastStatus();
+          listener.afterStatusChange(job, null, status);
+          return status;
+        }
+        Status status = Status.valueOf(statusStr);
+        lastStatus = status;
+        listener.afterStatusChange(job, null, status);
+        return status;
+      } catch (TException e) {
+        logger.error("Can't get status information", e);
+        lastStatus = Status.ERROR;
+        return Status.ERROR;
+      } catch (Exception e) {
+        logger.error("Unknown status", e);
+        lastStatus = Status.ERROR;
+        return Status.ERROR;
+      } finally {
+        interpreterProcess.releaseClient(client);
+      }
+    }
+  }
+
+  private class JobRunner implements Runnable, JobListener {
+    private Scheduler scheduler;
+    private Job job;
+    private boolean jobExecuted;
+    boolean jobSubmittedRemotely;
+
+    public JobRunner(Scheduler scheduler, Job job) {
+      this.scheduler = scheduler;
+      this.job = job;
+      jobExecuted = false;
+      jobSubmittedRemotely = false;
+    }
+
+    public boolean isJobSubmittedInRemote() {
+      return jobSubmittedRemotely;
+    }
+
+    /** Runs the job with a status poller attached, records its final status, frees its slot. */
+    @Override
+    public void run() {
+      if (job.isAborted()) {
+        job.setStatus(Status.ABORT);
+        job.aborted = false;
+        // Nothing will be sent to the remote: release the scheduler loop polling this flag.
+        jobSubmittedRemotely = true;
+        synchronized (queue) {
+          running.remove(job);
+          queue.notify();
+        }
+        return;
+      }
+
+      JobStatusPoller jobStatusPoller = new JobStatusPoller(1500, 100, 500, job, this);
+      jobStatusPoller.start();
+
+      if (listener != null) {
+        listener.jobStarted(scheduler, job);
+      }
+      job.run();
+      jobExecuted = true;
+      jobSubmittedRemotely = true;
+
+      jobStatusPoller.shutdown();
+      try {
+        jobStatusPoller.join();
+      } catch (InterruptedException e) {
+        logger.error("JobStatusPoller interrupted", e);
+      }
+
+      job.setStatus(jobStatusPoller.getStatus());
+      if (listener != null) {
+        listener.jobFinished(scheduler, job);
+      }
+
+      // reset aborted flag to allow retry
+      job.aborted = false;
+
+      synchronized (queue) {
+        running.remove(job);
+        queue.notify();
+      }
+    }
+
+    @Override
+    public void onProgressUpdate(Job job, int progress) {
+    }
+
+    @Override
+    public void beforeStatusChange(Job job, Status before, Status after) {
+    }
+
+    @Override
+    public void afterStatusChange(Job job, Status before, Status after) {
+      if (after == null) { // unknown. maybe before submitted remotely, maybe already finished.
+        if (jobExecuted) { // job.run() has returned, so derive the final status locally
+          jobSubmittedRemotely = true;
+          if (job.isAborted()) {
+            job.setStatus(Status.ABORT);
+          } else if (job.getException() != null) {
+            job.setStatus(Status.ERROR);
+          } else {
+            job.setStatus(Status.FINISHED);
+          }
+        }
+        return;
+      }
+
+
+      // Update the submitted-remotely flag from the polled status
+      if (jobExecuted == false) {
+        if (after == Status.FINISHED || after == Status.ABORT
+            || after == Status.ERROR) {
+          // this may be the status left over from a previous run,
+          // so do not apply it to the job
+          return;
+        } else if (after == Status.RUNNING) {
+          jobSubmittedRemotely = true;
+        }
+      } else {
+        jobSubmittedRemotely = true;
+      }
+
+      // apply the status reported by the status poller
+      if (job.getStatus() != after) {
+        job.setStatus(after);
+      }
+    }
+  }
+
+  @Override
+  public void stop() {
+    terminate = true;
+    synchronized (queue) {
+      queue.notify();
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java
new file mode 100644
index 0000000..a886c22
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scheduler;
+
+import java.util.Collection;
+
+/**
+ * A named job scheduler that is itself a {@link Runnable}.
+ *
+ * <p>Jobs are handed to the scheduler with {@link #submit(Job)}, and {@link #stop()}
+ * asks the scheduler to stop.
+ *
+ * @author Leemoonsoo
+ *
+ */
+public interface Scheduler extends Runnable {
+  /**
+   * @return the name of this scheduler
+   */
+  public String getName();
+
+  /**
+   * @return the jobs waiting in this scheduler (presumably submitted but not yet
+   *         started; the exact semantics are defined by the implementation)
+   */
+  public Collection<Job> getJobsWaiting();
+
+  /**
+   * @return the jobs running in this scheduler (the exact semantics are defined by
+   *         the implementation)
+   */
+  public Collection<Job> getJobsRunning();
+
+  /**
+   * Submits a job to this scheduler.
+   *
+   * @param job the job to submit
+   */
+  public void submit(Job job);
+
+  /**
+   * Stops this scheduler.
+   */
+  public void stop();
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
new file mode 100644
index 0000000..2556a81
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scheduler;
+
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TODO(moon) : add description.
+ *
+ * @author Leemoonsoo
+ *
+ */
+public class SchedulerFactory implements SchedulerListener {
+  private final Logger logger = LoggerFactory.getLogger(SchedulerFactory.class);
+  ScheduledExecutorService executor;
+  Map<String, Scheduler> schedulers = new LinkedHashMap<String, Scheduler>();
+
+  private static SchedulerFactory singleton;
+  private static Long singletonLock = new Long(0);
+
+  public static SchedulerFactory singleton() {
+    if (singleton == null) {
+      synchronized (singletonLock) {
+        if (singleton == null) {
+          try {
+            singleton = new SchedulerFactory();
+          } catch (Exception e) {
+            e.printStackTrace();
+          }
+        }
+      }
+    }
+    return singleton;
+  }
+
+  public SchedulerFactory() throws Exception {
+    executor = Executors.newScheduledThreadPool(100);
+  }
+
+  public void destroy() {
+    executor.shutdown();
+  }
+
+  public Scheduler createOrGetFIFOScheduler(String name) {
+    synchronized (schedulers) {
+      if (schedulers.containsKey(name) == false) {
+        Scheduler s = new FIFOScheduler(name, executor, this);
+        schedulers.put(name, s);
+        executor.execute(s);
+      }
+      return schedulers.get(name);
+    }
+  }
+
+  public Scheduler createOrGetParallelScheduler(String name, int maxConcurrency) {
+    synchronized (schedulers) {
+      if (schedulers.containsKey(name) == false) {
+        Scheduler s = new ParallelScheduler(name, executor, this, maxConcurrency);
+        schedulers.put(name, s);
+        executor.execute(s);
+      }
+      return schedulers.get(name);
+    }
+  }
+
+  public Scheduler createOrGetRemoteScheduler(
+      String name,
+      RemoteInterpreterProcess interpreterProcess,
+      int maxConcurrency) {
+
+    synchronized (schedulers) {
+      if (schedulers.containsKey(name) == false) {
+        Scheduler s = new RemoteScheduler(
+            name,
+            executor,
+            interpreterProcess,
+            this,
+            maxConcurrency);
+        schedulers.put(name, s);
+        executor.execute(s);
+      }
+      return schedulers.get(name);
+    }
+  }
+
+  public Scheduler removeScheduler(String name) {
+    synchronized (schedulers) {
+      Scheduler s = schedulers.remove(name);
+      if (s != null) {
+        s.stop();
+      }
+    }
+    return null;
+  }
+
+  public Collection<Scheduler> listScheduler(String name) {
+    List<Scheduler> s = new LinkedList<Scheduler>();
+    synchronized (schedulers) {
+      for (Scheduler ss : schedulers.values()) {
+        s.add(ss);
+      }
+    }
+    return s;
+  }
+
+  @Override
+  public void jobStarted(Scheduler scheduler, Job job) {
+    logger.info("Job " + job.getJobName() + " started by scheduler " + scheduler.getName());
+
+  }
+
+  @Override
+  public void jobFinished(Scheduler scheduler, Job job) {
+    logger.info("Job " + job.getJobName() + " finished by scheduler " + scheduler.getName());
+
+  }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerListener.java
new file mode 100644
index 0000000..6fdd176
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerListener.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scheduler;
+
+/**
+ * Callback interface through which a {@link Scheduler} reports job start and finish
+ * events.
+ *
+ * @author Leemoonsoo
+ *
+ */
+public interface SchedulerListener {
+  /**
+   * Called to report that a job has started.
+   *
+   * @param scheduler the scheduler reporting the event
+   * @param job the job that started
+   */
+  public void jobStarted(Scheduler scheduler, Job job);
+
+  /**
+   * Called to report that a job has finished.
+   *
+   * @param scheduler the scheduler reporting the event
+   * @param job the job that finished
+   */
+  public void jobFinished(Scheduler scheduler, Job job);
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
index bbb54b1..051730e 100644
--- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
+++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
@@ -1,4 +1,22 @@
-namespace java com.nflabs.zeppelin.interpreter.thrift
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace java org.apache.zeppelin.interpreter.thrift
 
 
 struct RemoteInterpreterContext {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/display/InputTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/display/InputTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/display/InputTest.java
deleted file mode 100644
index 091473b..0000000
--- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/display/InputTest.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package com.nflabs.zeppelin.display;
-
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class InputTest {
-
-	@Before
-	public void setUp() throws Exception {
-	}
-
-	@After
-	public void tearDown() throws Exception {
-	}
-
-	@Test
-	public void testDefaultParamReplace() throws IOException{
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
deleted file mode 100644
index 181b1b0..0000000
--- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package com.nflabs.zeppelin.interpreter.remote;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import java.util.HashMap;
-
-import org.junit.Test;
-
-import com.nflabs.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-
-public class RemoteInterpreterProcessTest {
-
-  @Test
-  public void testStartStop() {
-    RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>());
-    assertFalse(rip.isRunning());
-    assertEquals(0, rip.referenceCount());
-    assertEquals(1, rip.reference());
-    assertEquals(2, rip.reference());
-    assertEquals(true, rip.isRunning());
-    assertEquals(1, rip.dereference());
-    assertEquals(true, rip.isRunning());
-    assertEquals(0, rip.dereference());
-    assertEquals(false, rip.isRunning());
-  }
-
-  @Test
-  public void testClientFactory() throws Exception {
-    RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>());
-    rip.reference();
-    assertEquals(0, rip.getNumActiveClient());
-    assertEquals(0, rip.getNumIdleClient());
-
-    Client client = rip.getClient();
-    assertEquals(1, rip.getNumActiveClient());
-    assertEquals(0, rip.getNumIdleClient());
-
-    rip.releaseClient(client);
-    assertEquals(0, rip.getNumActiveClient());
-    assertEquals(1, rip.getNumIdleClient());
-
-    rip.dereference();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
deleted file mode 100644
index 809c76e..0000000
--- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package com.nflabs.zeppelin.interpreter.remote;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-
-import org.apache.thrift.TException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class RemoteInterpreterServerTest {
-  @Before
-  public void setUp() throws Exception {
-  }
-
-  @After
-  public void tearDown() throws Exception {
-  }
-
-  @Test
-  public void testStartStop() throws InterruptedException, IOException, TException {
-    RemoteInterpreterServer server = new RemoteInterpreterServer(
-        RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces());
-    assertEquals(false, server.isRunning());
-
-    server.start();
-    long startTime = System.currentTimeMillis();
-    boolean running = false;
-
-    while (System.currentTimeMillis() - startTime < 10 * 1000) {
-      if (server.isRunning()) {
-        running = true;
-        break;
-      } else {
-        Thread.sleep(200);
-      }
-    }
-
-    assertEquals(true, running);
-    assertEquals(true, RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", server.getPort()));
-
-    server.shutdown();
-
-    while (System.currentTimeMillis() - startTime < 10 * 1000) {
-      if (server.isRunning()) {
-        Thread.sleep(200);
-      } else {
-        running = false;
-        break;
-      }
-    }
-    assertEquals(false, running);
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterTest.java
deleted file mode 100644
index dcee6aa..0000000
--- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterTest.java
+++ /dev/null
@@ -1,428 +0,0 @@
-package com.nflabs.zeppelin.interpreter.remote;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.thrift.transport.TTransportException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.nflabs.zeppelin.display.GUI;
-import com.nflabs.zeppelin.interpreter.InterpreterContext;
-import com.nflabs.zeppelin.interpreter.InterpreterGroup;
-import com.nflabs.zeppelin.interpreter.InterpreterResult;
-import com.nflabs.zeppelin.interpreter.remote.mock.MockInterpreterA;
-import com.nflabs.zeppelin.interpreter.remote.mock.MockInterpreterB;
-import com.nflabs.zeppelin.scheduler.Job;
-import com.nflabs.zeppelin.scheduler.Job.Status;
-import com.nflabs.zeppelin.scheduler.Scheduler;
-
-public class RemoteInterpreterTest {
-
-
-  private InterpreterGroup intpGroup;
-  private HashMap<String, String> env;
-
-  @Before
-  public void setUp() throws Exception {
-    intpGroup = new InterpreterGroup();
-    env = new HashMap<String, String>();
-    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    intpGroup.clone();
-    intpGroup.destroy();
-  }
-
-  @Test
-  public void testRemoteInterperterCall() throws TTransportException, IOException {
-    Properties p = new Properties();
-
-    RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env
-        );
-
-    intpGroup.add(intpA);
-    intpA.setInterpreterGroup(intpGroup);
-
-    RemoteInterpreter intpB = new RemoteInterpreter(
-        p,
-        MockInterpreterB.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env
-        );
-
-    intpGroup.add(intpB);
-    intpB.setInterpreterGroup(intpGroup);
-
-
-    RemoteInterpreterProcess process = intpA.getInterpreterProcess();
-    process.equals(intpB.getInterpreterProcess());
-
-    assertFalse(process.isRunning());
-    assertEquals(0, process.getNumIdleClient());
-    assertEquals(0, process.referenceCount());
-
-    intpA.open();
-    assertTrue(process.isRunning());
-    assertEquals(1, process.getNumIdleClient());
-    assertEquals(1, process.referenceCount());
-
-    intpA.interpret("1",
-        new InterpreterContext(
-            "id",
-            "title",
-            "text",
-            new HashMap<String, Object>(),
-            new GUI()));
-
-    intpB.open();
-    assertEquals(2, process.referenceCount());
-
-    intpA.close();
-    assertEquals(1, process.referenceCount());
-    intpB.close();
-    assertEquals(0, process.referenceCount());
-
-    assertFalse(process.isRunning());
-
-  }
-
-  @Test
-  public void testRemoteSchedulerSharing() throws TTransportException, IOException {
-    Properties p = new Properties();
-
-    RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env
-        );
-
-    intpGroup.add(intpA);
-    intpA.setInterpreterGroup(intpGroup);
-
-    RemoteInterpreter intpB = new RemoteInterpreter(
-        p,
-        MockInterpreterB.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env
-        );
-
-    intpGroup.add(intpB);
-    intpB.setInterpreterGroup(intpGroup);
-
-    intpA.open();
-    intpB.open();
-
-    long start = System.currentTimeMillis();
-    InterpreterResult ret = intpA.interpret("500",
-        new InterpreterContext(
-            "id",
-            "title",
-            "text",
-            new HashMap<String, Object>(),
-            new GUI()));
-    assertEquals("500", ret.message());
-
-    ret = intpB.interpret("500",
-        new InterpreterContext(
-            "id",
-            "title",
-            "text",
-            new HashMap<String, Object>(),
-            new GUI()));
-    assertEquals("1000", ret.message());
-    long end = System.currentTimeMillis();
-    assertTrue(end - start >= 1000);
-
-
-    intpA.close();
-    intpB.close();
-
-    RemoteInterpreterProcess process = intpA.getInterpreterProcess();
-    assertFalse(process.isRunning());
-  }
-
-  @Test
-  public void testRemoteSchedulerSharingSubmit() throws TTransportException, IOException, InterruptedException {
-    Properties p = new Properties();
-
-    final RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env
-        );
-
-    intpGroup.add(intpA);
-    intpA.setInterpreterGroup(intpGroup);
-
-    final RemoteInterpreter intpB = new RemoteInterpreter(
-        p,
-        MockInterpreterB.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env
-        );
-
-    intpGroup.add(intpB);
-    intpB.setInterpreterGroup(intpGroup);
-
-    intpA.open();
-    intpB.open();
-
-    long start = System.currentTimeMillis();
-    Job jobA = new Job("jobA", null) {
-
-      @Override
-      public int progress() {
-        return 0;
-      }
-
-      @Override
-      public Map<String, Object> info() {
-        return null;
-      }
-
-      @Override
-      protected Object jobRun() throws Throwable {
-        return intpA.interpret("500",
-            new InterpreterContext(
-                "jobA",
-                "title",
-                "text",
-                new HashMap<String, Object>(),
-                new GUI()));
-      }
-
-      @Override
-      protected boolean jobAbort() {
-        return false;
-      }
-
-    };
-    intpA.getScheduler().submit(jobA);
-
-    Job jobB = new Job("jobB", null) {
-
-      @Override
-      public int progress() {
-        return 0;
-      }
-
-      @Override
-      public Map<String, Object> info() {
-        return null;
-      }
-
-      @Override
-      protected Object jobRun() throws Throwable {
-        return intpB.interpret("500",
-            new InterpreterContext(
-                "jobB",
-                "title",
-                "text",
-                new HashMap<String, Object>(),
-                new GUI()));
-      }
-
-      @Override
-      protected boolean jobAbort() {
-        return false;
-      }
-
-    };
-    intpB.getScheduler().submit(jobB);
-
-    // wait until both job finished
-    while (jobA.getStatus() != Status.FINISHED ||
-           jobB.getStatus() != Status.FINISHED) {
-      Thread.sleep(100);
-    }
-
-    long end = System.currentTimeMillis();
-    assertTrue(end - start >= 1000);
-
-    assertEquals("1000", ((InterpreterResult) jobB.getReturn()).message());
-
-    intpA.close();
-    intpB.close();
-
-    RemoteInterpreterProcess process = intpA.getInterpreterProcess();
-    assertFalse(process.isRunning());
-  }
-
-  @Test
-  public void testRunOrderPreserved() throws InterruptedException {
-    Properties p = new Properties();
-
-    final RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env
-        );
-
-    intpGroup.add(intpA);
-    intpA.setInterpreterGroup(intpGroup);
-
-    intpA.open();
-
-    int concurrency = 3;
-    final List<String> results = new LinkedList<String>();
-
-    Scheduler scheduler = intpA.getScheduler();
-    for (int i = 0; i < concurrency; i++) {
-      final String jobId = Integer.toString(i);
-      scheduler.submit(new Job(jobId, Integer.toString(i), null, 200) {
-
-        @Override
-        public int progress() {
-          return 0;
-        }
-
-        @Override
-        public Map<String, Object> info() {
-          return null;
-        }
-
-        @Override
-        protected Object jobRun() throws Throwable {
-          InterpreterResult ret = intpA.interpret(getJobName(), new InterpreterContext(
-              jobId,
-              "title",
-              "text",
-              new HashMap<String, Object>(),
-              new GUI()));
-
-          synchronized (results) {
-            results.add(ret.message());
-            results.notify();
-          }
-          return null;
-        }
-
-        @Override
-        protected boolean jobAbort() {
-          return false;
-        }
-
-      });
-    }
-
-    // wait for job finished
-    synchronized (results) {
-      while (results.size() != concurrency) {
-        results.wait(300);
-      }
-    }
-
-    int i = 0;
-    for (String result : results) {
-      assertEquals(Integer.toString(i++), result);
-    }
-    assertEquals(concurrency, i);
-
-    intpA.close();
-  }
-
-
-  @Test
-  public void testRunParallel() throws InterruptedException {
-    Properties p = new Properties();
-    p.put("parallel", "true");
-
-    final RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env
-        );
-
-    intpGroup.add(intpA);
-    intpA.setInterpreterGroup(intpGroup);
-
-    intpA.open();
-
-    int concurrency = 4;
-    final int timeToSleep = 1000;
-    final List<String> results = new LinkedList<String>();
-    long start = System.currentTimeMillis();
-
-    Scheduler scheduler = intpA.getScheduler();
-    for (int i = 0; i < concurrency; i++) {
-      final String jobId = Integer.toString(i);
-      scheduler.submit(new Job(jobId, Integer.toString(i), null, 300) {
-
-        @Override
-        public int progress() {
-          return 0;
-        }
-
-        @Override
-        public Map<String, Object> info() {
-          return null;
-        }
-
-        @Override
-        protected Object jobRun() throws Throwable {
-          String stmt = Integer.toString(timeToSleep);
-          InterpreterResult ret = intpA.interpret(stmt, new InterpreterContext(
-              jobId,
-              "title",
-              "text",
-              new HashMap<String, Object>(),
-              new GUI()));
-
-          synchronized (results) {
-            results.add(ret.message());
-            results.notify();
-          }
-          return stmt;
-        }
-
-        @Override
-        protected boolean jobAbort() {
-          return false;
-        }
-
-      });
-    }
-
-    // wait for job finished
-    synchronized (results) {
-      while (results.size() != concurrency) {
-        results.wait(300);
-      }
-    }
-
-    long end = System.currentTimeMillis();
-
-    assertTrue(end - start < timeToSleep * concurrency);
-
-    intpA.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
deleted file mode 100644
index 3035cf2..0000000
--- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package com.nflabs.zeppelin.interpreter.remote;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-
-import org.junit.Test;
-
-public class RemoteInterpreterUtilsTest {
-
-  @Test
-  public void testFindRandomAvailablePortOnAllLocalInterfaces() throws IOException {
-    assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() > 0);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterA.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterA.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterA.java
deleted file mode 100644
index 1df3979..0000000
--- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterA.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package com.nflabs.zeppelin.interpreter.remote.mock;
-
-import java.util.List;
-import java.util.Properties;
-
-import com.nflabs.zeppelin.interpreter.Interpreter;
-import com.nflabs.zeppelin.interpreter.InterpreterContext;
-import com.nflabs.zeppelin.interpreter.InterpreterException;
-import com.nflabs.zeppelin.interpreter.InterpreterPropertyBuilder;
-import com.nflabs.zeppelin.interpreter.InterpreterResult;
-import com.nflabs.zeppelin.interpreter.InterpreterResult.Code;
-import com.nflabs.zeppelin.scheduler.Scheduler;
-import com.nflabs.zeppelin.scheduler.SchedulerFactory;
-
-public class MockInterpreterA extends Interpreter {
-  static {
-    Interpreter.register(
-        "interpreterA",
-        "group1",
-        MockInterpreterA.class.getName(),
-        new InterpreterPropertyBuilder()
-            .add("p1", "v1", "property1").build());
-
-  }
-
-  private String lastSt;
-
-  public MockInterpreterA(Properties property) {
-    super(property);
-  }
-
-  @Override
-  public void open() {
-    //new RuntimeException().printStackTrace();
-  }
-
-  @Override
-  public void close() {
-  }
-
-  public String getLastStatement() {
-    return lastSt;
-  }
-
-  @Override
-  public InterpreterResult interpret(String st, InterpreterContext context) {
-    try {
-      Thread.sleep(Long.parseLong(st));
-      this.lastSt = st;
-    } catch (NumberFormatException | InterruptedException e) {
-      throw new InterpreterException(e);
-    }
-    return new InterpreterResult(Code.SUCCESS, st);
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) {
-
-  }
-
-  @Override
-  public FormType getFormType() {
-    return FormType.NATIVE;
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    return 0;
-  }
-
-  @Override
-  public List<String> completion(String buf, int cursor) {
-    return null;
-  }
-
-  @Override
-  public Scheduler getScheduler() {
-    if (getProperty("parallel") != null && getProperty("parallel").equals("true")) {
-      return SchedulerFactory.singleton().createOrGetParallelScheduler("interpreter_" + this.hashCode(), 10);
-    } else {
-      return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterB.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterB.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterB.java
deleted file mode 100644
index 39f2ab8..0000000
--- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterB.java
+++ /dev/null
@@ -1,101 +0,0 @@
-package com.nflabs.zeppelin.interpreter.remote.mock;
-
-import java.util.List;
-import java.util.Properties;
-
-import com.nflabs.zeppelin.interpreter.Interpreter;
-import com.nflabs.zeppelin.interpreter.InterpreterContext;
-import com.nflabs.zeppelin.interpreter.InterpreterException;
-import com.nflabs.zeppelin.interpreter.InterpreterGroup;
-import com.nflabs.zeppelin.interpreter.InterpreterPropertyBuilder;
-import com.nflabs.zeppelin.interpreter.InterpreterResult;
-import com.nflabs.zeppelin.interpreter.InterpreterResult.Code;
-import com.nflabs.zeppelin.interpreter.WrappedInterpreter;
-import com.nflabs.zeppelin.scheduler.Scheduler;
-
-public class MockInterpreterB extends Interpreter {
-  static {
-    Interpreter.register(
-        "interpreterB",
-        "group1",
-        MockInterpreterA.class.getName(),
-        new InterpreterPropertyBuilder()
-            .add("p1", "v1", "property1").build());
-
-  }
-  public MockInterpreterB(Properties property) {
-    super(property);
-  }
-
-  @Override
-  public void open() {
-    //new RuntimeException().printStackTrace();
-  }
-
-  @Override
-  public void close() {
-  }
-
-  @Override
-  public InterpreterResult interpret(String st, InterpreterContext context) {
-    MockInterpreterA intpA = getInterpreterA();
-    String intpASt = intpA.getLastStatement();
-    long timeToSleep = Long.parseLong(st);
-    if (intpASt != null) {
-      timeToSleep += Long.parseLong(intpASt);
-    }
-    try {
-      Thread.sleep(timeToSleep);
-    } catch (NumberFormatException | InterruptedException e) {
-      throw new InterpreterException(e);
-    }
-    return new InterpreterResult(Code.SUCCESS, Long.toString(timeToSleep));
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) {
-
-  }
-
-  @Override
-  public FormType getFormType() {
-    return FormType.NATIVE;
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    return 0;
-  }
-
-  @Override
-  public List<String> completion(String buf, int cursor) {
-    return null;
-  }
-
-  public MockInterpreterA getInterpreterA() {
-    InterpreterGroup interpreterGroup = getInterpreterGroup();
-    for (Interpreter intp : interpreterGroup) {
-      if (intp.getClassName().equals(MockInterpreterA.class.getName())) {
-        Interpreter p = intp;
-        while (p instanceof WrappedInterpreter) {
-          p = ((WrappedInterpreter) p).getInnerInterpreter();
-        }
-        return (MockInterpreterA) p;
-      }
-    }
-    return null;
-  }
-
-  @Override
-  public Scheduler getScheduler() {
-    InterpreterGroup interpreterGroup = getInterpreterGroup();
-    for (Interpreter intp : interpreterGroup) {
-      if (intp.getClassName().equals(MockInterpreterA.class.getName())) {
-        return intp.getScheduler();
-      }
-    }
-
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/FIFOSchedulerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/FIFOSchedulerTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/FIFOSchedulerTest.java
deleted file mode 100644
index 37a29d1..0000000
--- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/FIFOSchedulerTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package com.nflabs.zeppelin.scheduler;
-
-import junit.framework.TestCase;
-
-import com.nflabs.zeppelin.scheduler.Job.Status;
-
-public class FIFOSchedulerTest extends TestCase {
-
-	private SchedulerFactory schedulerSvc;
-
-	@Override
-  public void setUp() throws Exception{
-		schedulerSvc = new SchedulerFactory();
-	}
-
-	@Override
-  public void tearDown(){
-
-	}
-
-	public void testRun() throws InterruptedException{
-		Scheduler s = schedulerSvc.createOrGetFIFOScheduler("test");
-		assertEquals(0, s.getJobsRunning().size());
-		assertEquals(0, s.getJobsWaiting().size());
-
-		Job job1 = new SleepingJob("job1", null, 500);
-		Job job2 = new SleepingJob("job2", null, 500);
-
-		s.submit(job1);
-		s.submit(job2);
-		Thread.sleep(200);
-
-		assertEquals(Status.RUNNING, job1.getStatus());
-		assertEquals(Status.PENDING, job2.getStatus());
-		assertEquals(1, s.getJobsRunning().size());
-		assertEquals(1, s.getJobsWaiting().size());
-
-
-		Thread.sleep(500);
-		assertEquals(Status.FINISHED, job1.getStatus());
-		assertEquals(Status.RUNNING, job2.getStatus());
-		assertTrue((500 < (Long)job1.getReturn()));
-		assertEquals(1, s.getJobsRunning().size());
-		assertEquals(0, s.getJobsWaiting().size());
-
-	}
-
-	public void testAbort() throws InterruptedException{
-		Scheduler s = schedulerSvc.createOrGetFIFOScheduler("test");
-		assertEquals(0, s.getJobsRunning().size());
-		assertEquals(0, s.getJobsWaiting().size());
-
-		Job job1 = new SleepingJob("job1", null, 500);
-		Job job2 = new SleepingJob("job2", null, 500);
-
-		s.submit(job1);
-		s.submit(job2);
-
-		Thread.sleep(200);
-
-		job1.abort();
-		job2.abort();
-
-		Thread.sleep(200);
-
-		assertEquals(Status.ABORT, job1.getStatus());
-		assertEquals(Status.ABORT, job2.getStatus());
-
-		assertTrue((500 > (Long)job1.getReturn()));
-		assertEquals(null, job2.getReturn());
-
-
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/ParallelSchedulerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/ParallelSchedulerTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/ParallelSchedulerTest.java
deleted file mode 100644
index f88de4c..0000000
--- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/ParallelSchedulerTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package com.nflabs.zeppelin.scheduler;
-
-
-import com.nflabs.zeppelin.scheduler.Job.Status;
-
-import junit.framework.TestCase;
-public class ParallelSchedulerTest extends TestCase {
-
-	private SchedulerFactory schedulerSvc;
-
-	public void setUp() throws Exception{
-		schedulerSvc = new SchedulerFactory();
-	}
-	
-	public void tearDown(){
-		
-	}
-	
-	public void testRun() throws InterruptedException{
-		Scheduler s = schedulerSvc.createOrGetParallelScheduler("test", 2);
-		assertEquals(0, s.getJobsRunning().size());
-		assertEquals(0, s.getJobsWaiting().size());
-		
-		Job job1 = new SleepingJob("job1", null, 500);
-		Job job2 = new SleepingJob("job2", null, 500);
-		Job job3 = new SleepingJob("job3", null, 500);		
-		
-		s.submit(job1);
-		s.submit(job2);
-		s.submit(job3);
-		Thread.sleep(200);
-
-		assertEquals(Status.RUNNING, job1.getStatus());
-		assertEquals(Status.RUNNING, job2.getStatus());
-		assertEquals(Status.PENDING, job3.getStatus());
-		assertEquals(2, s.getJobsRunning().size());
-		assertEquals(1, s.getJobsWaiting().size());
-		
-		Thread.sleep(500);
-		
-		assertEquals(Status.FINISHED, job1.getStatus());
-		assertEquals(Status.FINISHED, job2.getStatus());
-		assertEquals(Status.RUNNING, job3.getStatus());
-		assertEquals(1, s.getJobsRunning().size());
-		assertEquals(0, s.getJobsWaiting().size());
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java
deleted file mode 100644
index 35aa1d3..0000000
--- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-package com.nflabs.zeppelin.scheduler;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.nflabs.zeppelin.display.GUI;
-import com.nflabs.zeppelin.interpreter.InterpreterContext;
-import com.nflabs.zeppelin.interpreter.InterpreterGroup;
-import com.nflabs.zeppelin.interpreter.remote.RemoteInterpreter;
-import com.nflabs.zeppelin.interpreter.remote.mock.MockInterpreterA;
-
-public class RemoteSchedulerTest {
-
-  private SchedulerFactory schedulerSvc;
-
-  @Before
-  public void setUp() throws Exception{
-    schedulerSvc = new SchedulerFactory();
-  }
-
-  @After
-  public void tearDown(){
-
-  }
-
-  @Test
-  public void test() throws Exception {
-    Properties p = new Properties();
-    InterpreterGroup intpGroup = new InterpreterGroup();
-    Map<String, String> env = new HashMap<String, String>();
-    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
-
-    final RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        MockInterpreterA.class.getName(),
-        new File("../bin/interpreter.sh").getAbsolutePath(),
-        "fake",
-        env
-        );
-
-    intpGroup.add(intpA);
-    intpA.setInterpreterGroup(intpGroup);
-
-    intpA.open();
-
-    Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test",
-        intpA.getInterpreterProcess(),
-        10);
-
-    Job job = new Job("jobId", "jobName", null, 200) {
-
-      @Override
-      public int progress() {
-        return 0;
-      }
-
-      @Override
-      public Map<String, Object> info() {
-        return null;
-      }
-
-      @Override
-      protected Object jobRun() throws Throwable {
-        intpA.interpret("1000", new InterpreterContext(
-            "jobId",
-            "title",
-            "text",
-            new HashMap<String, Object>(),
-            new GUI()));
-        return "1000";
-      }
-
-      @Override
-      protected boolean jobAbort() {
-        return false;
-      }
-    };
-    scheduler.submit(job);
-
-    while (job.isRunning() == false) {
-      Thread.sleep(100);
-    }
-
-    Thread.sleep(500);
-    assertEquals(0, scheduler.getJobsWaiting().size());
-    assertEquals(1, scheduler.getJobsRunning().size());
-
-    Thread.sleep(500);
-
-    assertEquals(0, scheduler.getJobsWaiting().size());
-    assertEquals(0, scheduler.getJobsRunning().size());
-
-    intpA.close();
-    schedulerSvc.removeScheduler("test");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/SleepingJob.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/SleepingJob.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/SleepingJob.java
deleted file mode 100644
index 42d0316..0000000
--- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/SleepingJob.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package com.nflabs.zeppelin.scheduler;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class SleepingJob extends Job{
-	
-	private int time;
-	boolean abort = false;
-	private long start;
-	private int count;
-	
-	
-	public SleepingJob(String jobName, JobListener listener, int time){
-		super(jobName, listener);
-		this.time = time;
-		count = 0;
-	}
-	public Object jobRun() {
-		start = System.currentTimeMillis();
-		while(abort==false){
-			count++;
-			try {
-				Thread.sleep(10);
-			} catch (InterruptedException e) {
-			}
-			if(System.currentTimeMillis() - start>time) break;
-		}
-		return System.currentTimeMillis()-start;
-	}
-
-	public boolean jobAbort() {
-		abort = true;
-		return true;
-	}
-
-	public int progress() {
-		long p = (System.currentTimeMillis() - start)*100 / time;
-		if(p<0) p = 0;
-		if(p>100) p = 100;
-		return (int) p;
-	}
-
-	public Map<String, Object> info() {
-		Map<String, Object> i = new HashMap<String, Object>();
-		i.put("LoopCount", Integer.toString(count));
-		return i;
-	}
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/InputTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/InputTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/InputTest.java
new file mode 100644
index 0000000..626ae99
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/InputTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.display;
+
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class InputTest {
+
+	@Before
+	public void setUp() throws Exception {
+	}
+
+	@After
+	public void tearDown() throws Exception {
+	}
+
+	@Test
+	public void testDefaultParamReplace() throws IOException{
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
new file mode 100644
index 0000000..02dc224
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.util.HashMap;
+
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.junit.Test;
+
+public class RemoteInterpreterProcessTest {
+
+  @Test
+  public void testStartStop() {
+    RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>());
+    assertFalse(rip.isRunning());
+    assertEquals(0, rip.referenceCount());
+    assertEquals(1, rip.reference());
+    assertEquals(2, rip.reference());
+    assertEquals(true, rip.isRunning());
+    assertEquals(1, rip.dereference());
+    assertEquals(true, rip.isRunning());
+    assertEquals(0, rip.dereference());
+    assertEquals(false, rip.isRunning());
+  }
+
+  @Test
+  public void testClientFactory() throws Exception {
+    RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>());
+    rip.reference();
+    assertEquals(0, rip.getNumActiveClient());
+    assertEquals(0, rip.getNumIdleClient());
+
+    Client client = rip.getClient();
+    assertEquals(1, rip.getNumActiveClient());
+    assertEquals(0, rip.getNumIdleClient());
+
+    rip.releaseClient(client);
+    assertEquals(0, rip.getNumActiveClient());
+    assertEquals(1, rip.getNumIdleClient());
+
+    rip.dereference();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
new file mode 100644
index 0000000..af6b4bd
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.thrift.TException;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RemoteInterpreterServerTest {
+  @Before
+  public void setUp() throws Exception {
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public void testStartStop() throws InterruptedException, IOException, TException {
+    RemoteInterpreterServer server = new RemoteInterpreterServer(
+        RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces());
+    assertEquals(false, server.isRunning());
+
+    server.start();
+    long startTime = System.currentTimeMillis();
+    boolean running = false;
+
+    while (System.currentTimeMillis() - startTime < 10 * 1000) {
+      if (server.isRunning()) {
+        running = true;
+        break;
+      } else {
+        Thread.sleep(200);
+      }
+    }
+
+    assertEquals(true, running);
+    assertEquals(true, RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", server.getPort()));
+
+    server.shutdown();
+
+    while (System.currentTimeMillis() - startTime < 10 * 1000) {
+      if (server.isRunning()) {
+        Thread.sleep(200);
+      } else {
+        running = false;
+        break;
+      }
+    }
+    assertEquals(false, running);
+  }
+
+
+}