You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2015/04/06 06:05:58 UTC
[07/17] incubator-zeppelin git commit: Rename package/groupId to
org.apache and apply rat plugin.
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
new file mode 100644
index 0000000..e7f950a
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scheduler;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.zeppelin.scheduler.Job.Status;
+
+/**
+ * Scheduler that runs at most one job at a time, in submission order.
+ *
+ * <p>{@link #run()} is the dispatch loop: it takes the job at the head of the waiting
+ * queue and hands it to the executor only when no other job is running. Waiting and
+ * signalling between threads is done on the monitor of {@code queue}.
+ *
+ * @author Leemoonsoo
+ *
+ */
+public class FIFOScheduler implements Scheduler {
+  List<Job> queue = new LinkedList<Job>();
+  private ExecutorService executor;
+  private SchedulerListener listener;
+  boolean terminate = false;
+  // written by the worker thread and read by getJobsRunning() without the queue lock
+  volatile Job runningJob = null;
+  private String name;
+
+  /**
+   * @param name name reported by {@link #getName()}
+   * @param executor executor on which each job is run
+   * @param listener notified when a job starts and finishes; may be null
+   */
+  public FIFOScheduler(String name, ExecutorService executor, SchedulerListener listener) {
+    this.name = name;
+    this.executor = executor;
+    this.listener = listener;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * @return a snapshot copy of the jobs that are queued but not yet dispatched
+   */
+  @Override
+  public Collection<Job> getJobsWaiting() {
+    synchronized (queue) {
+      return new LinkedList<Job>(queue);
+    }
+  }
+
+  /**
+   * @return a list holding the currently running job, or an empty list when idle
+   */
+  @Override
+  public Collection<Job> getJobsRunning() {
+    List<Job> ret = new LinkedList<Job>();
+    Job job = runningJob;
+
+    if (job != null) {
+      ret.add(job);
+    }
+
+    return ret;
+  }
+
+  /**
+   * Marks the job PENDING, appends it to the waiting queue and wakes the dispatch loop.
+   */
+  @Override
+  public void submit(Job job) {
+    job.setStatus(Status.PENDING);
+    synchronized (queue) {
+      queue.add(job);
+      queue.notify();
+    }
+  }
+
+  /**
+   * Dispatch loop. Runs until {@link #stop()} is called, starting the next queued job
+   * whenever no job is running.
+   */
+  @Override
+  public void run() {
+
+    synchronized (queue) {
+      while (terminate == false) {
+        if (runningJob != null || queue.isEmpty() == true) {
+          try {
+            queue.wait(500);
+          } catch (InterruptedException e) {
+            // deliberately ignored: the loop re-checks the terminate flag
+          }
+          continue;
+        }
+
+        // capture the job in a local so the worker never depends on the shared field
+        final Job job = queue.remove(0);
+        runningJob = job;
+
+        final Scheduler scheduler = this;
+        this.executor.execute(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              runJob(scheduler, job);
+            } finally {
+              // reset aborted flag to allow retry
+              job.aborted = false;
+              // Always free the single slot, also when the job was aborted before it
+              // started or a listener threw; otherwise the dispatch loop waits forever.
+              runningJob = null;
+              synchronized (queue) {
+                queue.notify();
+              }
+            }
+          }
+        });
+      }
+    }
+  }
+
+  /**
+   * Runs one job on the worker thread and records its final status.
+   * A job that was aborted while still waiting is marked ABORT without being run.
+   */
+  private void runJob(Scheduler scheduler, Job job) {
+    if (job.isAborted()) {
+      job.setStatus(Status.ABORT);
+      return;
+    }
+
+    job.setStatus(Status.RUNNING);
+    if (listener != null) {
+      listener.jobStarted(scheduler, job);
+    }
+    job.run();
+    if (job.isAborted()) {
+      job.setStatus(Status.ABORT);
+    } else if (job.getException() != null) {
+      job.setStatus(Status.ERROR);
+    } else {
+      job.setStatus(Status.FINISHED);
+    }
+    if (listener != null) {
+      listener.jobFinished(scheduler, job);
+    }
+  }
+
+  /**
+   * Asks the dispatch loop to exit and wakes it up. A job that is already running is
+   * not interrupted by this call.
+   */
+  @Override
+  public void stop() {
+    terminate = true;
+    synchronized (queue) {
+      queue.notify();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
new file mode 100644
index 0000000..9837ad2
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scheduler;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Skeletal implementation of the Job concept.
+ *  - designed for inheritance
+ *  - should be run on a separate thread
+ *  - maintains internal state: its status
+ *  - supports listeners who are updated on status change
+ *
+ * Job class is serialized/deserialized and used server<->client communication
+ * and saving/loading jobs from disk.
+ * Changing/adding/deleting non-transient field name need consideration of that.
+ *
+ * @author Leemoonsoo
+ */
+public abstract class Job {
+  /**
+   * Job status.
+   *
+   * READY - Job is not running, ready to run.
+   * PENDING - Job is submitted to scheduler. but not running yet
+   * RUNNING - Job is running.
+   * FINISHED - Job finished run. with success
+   * ERROR - Job finished run. with error
+   * ABORT - Job finished by abort
+   *
+   */
+  public static enum Status {
+    READY,
+    PENDING,
+    RUNNING,
+    FINISHED,
+    ERROR,
+    ABORT;
+    boolean isReady() {
+      return this == READY;
+    }
+
+    boolean isRunning() {
+      return this == RUNNING;
+    }
+
+    boolean isPending() {
+      return this == PENDING;
+    }
+  }
+
+  private String jobName;
+  String id;
+  Object result;
+  Date dateCreated;
+  Date dateStarted;
+  Date dateFinished;
+  Status status;
+
+  transient boolean aborted = false;
+
+  String errorMessage;
+  private transient Throwable exception;
+  private transient JobListener listener;
+  private long progressUpdateIntervalMs;
+
+  /**
+   * Creates a job in READY state with a generated id made of the creation timestamp
+   * and the identity hash code of this object.
+   *
+   * @param jobName name of the job
+   * @param listener receives status change and progress callbacks; may be null
+   * @param progressUpdateIntervalMs interval handed to the {@link JobProgressPoller}
+   */
+  public Job(String jobName, JobListener listener, long progressUpdateIntervalMs) {
+    this.jobName = jobName;
+    this.listener = listener;
+    this.progressUpdateIntervalMs = progressUpdateIntervalMs;
+
+    dateCreated = new Date();
+    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd-HHmmss");
+    id = dateFormat.format(dateCreated) + "_" + super.hashCode();
+
+    setStatus(Status.READY);
+  }
+
+  /**
+   * Same as {@link #Job(String, JobListener, long)} using
+   * {@link JobProgressPoller#DEFAULT_INTERVAL_MSEC} as the progress interval.
+   */
+  public Job(String jobName, JobListener listener) {
+    this(jobName, listener, JobProgressPoller.DEFAULT_INTERVAL_MSEC);
+  }
+
+  /**
+   * Creates a job in READY state with a caller supplied id. Unlike the other
+   * constructors this one does not set the creation date.
+   */
+  public Job(String jobId, String jobName, JobListener listener, long progressUpdateIntervalMs) {
+    this.jobName = jobName;
+    this.listener = listener;
+    this.progressUpdateIntervalMs = progressUpdateIntervalMs;
+
+    id = jobId;
+
+    setStatus(Status.READY);
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  @Override
+  public int hashCode() {
+    return id == null ? 0 : id.hashCode();
+  }
+
+  /**
+   * Two jobs are equal when they have the same id. Any other object, including null,
+   * is never equal to a job.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof Job)) {
+      return false;
+    }
+    // compare the ids themselves: equal hash codes do not imply equal ids
+    Job other = (Job) o;
+    return id == null ? other.id == null : id.equals(other.id);
+  }
+
+  public Status getStatus() {
+    return status;
+  }
+
+  /**
+   * Changes the status and notifies the listener, if any, before and after the change.
+   * Does nothing when the new status equals the current one.
+   */
+  public void setStatus(Status status) {
+    if (this.status == status) {
+      return;
+    }
+    Status before = this.status;
+    Status after = status;
+    if (listener != null) {
+      listener.beforeStatusChange(this, before, after);
+    }
+    this.status = status;
+    if (listener != null) {
+      listener.afterStatusChange(this, before, after);
+    }
+  }
+
+  public void setListener(JobListener listener) {
+    this.listener = listener;
+  }
+
+  public JobListener getListener() {
+    return listener;
+  }
+
+  /**
+   * @return true when the status is none of READY, RUNNING or PENDING
+   */
+  public boolean isTerminated() {
+    return !this.status.isReady() && !this.status.isRunning() && !this.status.isPending();
+  }
+
+  public boolean isRunning() {
+    return this.status.isRunning();
+  }
+
+  /**
+   * Executes {@link #jobRun()} while a {@link JobProgressPoller} reports progress.
+   *
+   * <p>Nothing is thrown to the caller: any Throwable raised by the job is logged and
+   * stored, so it can be read back through {@link #getException()}, and its message
+   * becomes the result. This method does not change the job status.
+   */
+  public void run() {
+    JobProgressPoller progressUpdator = null;
+    try {
+      progressUpdator = new JobProgressPoller(this, progressUpdateIntervalMs);
+      progressUpdator.start();
+      dateStarted = new Date();
+      result = jobRun();
+      this.exception = null;
+      errorMessage = null;
+      dateFinished = new Date();
+    } catch (Throwable e) {
+      logger().error("Job failed", e);
+      this.exception = e;
+      result = e.getMessage();
+      errorMessage = getStack(e);
+      dateFinished = new Date();
+    } finally {
+      // stop the poller on every path; it is still null if its creation failed
+      if (progressUpdator != null) {
+        progressUpdator.terminate();
+      }
+    }
+  }
+
+  /**
+   * Renders the stack trace elements of a throwable, one per line.
+   *
+   * @param e throwable to render; may be null
+   * @return the stack frames separated by newlines, or an empty string when there are none
+   */
+  public String getStack(Throwable e) {
+    if (e == null) {
+      return "";
+    }
+    StackTraceElement[] stacks = e.getStackTrace();
+    if (stacks == null) {
+      return "";
+    }
+    StringBuilder ss = new StringBuilder();
+    for (StackTraceElement s : stacks) {
+      ss.append(s.toString()).append("\n");
+    }
+
+    return ss.toString();
+  }
+
+  public Throwable getException() {
+    return exception;
+  }
+
+  /**
+   * Records the failure of this job. Passing null clears both the exception and the
+   * error message.
+   */
+  protected void setException(Throwable t) {
+    exception = t;
+    errorMessage = (t == null) ? null : getStack(t);
+  }
+
+  public Object getReturn() {
+    return result;
+  }
+
+  public String getJobName() {
+    return jobName;
+  }
+
+  public void setJobName(String jobName) {
+    this.jobName = jobName;
+  }
+
+  public abstract int progress();
+
+  public abstract Map<String, Object> info();
+
+  protected abstract Object jobRun() throws Throwable;
+
+  protected abstract boolean jobAbort();
+
+  /**
+   * Requests an abort; the aborted flag takes the value returned by {@link #jobAbort()}.
+   */
+  public void abort() {
+    aborted = jobAbort();
+  }
+
+  public boolean isAborted() {
+    return aborted;
+  }
+
+  public Date getDateCreated() {
+    return dateCreated;
+  }
+
+  public Date getDateStarted() {
+    return dateStarted;
+  }
+
+  public Date getDateFinished() {
+    return dateFinished;
+  }
+
+  private Logger logger() {
+    return LoggerFactory.getLogger(Job.class);
+  }
+
+  protected void setResult(Object result) {
+    this.result = result;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobListener.java
new file mode 100644
index 0000000..1ed551f
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobListener.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scheduler;
+
+/**
+ * Callbacks through which a {@link Job} reports progress and status changes.
+ *
+ * @author Leemoonsoo
+ *
+ */
+public interface JobListener {
+  /**
+   * Called by the {@link JobProgressPoller} while the job is running.
+   *
+   * @param job the job being polled
+   * @param progress value returned by {@link Job#progress()}
+   */
+  void onProgressUpdate(Job job, int progress);
+
+  /**
+   * Called by {@link Job#setStatus(Job.Status)} just before the new status is stored.
+   *
+   * @param job the job whose status is about to change
+   * @param before the current status
+   * @param after the status about to be set
+   */
+  void beforeStatusChange(Job job, Job.Status before, Job.Status after);
+
+  /**
+   * Called by {@link Job#setStatus(Job.Status)} right after the new status is stored.
+   *
+   * @param job the job whose status changed
+   * @param before the previous status; may be null
+   * @param after the new status
+   */
+  void afterStatusChange(Job job, Job.Status before, Job.Status after);
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java
new file mode 100644
index 0000000..9de1325
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scheduler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Thread that periodically reports the progress of a running {@link Job} to the job's
+ * {@link JobListener} until {@link #terminate()} is called.
+ *
+ * @author Leemoonsoo
+ *
+ */
+public class JobProgressPoller extends Thread {
+  public static final long DEFAULT_INTERVAL_MSEC = 500;
+  Logger logger = LoggerFactory.getLogger(JobProgressPoller.class);
+  private Job job;
+  private long intervalMs;
+  // set by terminate() from another thread and polled by run(); volatile makes the
+  // write visible to the polling thread
+  volatile boolean terminate = false;
+
+  /**
+   * @param job job whose progress is reported
+   * @param intervalMs pause between two reports in milliseconds; 0 selects
+   *        {@link #DEFAULT_INTERVAL_MSEC}, a negative value disables polling
+   */
+  public JobProgressPoller(Job job, long intervalMs) {
+    this.job = job;
+    this.intervalMs = intervalMs;
+  }
+
+  /**
+   * Polling loop. Returns immediately when the interval is negative; otherwise reports
+   * progress once per interval, but only while the job is running and has a listener.
+   */
+  @Override
+  public void run() {
+    if (intervalMs < 0) {
+      return;
+    } else if (intervalMs == 0) {
+      intervalMs = DEFAULT_INTERVAL_MSEC;
+    }
+
+    while (terminate == false) {
+      JobListener listener = job.getListener();
+      if (listener != null) {
+        try {
+          if (job.isRunning()) {
+            listener.onProgressUpdate(job, job.progress());
+          }
+        } catch (Exception e) {
+          // a failing progress report must not end the polling loop
+          logger.error("Can not get or update progress", e);
+        }
+      }
+      try {
+        Thread.sleep(intervalMs);
+      } catch (InterruptedException e) {
+        // deliberately ignored: the loop re-checks the terminate flag
+      }
+    }
+  }
+
+  /**
+   * Asks the polling loop to stop. The loop exits after its current sleep finishes.
+   */
+  public void terminate() {
+    terminate = true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
new file mode 100644
index 0000000..c8e8e04
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scheduler;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.zeppelin.scheduler.Job.Status;
+
+/**
+ * Scheduler that runs up to {@code maxConcurrency} jobs at the same time.
+ *
+ * <p>{@link #run()} is the dispatch loop. Both the waiting queue and the list of running
+ * jobs are guarded by the monitor of {@code queue}, which is also used for signalling.
+ *
+ * @author Leemoonsoo
+ *
+ */
+public class ParallelScheduler implements Scheduler {
+  List<Job> queue = new LinkedList<Job>();
+  List<Job> running = new LinkedList<Job>();
+  private ExecutorService executor;
+  private SchedulerListener listener;
+  boolean terminate = false;
+  private String name;
+  private int maxConcurrency;
+
+  /**
+   * @param name name reported by {@link #getName()}
+   * @param executor executor on which jobs are run
+   * @param listener notified when a job starts and finishes; may be null
+   * @param maxConcurrency maximum number of jobs running at the same time
+   */
+  public ParallelScheduler(String name, ExecutorService executor, SchedulerListener listener,
+      int maxConcurrency) {
+    this.name = name;
+    this.executor = executor;
+    this.listener = listener;
+    this.maxConcurrency = maxConcurrency;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * @return a snapshot copy of the jobs that are queued but not yet dispatched
+   */
+  @Override
+  public Collection<Job> getJobsWaiting() {
+    synchronized (queue) {
+      return new LinkedList<Job>(queue);
+    }
+  }
+
+  /**
+   * @return a snapshot copy of the jobs that have been dispatched and not yet completed
+   */
+  @Override
+  public Collection<Job> getJobsRunning() {
+    synchronized (queue) {
+      return new LinkedList<Job>(running);
+    }
+  }
+
+  /**
+   * Marks the job PENDING, appends it to the waiting queue and wakes the dispatch loop.
+   */
+  @Override
+  public void submit(Job job) {
+    job.setStatus(Status.PENDING);
+    synchronized (queue) {
+      queue.add(job);
+      queue.notify();
+    }
+  }
+
+  /**
+   * Dispatch loop. Runs until {@link #stop()} is called, handing queued jobs to the
+   * executor while fewer than {@code maxConcurrency} jobs are running.
+   */
+  @Override
+  public void run() {
+
+    synchronized (queue) {
+      while (terminate == false) {
+        if (running.size() >= maxConcurrency || queue.isEmpty() == true) {
+          try {
+            queue.wait(500);
+          } catch (InterruptedException e) {
+            // deliberately ignored: the loop re-checks the terminate flag
+          }
+          continue;
+        }
+
+        Job job = queue.remove(0);
+        running.add(job);
+        Scheduler scheduler = this;
+
+        executor.execute(new JobRunner(scheduler, job));
+      }
+    }
+  }
+
+  /**
+   * Changes the concurrency limit and wakes the dispatch loop so it can apply it.
+   */
+  public void setMaxConcurrency(int maxConcurrency) {
+    this.maxConcurrency = maxConcurrency;
+    synchronized (queue) {
+      queue.notify();
+    }
+  }
+
+  /**
+   * Runs a single job on an executor thread and releases its slot when done.
+   */
+  private class JobRunner implements Runnable {
+    private Scheduler scheduler;
+    private Job job;
+
+    public JobRunner(Scheduler scheduler, Job job) {
+      this.scheduler = scheduler;
+      this.job = job;
+    }
+
+    @Override
+    public void run() {
+      try {
+        // a job aborted while it was still waiting is never started
+        if (job.isAborted()) {
+          job.setStatus(Status.ABORT);
+          return;
+        }
+
+        job.setStatus(Status.RUNNING);
+        if (listener != null) {
+          listener.jobStarted(scheduler, job);
+        }
+        job.run();
+        if (job.isAborted()) {
+          job.setStatus(Status.ABORT);
+        } else if (job.getException() != null) {
+          job.setStatus(Status.ERROR);
+        } else {
+          job.setStatus(Status.FINISHED);
+        }
+
+        if (listener != null) {
+          listener.jobFinished(scheduler, job);
+        }
+      } finally {
+        // reset aborted flag to allow retry
+        job.aborted = false;
+        // Always give the slot back, also when a listener callback throws; otherwise
+        // the job stays in 'running' and permanently lowers the available concurrency.
+        synchronized (queue) {
+          running.remove(job);
+          queue.notify();
+        }
+      }
+    }
+  }
+
+
+  /**
+   * Asks the dispatch loop to exit and wakes it up. Jobs that are already running are
+   * not interrupted by this call.
+   */
+  @Override
+  public void stop() {
+    terminate = true;
+    synchronized (queue) {
+      queue.notify();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
new file mode 100644
index 0000000..15e4a3c
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scheduler;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.thrift.TException;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.apache.zeppelin.scheduler.Job.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scheduler that hands jobs to an executor and follows their status by polling a
+ * {@link RemoteInterpreterProcess}.
+ *
+ * <p>After dispatching a job the loop in {@link #run()} waits until the job is known to
+ * be submitted to the remote side before it dispatches the next one. Both the waiting
+ * queue and the list of running jobs are guarded by the monitor of {@code queue}.
+ */
+public class RemoteScheduler implements Scheduler {
+  Logger logger = LoggerFactory.getLogger(RemoteScheduler.class);
+
+  List<Job> queue = new LinkedList<Job>();
+  List<Job> running = new LinkedList<Job>();
+  private ExecutorService executor;
+  private SchedulerListener listener;
+  boolean terminate = false;
+  private String name;
+  private int maxConcurrency;
+  private RemoteInterpreterProcess interpreterProcess;
+
+  /**
+   * @param name name reported by {@link #getName()}
+   * @param executor executor on which jobs are run
+   * @param interpreterProcess process that is asked for the status of each job
+   * @param listener notified when a job starts and finishes; may be null
+   * @param maxConcurrency maximum number of jobs running at the same time
+   */
+  public RemoteScheduler(String name, ExecutorService executor,
+      RemoteInterpreterProcess interpreterProcess, SchedulerListener listener,
+      int maxConcurrency) {
+    this.name = name;
+    this.executor = executor;
+    this.listener = listener;
+    this.interpreterProcess = interpreterProcess;
+    this.maxConcurrency = maxConcurrency;
+  }
+
+  /**
+   * Dispatch loop. Runs until {@link #stop()} is called. Takes one job at a time from
+   * the queue, starts it on the executor, then blocks until its runner reports the job
+   * as submitted to the remote side.
+   */
+  @Override
+  public void run() {
+    while (terminate == false) {
+      Job job = null;
+
+      synchronized (queue) {
+        if (running.size() >= maxConcurrency || queue.isEmpty() == true) {
+          try {
+            queue.wait(500);
+          } catch (InterruptedException e) {
+            // deliberately ignored: the loop re-checks the terminate flag
+          }
+          continue;
+        }
+
+        job = queue.remove(0);
+        running.add(job);
+      }
+
+      // run
+      Scheduler scheduler = this;
+      JobRunner jobRunner = new JobRunner(scheduler, job);
+      executor.execute(jobRunner);
+
+      // wait until it is submitted to the remote
+      while (!jobRunner.isJobSubmittedInRemote()) {
+        synchronized (queue) {
+          try {
+            queue.wait(500);
+          } catch (InterruptedException e) {
+            // deliberately ignored: the loop re-checks the submitted flag
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * @return a snapshot copy of the jobs that are queued but not yet dispatched
+   */
+  @Override
+  public Collection<Job> getJobsWaiting() {
+    synchronized (queue) {
+      return new LinkedList<Job>(queue);
+    }
+  }
+
+  /**
+   * @return a snapshot copy of the jobs that have been dispatched and not yet completed
+   */
+  @Override
+  public Collection<Job> getJobsRunning() {
+    synchronized (queue) {
+      return new LinkedList<Job>(running);
+    }
+  }
+
+  /**
+   * Marks the job PENDING, appends it to the waiting queue and wakes the dispatch loop.
+   */
+  @Override
+  public void submit(Job job) {
+    job.setStatus(Status.PENDING);
+
+    synchronized (queue) {
+      queue.add(job);
+      queue.notify();
+    }
+  }
+
+  /**
+   * Changes the concurrency limit and wakes the dispatch loop so it can apply it.
+   */
+  public void setMaxConcurrency(int maxConcurrency) {
+    this.maxConcurrency = maxConcurrency;
+    synchronized (queue) {
+      queue.notify();
+    }
+  }
+
+  /**
+   * Role of the class is get status info from remote process from PENDING to
+   * RUNNING status.
+   */
+  private class JobStatusPoller extends Thread {
+    private long initialPeriodMsec;
+    private long initialPeriodCheckIntervalMsec;
+    private long checkIntervalMsec;
+    private boolean terminate;
+    private JobListener listener;
+    private Job job;
+    Status lastStatus;
+
+    /**
+     * @param initialPeriodMsec length of the initial period, counted from the start of
+     *        the polling loop
+     * @param initialPeriodCheckIntervalMsec polling interval during the initial period
+     * @param checkIntervalMsec polling interval after the initial period
+     * @param job job whose status is polled
+     * @param listener receives every polled status through afterStatusChange
+     */
+    public JobStatusPoller(long initialPeriodMsec,
+        long initialPeriodCheckIntervalMsec, long checkIntervalMsec, Job job,
+        JobListener listener) {
+      this.initialPeriodMsec = initialPeriodMsec;
+      this.initialPeriodCheckIntervalMsec = initialPeriodCheckIntervalMsec;
+      this.checkIntervalMsec = checkIntervalMsec;
+      this.job = job;
+      this.listener = listener;
+      this.terminate = false;
+    }
+
+    @Override
+    public void run() {
+      long started = System.currentTimeMillis();
+      while (terminate == false) {
+        long current = System.currentTimeMillis();
+        long interval;
+        if (current - started < initialPeriodMsec) {
+          interval = initialPeriodCheckIntervalMsec;
+        } else {
+          interval = checkIntervalMsec;
+        }
+
+        synchronized (this) {
+          try {
+            this.wait(interval);
+          } catch (InterruptedException e) {
+            // deliberately ignored: the loop re-checks the terminate flag
+          }
+        }
+
+        // getStatus() forwards the polled status to the listener as a side effect;
+        // the returned value itself is not needed by this loop.
+        getStatus();
+      }
+    }
+
+    /**
+     * Asks the polling loop to stop and wakes it up if it is waiting.
+     */
+    public void shutdown() {
+      terminate = true;
+      synchronized (this) {
+        this.notify();
+      }
+    }
+
+
+    /**
+     * Status to report when the remote side cannot provide one. Once polling has been
+     * shut down, a last polled status that is not FINISHED, ERROR or ABORT is reported
+     * as FINISHED; a missing last status is always reported as FINISHED.
+     */
+    private Status getLastStatus() {
+      if (terminate == true) {
+        if (lastStatus != Status.FINISHED &&
+            lastStatus != Status.ERROR &&
+            lastStatus != Status.ABORT) {
+          return Status.FINISHED;
+        } else {
+          return (lastStatus == null) ? Status.FINISHED : lastStatus;
+        }
+      } else {
+        return (lastStatus == null) ? Status.FINISHED : lastStatus;
+      }
+    }
+
+    /**
+     * Asks the interpreter process for the status of the job and forwards it to the
+     * listener. Returns the last known status when the process has no references, and
+     * ERROR when the client cannot be obtained or the call fails.
+     */
+    public synchronized Job.Status getStatus() {
+      if (interpreterProcess.referenceCount() <= 0) {
+        return getLastStatus();
+      }
+
+      Client client;
+      try {
+        client = interpreterProcess.getClient();
+      } catch (Exception e) {
+        logger.error("Can't get status information", e);
+        lastStatus = Status.ERROR;
+        return Status.ERROR;
+      }
+
+      try {
+        String statusStr = client.getStatus(job.getId());
+        if ("Unknown".equals(statusStr)) {
+          // not found this job in the remote schedulers.
+          // maybe not submitted, maybe already finished
+          Status status = getLastStatus();
+          listener.afterStatusChange(job, null, status);
+          return status;
+        }
+        Status status = Status.valueOf(statusStr);
+        lastStatus = status;
+        listener.afterStatusChange(job, null, status);
+        return status;
+      } catch (TException e) {
+        logger.error("Can't get status information", e);
+        lastStatus = Status.ERROR;
+        return Status.ERROR;
+      } catch (Exception e) {
+        logger.error("Unknown status", e);
+        lastStatus = Status.ERROR;
+        return Status.ERROR;
+      } finally {
+        interpreterProcess.releaseClient(client);
+      }
+    }
+  }
+
+  /**
+   * Runs a single job on an executor thread while a {@link JobStatusPoller} mirrors the
+   * polled status onto the job, and releases the job's slot when done.
+   */
+  private class JobRunner implements Runnable, JobListener {
+    private Scheduler scheduler;
+    private Job job;
+    // Both flags are written by the runner and poller threads and read by the dispatch
+    // loop without a common lock, hence volatile.
+    private volatile boolean jobExecuted;
+    volatile boolean jobSubmittedRemotely;
+
+    public JobRunner(Scheduler scheduler, Job job) {
+      this.scheduler = scheduler;
+      this.job = job;
+      jobExecuted = false;
+      jobSubmittedRemotely = false;
+    }
+
+    public boolean isJobSubmittedInRemote() {
+      return jobSubmittedRemotely;
+    }
+
+    @Override
+    public void run() {
+      if (job.isAborted()) {
+        job.setStatus(Status.ABORT);
+        job.aborted = false;
+
+        // The job will never reach the remote side. Release the dispatch loop, which
+        // otherwise waits forever for this job to be reported as submitted.
+        jobSubmittedRemotely = true;
+
+        synchronized (queue) {
+          running.remove(job);
+          queue.notify();
+        }
+
+        return;
+      }
+
+      JobStatusPoller jobStatusPoller = new JobStatusPoller(1500, 100, 500,
+          job, this);
+      jobStatusPoller.start();
+
+      if (listener != null) {
+        listener.jobStarted(scheduler, job);
+      }
+      job.run();
+      jobExecuted = true;
+      jobSubmittedRemotely = true;
+
+      jobStatusPoller.shutdown();
+      try {
+        jobStatusPoller.join();
+      } catch (InterruptedException e) {
+        logger.error("JobStatusPoller interrupted", e);
+      }
+
+      job.setStatus(jobStatusPoller.getStatus());
+      if (listener != null) {
+        listener.jobFinished(scheduler, job);
+      }
+
+      // reset aborted flag to allow retry
+      job.aborted = false;
+
+      synchronized (queue) {
+        running.remove(job);
+        queue.notify();
+      }
+    }
+
+    @Override
+    public void onProgressUpdate(Job job, int progress) {
+    }
+
+    @Override
+    public void beforeStatusChange(Job job, Status before, Status after) {
+    }
+
+    /**
+     * Receives each status polled by the {@link JobStatusPoller} and applies it to the
+     * job, ignoring terminal statuses that arrive before the local run has completed.
+     */
+    @Override
+    public void afterStatusChange(Job job, Status before, Status after) {
+      if (after == null) { // unknown. maybe before submitted remotely, maybe already finished.
+        if (jobExecuted) {
+          jobSubmittedRemotely = true;
+          if (job.isAborted()) {
+            job.setStatus(Status.ABORT);
+          } else if (job.getException() != null) {
+            job.setStatus(Status.ERROR);
+          } else {
+            job.setStatus(Status.FINISHED);
+          }
+        }
+        return;
+      }
+
+
+      // Update remoteStatus
+      if (jobExecuted == false) {
+        if (after == Status.FINISHED || after == Status.ABORT
+            || after == Status.ERROR) {
+          // it can be status of last run.
+          // so not updating the remoteStatus
+          return;
+        } else if (after == Status.RUNNING) {
+          jobSubmittedRemotely = true;
+        }
+      } else {
+        jobSubmittedRemotely = true;
+      }
+
+      // status polled by status poller
+      if (job.getStatus() != after) {
+        job.setStatus(after);
+      }
+    }
+  }
+
+  /**
+   * Asks the dispatch loop to exit and wakes it up. Jobs that are already running are
+   * not interrupted by this call.
+   */
+  @Override
+  public void stop() {
+    terminate = true;
+    synchronized (queue) {
+      queue.notify();
+    }
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java
new file mode 100644
index 0000000..a886c22
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scheduler;
+
+import java.util.Collection;
+
+/**
+ * Accepts {@link Job}s and runs them. The scheduler itself is a {@link Runnable}:
+ * implementations use {@link #run()} as their dispatch loop.
+ *
+ * @author Leemoonsoo
+ *
+ */
+public interface Scheduler extends Runnable {
+  /**
+   * @return the name of this scheduler
+   */
+  String getName();
+
+  /**
+   * @return the jobs that have been submitted but not yet started
+   */
+  Collection<Job> getJobsWaiting();
+
+  /**
+   * @return the jobs that are currently running
+   */
+  Collection<Job> getJobsRunning();
+
+  /**
+   * Hands a job to this scheduler for execution.
+   *
+   * @param job the job to run
+   */
+  void submit(Job job);
+
+  /**
+   * Asks the scheduler to stop.
+   */
+  void stop();
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
new file mode 100644
index 0000000..2556a81
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scheduler;
+
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates, caches by name, and removes {@link Scheduler} instances that all run on one shared
+ * executor; as their {@link SchedulerListener} it also logs when jobs start and finish.
+ *
+ * @author Leemoonsoo
+ */
+public class SchedulerFactory implements SchedulerListener {
+ private static final Logger logger = LoggerFactory.getLogger(SchedulerFactory.class);
+ ScheduledExecutorService executor;
+ Map<String, Scheduler> schedulers = new LinkedHashMap<String, Scheduler>();
+
+ // volatile: the unsynchronized first check in singleton() must not see a half-built instance
+ private static volatile SchedulerFactory singleton;
+ private static final Object singletonLock = new Object();
+
+ /** Returns the shared factory, creating it on first use; null if creation failed (logged). */
+ public static SchedulerFactory singleton() {
+ if (singleton == null) {
+ synchronized (singletonLock) {
+ if (singleton == null) {
+ try {
+ singleton = new SchedulerFactory();
+ } catch (Exception e) {
+ logger.error("Failed to create SchedulerFactory singleton", e);
+ }
+ }
+ }
+ }
+ return singleton;
+ }
+
+ /** Creates a factory whose schedulers share a scheduled thread pool of 100 core threads. */
+ public SchedulerFactory() throws Exception {
+ executor = Executors.newScheduledThreadPool(100);
+ }
+
+ /** Starts an orderly shutdown of the shared executor, which then accepts no new tasks. */
+ public void destroy() {
+ executor.shutdown();
+ }
+
+ /** Returns the scheduler named {@code name}, creating and starting a FIFO one if absent. */
+ public Scheduler createOrGetFIFOScheduler(String name) {
+ synchronized (schedulers) {
+ if (!schedulers.containsKey(name)) {
+ Scheduler s = new FIFOScheduler(name, executor, this);
+ schedulers.put(name, s);
+ executor.execute(s);
+ }
+ return schedulers.get(name); // an existing entry wins, whatever kind of scheduler it is
+ }
+ }
+
+ /** Returns the scheduler named {@code name}, creating and starting a parallel one if absent. */
+ public Scheduler createOrGetParallelScheduler(String name, int maxConcurrency) {
+ synchronized (schedulers) {
+ if (!schedulers.containsKey(name)) {
+ Scheduler s = new ParallelScheduler(name, executor, this, maxConcurrency);
+ schedulers.put(name, s);
+ executor.execute(s);
+ }
+ return schedulers.get(name);
+ }
+ }
+
+ /** Returns the scheduler named {@code name}, creating and starting a remote one if absent. */
+ public Scheduler createOrGetRemoteScheduler(String name,
+ RemoteInterpreterProcess interpreterProcess, int maxConcurrency) {
+ synchronized (schedulers) {
+ if (!schedulers.containsKey(name)) {
+ Scheduler s =
+ new RemoteScheduler(name, executor, interpreterProcess, this, maxConcurrency);
+ schedulers.put(name, s);
+ executor.execute(s);
+ }
+ return schedulers.get(name);
+ }
+ }
+
+ /**
+ * Unregisters and stops the scheduler named {@code name}.
+ *
+ * @return the removed scheduler, or null when none was registered under that name
+ */
+ public Scheduler removeScheduler(String name) {
+ synchronized (schedulers) {
+ Scheduler s = schedulers.remove(name);
+ if (s != null) {
+ s.stop();
+ }
+ return s;
+ }
+ }
+
+ /** Returns a snapshot of every registered scheduler; {@code name} is currently ignored. */
+ public Collection<Scheduler> listScheduler(String name) {
+ synchronized (schedulers) {
+ List<Scheduler> snapshot = new LinkedList<Scheduler>(schedulers.values());
+ return snapshot;
+ }
+ }
+
+ /** Logs that {@code scheduler} started {@code job}. */
+ @Override
+ public void jobStarted(Scheduler scheduler, Job job) {
+ logger.info("Job {} started by scheduler {}", job.getJobName(), scheduler.getName());
+ }
+
+ /** Logs that {@code scheduler} finished {@code job}. */
+ @Override
+ public void jobFinished(Scheduler scheduler, Job job) {
+ logger.info("Job {} finished by scheduler {}", job.getJobName(), scheduler.getName());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerListener.java
new file mode 100644
index 0000000..6fdd176
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerListener.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scheduler;
+
+/**
+ * Callback pair notified with the scheduler and the job when a job starts and when it
+ * finishes. SchedulerFactory implements it to log both events.
+ *
+ * @author Leemoonsoo
+ */
+public interface SchedulerListener {
+ void jobStarted(Scheduler scheduler, Job job);
+
+ void jobFinished(Scheduler scheduler, Job job);
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
index bbb54b1..051730e 100644
--- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
+++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
@@ -1,4 +1,22 @@
-namespace java com.nflabs.zeppelin.interpreter.thrift
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace java org.apache.zeppelin.interpreter.thrift
struct RemoteInterpreterContext {
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/display/InputTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/display/InputTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/display/InputTest.java
deleted file mode 100644
index 091473b..0000000
--- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/display/InputTest.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package com.nflabs.zeppelin.display;
-
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class InputTest {
-
- @Before
- public void setUp() throws Exception {
- }
-
- @After
- public void tearDown() throws Exception {
- }
-
- @Test
- public void testDefaultParamReplace() throws IOException{
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
deleted file mode 100644
index 181b1b0..0000000
--- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package com.nflabs.zeppelin.interpreter.remote;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import java.util.HashMap;
-
-import org.junit.Test;
-
-import com.nflabs.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-
-public class RemoteInterpreterProcessTest {
-
- @Test
- public void testStartStop() {
- RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>());
- assertFalse(rip.isRunning());
- assertEquals(0, rip.referenceCount());
- assertEquals(1, rip.reference());
- assertEquals(2, rip.reference());
- assertEquals(true, rip.isRunning());
- assertEquals(1, rip.dereference());
- assertEquals(true, rip.isRunning());
- assertEquals(0, rip.dereference());
- assertEquals(false, rip.isRunning());
- }
-
- @Test
- public void testClientFactory() throws Exception {
- RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>());
- rip.reference();
- assertEquals(0, rip.getNumActiveClient());
- assertEquals(0, rip.getNumIdleClient());
-
- Client client = rip.getClient();
- assertEquals(1, rip.getNumActiveClient());
- assertEquals(0, rip.getNumIdleClient());
-
- rip.releaseClient(client);
- assertEquals(0, rip.getNumActiveClient());
- assertEquals(1, rip.getNumIdleClient());
-
- rip.dereference();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
deleted file mode 100644
index 809c76e..0000000
--- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package com.nflabs.zeppelin.interpreter.remote;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-
-import org.apache.thrift.TException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class RemoteInterpreterServerTest {
- @Before
- public void setUp() throws Exception {
- }
-
- @After
- public void tearDown() throws Exception {
- }
-
- @Test
- public void testStartStop() throws InterruptedException, IOException, TException {
- RemoteInterpreterServer server = new RemoteInterpreterServer(
- RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces());
- assertEquals(false, server.isRunning());
-
- server.start();
- long startTime = System.currentTimeMillis();
- boolean running = false;
-
- while (System.currentTimeMillis() - startTime < 10 * 1000) {
- if (server.isRunning()) {
- running = true;
- break;
- } else {
- Thread.sleep(200);
- }
- }
-
- assertEquals(true, running);
- assertEquals(true, RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", server.getPort()));
-
- server.shutdown();
-
- while (System.currentTimeMillis() - startTime < 10 * 1000) {
- if (server.isRunning()) {
- Thread.sleep(200);
- } else {
- running = false;
- break;
- }
- }
- assertEquals(false, running);
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterTest.java
deleted file mode 100644
index dcee6aa..0000000
--- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterTest.java
+++ /dev/null
@@ -1,428 +0,0 @@
-package com.nflabs.zeppelin.interpreter.remote;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.thrift.transport.TTransportException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.nflabs.zeppelin.display.GUI;
-import com.nflabs.zeppelin.interpreter.InterpreterContext;
-import com.nflabs.zeppelin.interpreter.InterpreterGroup;
-import com.nflabs.zeppelin.interpreter.InterpreterResult;
-import com.nflabs.zeppelin.interpreter.remote.mock.MockInterpreterA;
-import com.nflabs.zeppelin.interpreter.remote.mock.MockInterpreterB;
-import com.nflabs.zeppelin.scheduler.Job;
-import com.nflabs.zeppelin.scheduler.Job.Status;
-import com.nflabs.zeppelin.scheduler.Scheduler;
-
-public class RemoteInterpreterTest {
-
-
- private InterpreterGroup intpGroup;
- private HashMap<String, String> env;
-
- @Before
- public void setUp() throws Exception {
- intpGroup = new InterpreterGroup();
- env = new HashMap<String, String>();
- env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
- }
-
- @After
- public void tearDown() throws Exception {
- intpGroup.clone();
- intpGroup.destroy();
- }
-
- @Test
- public void testRemoteInterperterCall() throws TTransportException, IOException {
- Properties p = new Properties();
-
- RemoteInterpreter intpA = new RemoteInterpreter(
- p,
- MockInterpreterA.class.getName(),
- new File("../bin/interpreter.sh").getAbsolutePath(),
- "fake",
- env
- );
-
- intpGroup.add(intpA);
- intpA.setInterpreterGroup(intpGroup);
-
- RemoteInterpreter intpB = new RemoteInterpreter(
- p,
- MockInterpreterB.class.getName(),
- new File("../bin/interpreter.sh").getAbsolutePath(),
- "fake",
- env
- );
-
- intpGroup.add(intpB);
- intpB.setInterpreterGroup(intpGroup);
-
-
- RemoteInterpreterProcess process = intpA.getInterpreterProcess();
- process.equals(intpB.getInterpreterProcess());
-
- assertFalse(process.isRunning());
- assertEquals(0, process.getNumIdleClient());
- assertEquals(0, process.referenceCount());
-
- intpA.open();
- assertTrue(process.isRunning());
- assertEquals(1, process.getNumIdleClient());
- assertEquals(1, process.referenceCount());
-
- intpA.interpret("1",
- new InterpreterContext(
- "id",
- "title",
- "text",
- new HashMap<String, Object>(),
- new GUI()));
-
- intpB.open();
- assertEquals(2, process.referenceCount());
-
- intpA.close();
- assertEquals(1, process.referenceCount());
- intpB.close();
- assertEquals(0, process.referenceCount());
-
- assertFalse(process.isRunning());
-
- }
-
- @Test
- public void testRemoteSchedulerSharing() throws TTransportException, IOException {
- Properties p = new Properties();
-
- RemoteInterpreter intpA = new RemoteInterpreter(
- p,
- MockInterpreterA.class.getName(),
- new File("../bin/interpreter.sh").getAbsolutePath(),
- "fake",
- env
- );
-
- intpGroup.add(intpA);
- intpA.setInterpreterGroup(intpGroup);
-
- RemoteInterpreter intpB = new RemoteInterpreter(
- p,
- MockInterpreterB.class.getName(),
- new File("../bin/interpreter.sh").getAbsolutePath(),
- "fake",
- env
- );
-
- intpGroup.add(intpB);
- intpB.setInterpreterGroup(intpGroup);
-
- intpA.open();
- intpB.open();
-
- long start = System.currentTimeMillis();
- InterpreterResult ret = intpA.interpret("500",
- new InterpreterContext(
- "id",
- "title",
- "text",
- new HashMap<String, Object>(),
- new GUI()));
- assertEquals("500", ret.message());
-
- ret = intpB.interpret("500",
- new InterpreterContext(
- "id",
- "title",
- "text",
- new HashMap<String, Object>(),
- new GUI()));
- assertEquals("1000", ret.message());
- long end = System.currentTimeMillis();
- assertTrue(end - start >= 1000);
-
-
- intpA.close();
- intpB.close();
-
- RemoteInterpreterProcess process = intpA.getInterpreterProcess();
- assertFalse(process.isRunning());
- }
-
- @Test
- public void testRemoteSchedulerSharingSubmit() throws TTransportException, IOException, InterruptedException {
- Properties p = new Properties();
-
- final RemoteInterpreter intpA = new RemoteInterpreter(
- p,
- MockInterpreterA.class.getName(),
- new File("../bin/interpreter.sh").getAbsolutePath(),
- "fake",
- env
- );
-
- intpGroup.add(intpA);
- intpA.setInterpreterGroup(intpGroup);
-
- final RemoteInterpreter intpB = new RemoteInterpreter(
- p,
- MockInterpreterB.class.getName(),
- new File("../bin/interpreter.sh").getAbsolutePath(),
- "fake",
- env
- );
-
- intpGroup.add(intpB);
- intpB.setInterpreterGroup(intpGroup);
-
- intpA.open();
- intpB.open();
-
- long start = System.currentTimeMillis();
- Job jobA = new Job("jobA", null) {
-
- @Override
- public int progress() {
- return 0;
- }
-
- @Override
- public Map<String, Object> info() {
- return null;
- }
-
- @Override
- protected Object jobRun() throws Throwable {
- return intpA.interpret("500",
- new InterpreterContext(
- "jobA",
- "title",
- "text",
- new HashMap<String, Object>(),
- new GUI()));
- }
-
- @Override
- protected boolean jobAbort() {
- return false;
- }
-
- };
- intpA.getScheduler().submit(jobA);
-
- Job jobB = new Job("jobB", null) {
-
- @Override
- public int progress() {
- return 0;
- }
-
- @Override
- public Map<String, Object> info() {
- return null;
- }
-
- @Override
- protected Object jobRun() throws Throwable {
- return intpB.interpret("500",
- new InterpreterContext(
- "jobB",
- "title",
- "text",
- new HashMap<String, Object>(),
- new GUI()));
- }
-
- @Override
- protected boolean jobAbort() {
- return false;
- }
-
- };
- intpB.getScheduler().submit(jobB);
-
- // wait until both job finished
- while (jobA.getStatus() != Status.FINISHED ||
- jobB.getStatus() != Status.FINISHED) {
- Thread.sleep(100);
- }
-
- long end = System.currentTimeMillis();
- assertTrue(end - start >= 1000);
-
- assertEquals("1000", ((InterpreterResult) jobB.getReturn()).message());
-
- intpA.close();
- intpB.close();
-
- RemoteInterpreterProcess process = intpA.getInterpreterProcess();
- assertFalse(process.isRunning());
- }
-
- @Test
- public void testRunOrderPreserved() throws InterruptedException {
- Properties p = new Properties();
-
- final RemoteInterpreter intpA = new RemoteInterpreter(
- p,
- MockInterpreterA.class.getName(),
- new File("../bin/interpreter.sh").getAbsolutePath(),
- "fake",
- env
- );
-
- intpGroup.add(intpA);
- intpA.setInterpreterGroup(intpGroup);
-
- intpA.open();
-
- int concurrency = 3;
- final List<String> results = new LinkedList<String>();
-
- Scheduler scheduler = intpA.getScheduler();
- for (int i = 0; i < concurrency; i++) {
- final String jobId = Integer.toString(i);
- scheduler.submit(new Job(jobId, Integer.toString(i), null, 200) {
-
- @Override
- public int progress() {
- return 0;
- }
-
- @Override
- public Map<String, Object> info() {
- return null;
- }
-
- @Override
- protected Object jobRun() throws Throwable {
- InterpreterResult ret = intpA.interpret(getJobName(), new InterpreterContext(
- jobId,
- "title",
- "text",
- new HashMap<String, Object>(),
- new GUI()));
-
- synchronized (results) {
- results.add(ret.message());
- results.notify();
- }
- return null;
- }
-
- @Override
- protected boolean jobAbort() {
- return false;
- }
-
- });
- }
-
- // wait for job finished
- synchronized (results) {
- while (results.size() != concurrency) {
- results.wait(300);
- }
- }
-
- int i = 0;
- for (String result : results) {
- assertEquals(Integer.toString(i++), result);
- }
- assertEquals(concurrency, i);
-
- intpA.close();
- }
-
-
- @Test
- public void testRunParallel() throws InterruptedException {
- Properties p = new Properties();
- p.put("parallel", "true");
-
- final RemoteInterpreter intpA = new RemoteInterpreter(
- p,
- MockInterpreterA.class.getName(),
- new File("../bin/interpreter.sh").getAbsolutePath(),
- "fake",
- env
- );
-
- intpGroup.add(intpA);
- intpA.setInterpreterGroup(intpGroup);
-
- intpA.open();
-
- int concurrency = 4;
- final int timeToSleep = 1000;
- final List<String> results = new LinkedList<String>();
- long start = System.currentTimeMillis();
-
- Scheduler scheduler = intpA.getScheduler();
- for (int i = 0; i < concurrency; i++) {
- final String jobId = Integer.toString(i);
- scheduler.submit(new Job(jobId, Integer.toString(i), null, 300) {
-
- @Override
- public int progress() {
- return 0;
- }
-
- @Override
- public Map<String, Object> info() {
- return null;
- }
-
- @Override
- protected Object jobRun() throws Throwable {
- String stmt = Integer.toString(timeToSleep);
- InterpreterResult ret = intpA.interpret(stmt, new InterpreterContext(
- jobId,
- "title",
- "text",
- new HashMap<String, Object>(),
- new GUI()));
-
- synchronized (results) {
- results.add(ret.message());
- results.notify();
- }
- return stmt;
- }
-
- @Override
- protected boolean jobAbort() {
- return false;
- }
-
- });
- }
-
- // wait for job finished
- synchronized (results) {
- while (results.size() != concurrency) {
- results.wait(300);
- }
- }
-
- long end = System.currentTimeMillis();
-
- assertTrue(end - start < timeToSleep * concurrency);
-
- intpA.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
deleted file mode 100644
index 3035cf2..0000000
--- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package com.nflabs.zeppelin.interpreter.remote;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-
-import org.junit.Test;
-
-public class RemoteInterpreterUtilsTest {
-
- @Test
- public void testFindRandomAvailablePortOnAllLocalInterfaces() throws IOException {
- assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() > 0);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterA.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterA.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterA.java
deleted file mode 100644
index 1df3979..0000000
--- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterA.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package com.nflabs.zeppelin.interpreter.remote.mock;
-
-import java.util.List;
-import java.util.Properties;
-
-import com.nflabs.zeppelin.interpreter.Interpreter;
-import com.nflabs.zeppelin.interpreter.InterpreterContext;
-import com.nflabs.zeppelin.interpreter.InterpreterException;
-import com.nflabs.zeppelin.interpreter.InterpreterPropertyBuilder;
-import com.nflabs.zeppelin.interpreter.InterpreterResult;
-import com.nflabs.zeppelin.interpreter.InterpreterResult.Code;
-import com.nflabs.zeppelin.scheduler.Scheduler;
-import com.nflabs.zeppelin.scheduler.SchedulerFactory;
-
-public class MockInterpreterA extends Interpreter {
- static {
- Interpreter.register(
- "interpreterA",
- "group1",
- MockInterpreterA.class.getName(),
- new InterpreterPropertyBuilder()
- .add("p1", "v1", "property1").build());
-
- }
-
- private String lastSt;
-
- public MockInterpreterA(Properties property) {
- super(property);
- }
-
- @Override
- public void open() {
- //new RuntimeException().printStackTrace();
- }
-
- @Override
- public void close() {
- }
-
- public String getLastStatement() {
- return lastSt;
- }
-
- @Override
- public InterpreterResult interpret(String st, InterpreterContext context) {
- try {
- Thread.sleep(Long.parseLong(st));
- this.lastSt = st;
- } catch (NumberFormatException | InterruptedException e) {
- throw new InterpreterException(e);
- }
- return new InterpreterResult(Code.SUCCESS, st);
- }
-
- @Override
- public void cancel(InterpreterContext context) {
-
- }
-
- @Override
- public FormType getFormType() {
- return FormType.NATIVE;
- }
-
- @Override
- public int getProgress(InterpreterContext context) {
- return 0;
- }
-
- @Override
- public List<String> completion(String buf, int cursor) {
- return null;
- }
-
- @Override
- public Scheduler getScheduler() {
- if (getProperty("parallel") != null && getProperty("parallel").equals("true")) {
- return SchedulerFactory.singleton().createOrGetParallelScheduler("interpreter_" + this.hashCode(), 10);
- } else {
- return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterB.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterB.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterB.java
deleted file mode 100644
index 39f2ab8..0000000
--- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterB.java
+++ /dev/null
@@ -1,101 +0,0 @@
-package com.nflabs.zeppelin.interpreter.remote.mock;
-
-import java.util.List;
-import java.util.Properties;
-
-import com.nflabs.zeppelin.interpreter.Interpreter;
-import com.nflabs.zeppelin.interpreter.InterpreterContext;
-import com.nflabs.zeppelin.interpreter.InterpreterException;
-import com.nflabs.zeppelin.interpreter.InterpreterGroup;
-import com.nflabs.zeppelin.interpreter.InterpreterPropertyBuilder;
-import com.nflabs.zeppelin.interpreter.InterpreterResult;
-import com.nflabs.zeppelin.interpreter.InterpreterResult.Code;
-import com.nflabs.zeppelin.interpreter.WrappedInterpreter;
-import com.nflabs.zeppelin.scheduler.Scheduler;
-
-public class MockInterpreterB extends Interpreter {
- static {
- Interpreter.register(
- "interpreterB",
- "group1",
- MockInterpreterA.class.getName(),
- new InterpreterPropertyBuilder()
- .add("p1", "v1", "property1").build());
-
- }
- public MockInterpreterB(Properties property) {
- super(property);
- }
-
- @Override
- public void open() {
- //new RuntimeException().printStackTrace();
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public InterpreterResult interpret(String st, InterpreterContext context) {
- MockInterpreterA intpA = getInterpreterA();
- String intpASt = intpA.getLastStatement();
- long timeToSleep = Long.parseLong(st);
- if (intpASt != null) {
- timeToSleep += Long.parseLong(intpASt);
- }
- try {
- Thread.sleep(timeToSleep);
- } catch (NumberFormatException | InterruptedException e) {
- throw new InterpreterException(e);
- }
- return new InterpreterResult(Code.SUCCESS, Long.toString(timeToSleep));
- }
-
- @Override
- public void cancel(InterpreterContext context) {
-
- }
-
- @Override
- public FormType getFormType() {
- return FormType.NATIVE;
- }
-
- @Override
- public int getProgress(InterpreterContext context) {
- return 0;
- }
-
- @Override
- public List<String> completion(String buf, int cursor) {
- return null;
- }
-
- public MockInterpreterA getInterpreterA() {
- InterpreterGroup interpreterGroup = getInterpreterGroup();
- for (Interpreter intp : interpreterGroup) {
- if (intp.getClassName().equals(MockInterpreterA.class.getName())) {
- Interpreter p = intp;
- while (p instanceof WrappedInterpreter) {
- p = ((WrappedInterpreter) p).getInnerInterpreter();
- }
- return (MockInterpreterA) p;
- }
- }
- return null;
- }
-
- @Override
- public Scheduler getScheduler() {
- InterpreterGroup interpreterGroup = getInterpreterGroup();
- for (Interpreter intp : interpreterGroup) {
- if (intp.getClassName().equals(MockInterpreterA.class.getName())) {
- return intp.getScheduler();
- }
- }
-
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/FIFOSchedulerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/FIFOSchedulerTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/FIFOSchedulerTest.java
deleted file mode 100644
index 37a29d1..0000000
--- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/FIFOSchedulerTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package com.nflabs.zeppelin.scheduler;
-
-import junit.framework.TestCase;
-
-import com.nflabs.zeppelin.scheduler.Job.Status;
-
-public class FIFOSchedulerTest extends TestCase {
-
- private SchedulerFactory schedulerSvc;
-
- @Override
- public void setUp() throws Exception{
- schedulerSvc = new SchedulerFactory();
- }
-
- @Override
- public void tearDown(){
-
- }
-
- public void testRun() throws InterruptedException{
- Scheduler s = schedulerSvc.createOrGetFIFOScheduler("test");
- assertEquals(0, s.getJobsRunning().size());
- assertEquals(0, s.getJobsWaiting().size());
-
- Job job1 = new SleepingJob("job1", null, 500);
- Job job2 = new SleepingJob("job2", null, 500);
-
- s.submit(job1);
- s.submit(job2);
- Thread.sleep(200);
-
- assertEquals(Status.RUNNING, job1.getStatus());
- assertEquals(Status.PENDING, job2.getStatus());
- assertEquals(1, s.getJobsRunning().size());
- assertEquals(1, s.getJobsWaiting().size());
-
-
- Thread.sleep(500);
- assertEquals(Status.FINISHED, job1.getStatus());
- assertEquals(Status.RUNNING, job2.getStatus());
- assertTrue((500 < (Long)job1.getReturn()));
- assertEquals(1, s.getJobsRunning().size());
- assertEquals(0, s.getJobsWaiting().size());
-
- }
-
- public void testAbort() throws InterruptedException{
- Scheduler s = schedulerSvc.createOrGetFIFOScheduler("test");
- assertEquals(0, s.getJobsRunning().size());
- assertEquals(0, s.getJobsWaiting().size());
-
- Job job1 = new SleepingJob("job1", null, 500);
- Job job2 = new SleepingJob("job2", null, 500);
-
- s.submit(job1);
- s.submit(job2);
-
- Thread.sleep(200);
-
- job1.abort();
- job2.abort();
-
- Thread.sleep(200);
-
- assertEquals(Status.ABORT, job1.getStatus());
- assertEquals(Status.ABORT, job2.getStatus());
-
- assertTrue((500 > (Long)job1.getReturn()));
- assertEquals(null, job2.getReturn());
-
-
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/ParallelSchedulerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/ParallelSchedulerTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/ParallelSchedulerTest.java
deleted file mode 100644
index f88de4c..0000000
--- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/ParallelSchedulerTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package com.nflabs.zeppelin.scheduler;
-
-
-import com.nflabs.zeppelin.scheduler.Job.Status;
-
-import junit.framework.TestCase;
-// TestCase that times three 500 ms SleepingJobs on a parallel scheduler created with argument 2.
-public class ParallelSchedulerTest extends TestCase {
- private SchedulerFactory schedulerSvc;
-
- public void setUp() throws Exception{
- schedulerSvc = new SchedulerFactory();
- }
-
- public void tearDown(){
- // no cleanup is performed
- }
-
- public void testRun() throws InterruptedException{
- Scheduler s = schedulerSvc.createOrGetParallelScheduler("test", 2);
- assertEquals(0, s.getJobsRunning().size());
- assertEquals(0, s.getJobsWaiting().size());
- // NOTE(review): the 2 above is presumably the maximum number of concurrent jobs — confirm.
- Job job1 = new SleepingJob("job1", null, 500);
- Job job2 = new SleepingJob("job2", null, 500);
- Job job3 = new SleepingJob("job3", null, 500);
-
- s.submit(job1);
- s.submit(job2);
- s.submit(job3);
- Thread.sleep(200);
- // About 200 ms after submission: two jobs running, the third still pending.
- assertEquals(Status.RUNNING, job1.getStatus());
- assertEquals(Status.RUNNING, job2.getStatus());
- assertEquals(Status.PENDING, job3.getStatus());
- assertEquals(2, s.getJobsRunning().size());
- assertEquals(1, s.getJobsWaiting().size());
-
- Thread.sleep(500);
- // About 700 ms after submission: the first two have finished and the third is running.
- assertEquals(Status.FINISHED, job1.getStatus());
- assertEquals(Status.FINISHED, job2.getStatus());
- assertEquals(Status.RUNNING, job3.getStatus());
- assertEquals(1, s.getJobsRunning().size());
- assertEquals(0, s.getJobsWaiting().size());
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java
deleted file mode 100644
index 35aa1d3..0000000
--- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-package com.nflabs.zeppelin.scheduler;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.nflabs.zeppelin.display.GUI;
-import com.nflabs.zeppelin.interpreter.InterpreterContext;
-import com.nflabs.zeppelin.interpreter.InterpreterGroup;
-import com.nflabs.zeppelin.interpreter.remote.RemoteInterpreter;
-import com.nflabs.zeppelin.interpreter.remote.mock.MockInterpreterA;
-
-// Runs one job through a remote scheduler backed by a RemoteInterpreter for MockInterpreterA.
-public class RemoteSchedulerTest {
- private SchedulerFactory schedulerSvc;
-
- @Before
- public void setUp() throws Exception{
- schedulerSvc = new SchedulerFactory();
- }
-
- @After
- public void tearDown(){
- // Nothing is released here; test() closes the interpreter and removes the scheduler itself.
- }
-
- @Test
- public void test() throws Exception {
- Properties p = new Properties();
- InterpreterGroup intpGroup = new InterpreterGroup();
- Map<String, String> env = new HashMap<String, String>();
- env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
- // Remote interpreter for MockInterpreterA, configured with the ../bin/interpreter.sh path.
- final RemoteInterpreter intpA = new RemoteInterpreter(
- p,
- MockInterpreterA.class.getName(),
- new File("../bin/interpreter.sh").getAbsolutePath(),
- "fake",
- env
- );
- // The interpreter and its group are linked in both directions before open() is called.
- intpGroup.add(intpA);
- intpA.setInterpreterGroup(intpGroup);
-
- intpA.open();
- // NOTE(review): the trailing 10 is presumably a concurrency limit for the scheduler — confirm.
- Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test",
- intpA.getInterpreterProcess(),
- 10);
- // Anonymous job whose jobRun() passes "1000" to the remote interpreter and returns "1000".
- Job job = new Job("jobId", "jobName", null, 200) {
-
- @Override
- public int progress() {
- return 0;
- }
-
- @Override
- public Map<String, Object> info() {
- return null;
- }
-
- @Override
- protected Object jobRun() throws Throwable {
- intpA.interpret("1000", new InterpreterContext(
- "jobId",
- "title",
- "text",
- new HashMap<String, Object>(),
- new GUI()));
- return "1000";
- }
-
- @Override
- protected boolean jobAbort() {
- return false;
- }
- };
- scheduler.submit(job);
- // Poll every 100 ms until the job reports that it is running.
- while (job.isRunning() == false) { // NOTE(review): no timeout; spins forever if that never happens
- Thread.sleep(100);
- }
- // NOTE(review): presumably "1000" makes the mock interpreter take about 1 s — confirm.
- Thread.sleep(500);
- assertEquals(0, scheduler.getJobsWaiting().size());
- assertEquals(1, scheduler.getJobsRunning().size());
- // After a further 500 ms the job is expected to have left the running list.
- Thread.sleep(500);
-
- assertEquals(0, scheduler.getJobsWaiting().size());
- assertEquals(0, scheduler.getJobsRunning().size());
- // Close the interpreter and remove the "test" scheduler from the factory.
- intpA.close();
- schedulerSvc.removeScheduler("test");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/SleepingJob.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/SleepingJob.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/SleepingJob.java
deleted file mode 100644
index 42d0316..0000000
--- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/SleepingJob.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package com.nflabs.zeppelin.scheduler;
-
-import java.util.HashMap;
-import java.util.Map;
-
-// Test job that loops in 10 ms sleeps until "time" ms have elapsed or jobAbort() is called.
-public class SleepingJob extends Job{
- private int time;
- boolean abort = false;
- private long start;
- private int count;
- // NOTE(review): abort is set in jobAbort() and polled in jobRun() without volatile or
- // locking; if those run on different threads the update may not become visible — confirm.
- public SleepingJob(String jobName, JobListener listener, int time){
- super(jobName, listener);
- this.time = time;
- count = 0;
- }
- // Records the start time, loops until abort is set or "time" is exceeded, returns elapsed ms.
- public Object jobRun() {
- start = System.currentTimeMillis();
- while(abort==false){
- count++;
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) { // the interrupt is swallowed; the loop simply continues
- }
- if(System.currentTimeMillis() - start>time) break;
- }
- return System.currentTimeMillis()-start;
- }
- // Sets the abort flag that jobRun() polls and always returns true.
- public boolean jobAbort() {
- abort = true;
- return true;
- }
- // Elapsed share of "time" as a percentage clamped to 0..100; divides by time, so time == 0 throws.
- public int progress() {
- long p = (System.currentTimeMillis() - start)*100 / time;
- if(p<0) p = 0;
- if(p>100) p = 100;
- return (int) p;
- }
- // Returns a new map holding the loop iteration count as a string under the key "LoopCount".
- public Map<String, Object> info() {
- Map<String, Object> i = new HashMap<String, Object>();
- i.put("LoopCount", Integer.toString(count));
- return i;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/InputTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/InputTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/InputTest.java
new file mode 100644
index 0000000..626ae99
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/InputTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.display;
+
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test skeleton in the display package; every method body is currently empty.
+ */
+public class InputTest {
+  @Before
+  public void setUp() throws Exception {}
+
+  @After
+  public void tearDown() throws Exception {}
+
+  /** Placeholder: passes trivially because it contains no statements or assertions. */
+  @Test
+  public void testDefaultParamReplace() throws IOException {}
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
new file mode 100644
index 0000000..02dc224
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.util.HashMap;
+
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.junit.Test;
+
+/**
+ * Tests the reference counting and client pooling of {@link RemoteInterpreterProcess}.
+ *
+ * <p>References taken by a test are released in a {@code finally} block so that a failed
+ * assertion does not leave the process referenced.
+ */
+public class RemoteInterpreterProcessTest {
+
+  /** Creates the process under test with the constructor arguments shared by all tests. */
+  private static RemoteInterpreterProcess newProcess() {
+    return new RemoteInterpreterProcess(
+        "../bin/interpreter.sh", "nonexists", new HashMap<String, String>());
+  }
+
+  /** Drops every reference still held on {@code rip}; does nothing if the count is zero. */
+  private static void releaseAll(RemoteInterpreterProcess rip) {
+    while (rip.referenceCount() > 0) {
+      rip.dereference();
+    }
+  }
+
+  /**
+   * Checks that the process is not running before the first reference, is running while
+   * references are held, and stops once the last reference is released.
+   */
+  @Test
+  public void testStartStop() {
+    RemoteInterpreterProcess rip = newProcess();
+    assertFalse(rip.isRunning());
+    assertEquals(0, rip.referenceCount());
+    try {
+      assertEquals(1, rip.reference());
+      assertEquals(2, rip.reference());
+      assertEquals(true, rip.isRunning());
+      assertEquals(1, rip.dereference());
+      assertEquals(true, rip.isRunning());
+      assertEquals(0, rip.dereference());
+      assertFalse(rip.isRunning());
+    } finally {
+      // Only has work to do when an assertion above failed part-way through.
+      releaseAll(rip);
+    }
+  }
+
+  /**
+   * Checks the active/idle client counters: a borrowed client is counted as active and a
+   * released client is counted as idle.
+   */
+  @Test
+  public void testClientFactory() throws Exception {
+    RemoteInterpreterProcess rip = newProcess();
+    rip.reference();
+    try {
+      assertEquals(0, rip.getNumActiveClient());
+      assertEquals(0, rip.getNumIdleClient());
+
+      Client client = rip.getClient();
+      assertEquals(1, rip.getNumActiveClient());
+      assertEquals(0, rip.getNumIdleClient());
+
+      rip.releaseClient(client);
+      assertEquals(0, rip.getNumActiveClient());
+      assertEquals(1, rip.getNumIdleClient());
+    } finally {
+      // Release the reference even when an assertion fails.
+      rip.dereference();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
new file mode 100644
index 0000000..af6b4bd
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.thrift.TException;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that a {@link RemoteInterpreterServer} reports running after {@code start()} and
+ * stops reporting running after {@code shutdown()}.
+ */
+public class RemoteInterpreterServerTest {
+  /** Upper bound, in milliseconds, for each wait on a change of the running state. */
+  private static final long STATE_CHANGE_TIMEOUT_MS = 10 * 1000;
+
+  /** Pause, in milliseconds, between two polls of {@code isRunning()}. */
+  private static final long POLL_INTERVAL_MS = 200;
+
+  @Before
+  public void setUp() throws Exception {
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  /**
+   * Starts a server on the port returned by the port-finder utility, waits for it to run and
+   * be reachable on localhost, then shuts it down and waits for it to stop.
+   *
+   * <p>Each wait gets its own full timeout, and {@code shutdown()} is called from a
+   * {@code finally} block so a failed start-up assertion does not leave the server running.
+   */
+  @Test
+  public void testStartStop() throws InterruptedException, IOException, TException {
+    RemoteInterpreterServer server = new RemoteInterpreterServer(
+        RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces());
+    assertEquals(false, server.isRunning());
+
+    server.start();
+    try {
+      boolean started = waitForRunningState(server, true);
+      assertEquals(true, started);
+      assertEquals(true, RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(
+          "localhost", server.getPort()));
+    } finally {
+      server.shutdown();
+    }
+
+    boolean stopped = waitForRunningState(server, false);
+    assertEquals(true, stopped);
+  }
+
+  /**
+   * Polls {@code server.isRunning()} until it equals {@code expected} or the timeout elapses.
+   *
+   * @param server the server to observe
+   * @param expected the running state to wait for
+   * @return {@code true} if the expected state was observed, {@code false} on timeout
+   * @throws InterruptedException if the thread is interrupted while sleeping between polls
+   */
+  private static boolean waitForRunningState(RemoteInterpreterServer server, boolean expected)
+      throws InterruptedException {
+    long deadline = System.currentTimeMillis() + STATE_CHANGE_TIMEOUT_MS;
+    while (System.currentTimeMillis() < deadline) {
+      if (server.isRunning() == expected) {
+        return true;
+      }
+      Thread.sleep(POLL_INTERVAL_MS);
+    }
+    return false;
+  }
+}