You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2014/11/24 22:54:43 UTC
incubator-reef git commit: [REEF-3] Create basic Task scheduler as an
example
Repository: incubator-reef
Updated Branches:
refs/heads/master b5214eebe -> 1d2ab4817
[REEF-3] Create basic Task scheduler as an example
The current Task scheduler example only supports one evaluator
at a time. This pull request addresses this issue to run multiple
evaluators simultaneously and introduce a new REST interface to
dynamically set the maximum number of evaluator.
* The new API : `/reef-example-scheduler/v1/max-eval?num={num}`.
* Quite a lot of code rearrange and refactoring are done.
JIRA:
[REEF-3] https://issues.apache.org/jira/browse/REEF-3
Pull Request
#23 https://github.com/apache/incubator-reef/pull/23
Closes #23
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/1d2ab481
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/1d2ab481
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/1d2ab481
Branch: refs/heads/master
Commit: 1d2ab48176950615618af9fb5a8f846b75a2cea0
Parents: b5214ee
Author: Yunseong Lee <yu...@apache.org>
Authored: Thu Nov 20 02:34:16 2014 +0900
Committer: Markus Weimer <we...@apache.org>
Committed: Mon Nov 24 13:38:57 2014 -0800
----------------------------------------------------------------------
.../scheduler/HttpServerShellCmdHandler.java | 99 ------
.../reef/examples/scheduler/Scheduler.java | 226 ++++++++++++
.../examples/scheduler/SchedulerDriver.java | 343 ++++++++-----------
.../scheduler/SchedulerHttpHandler.java | 107 ++++++
.../reef/examples/scheduler/SchedulerREEF.java | 13 +-
.../examples/scheduler/SchedulerREEFYarn.java | 6 +-
.../examples/scheduler/SchedulerResponse.java | 114 ++++++
.../reef/examples/scheduler/TaskEntity.java | 71 ++++
8 files changed, 677 insertions(+), 302 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d2ab481/reef-examples/src/main/java/org/apache/reef/examples/scheduler/HttpServerShellCmdHandler.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/HttpServerShellCmdHandler.java b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/HttpServerShellCmdHandler.java
deleted file mode 100644
index 4320bcd..0000000
--- a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/HttpServerShellCmdHandler.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.examples.scheduler;
-
-import org.apache.reef.tang.InjectionFuture;
-import org.apache.reef.tang.annotations.Unit;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.webserver.HttpHandler;
-import org.apache.reef.webserver.ParsedHttpRequest;
-
-import javax.inject.Inject;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletResponse;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Receive HttpRequest so that it can handle the command list
- */
-public class HttpServerShellCmdHandler implements HttpHandler {
- final InjectionFuture<SchedulerDriver> schedulerDriver;
-
- private String uriSpecification = "reef-example-scheduler";
-
- @Inject
- public HttpServerShellCmdHandler(final InjectionFuture<SchedulerDriver> schedulerDriver) {
- this.schedulerDriver = schedulerDriver;
- }
-
- @Override
- public String getUriSpecification() {
- return uriSpecification;
- }
-
- @Override
- public void setUriSpecification(String s) {
- uriSpecification = s;
- }
-
- /**
- * HttpRequest handler. You must specify UriSpecification and REST API version.
- * The request url is http://{address}:{port}/reef-example-scheduler/v1
- *
- * APIs
- * /list to get the status list for all tasks
- * /status?id={id} to query the status of such a task, given id
- * /submit?cmd={cmd} to submit a Task, which returns its id
- * /cancel?id={id} to cancel the task's execution
- * /clear to clear the waiting queue
- */
- @Override
- public void onHttpRequest(ParsedHttpRequest request, HttpServletResponse response) throws IOException, ServletException {
- final String target = request.getTargetEntity().toLowerCase();
- final Map<String, List<String>> queryMap = request.getQueryMap();
-
- final String result;
- switch (target) {
- case "list":
- result = schedulerDriver.get().getList();
- break;
- case "clear":
- result = schedulerDriver.get().clearList();
- break;
- case "status":
- result = schedulerDriver.get().getStatus(queryMap.get("id"));
- break;
- case "submit":
- result = schedulerDriver.get().submitCommands(queryMap.get("cmd"));
- break;
- case "cancel":
- result = schedulerDriver.get().cancelTask(queryMap.get("id"));
- break;
- default:
- result = "Unsupported operation";
- }
-
- // Send response to the http client
- response.getOutputStream().println(result);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d2ab481/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java
new file mode 100644
index 0000000..960f297
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.examples.library.Command;
+import org.apache.reef.examples.library.ShellTask;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Tang;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The body of Task scheduler. It owns a task queue
+ * and tracks the record of scheduled tasks.
+ */
+@ThreadSafe
+final class Scheduler {
+ /**
+ * Tasks are waiting to be scheduled in the queue.
+ */
+ private final Queue<TaskEntity> taskQueue;
+
+ /**
+ * Lists of {@link TaskEntity} for different states - Running / Finished / Canceled.
+ */
+ private final List<TaskEntity> runningTasks = new ArrayList<>();
+ private final List<TaskEntity> finishedTasks = new ArrayList<>();
+ private final List<TaskEntity> canceledTasks = new ArrayList<>();
+
+ /**
+ * Counts how many tasks have been scheduled.
+ */
+ private final AtomicInteger taskCount = new AtomicInteger(0);
+
+ @Inject
+ public Scheduler() {
+ taskQueue = new LinkedBlockingQueue<>();
+ }
+
+ /**
+ * Submit a task to the ActiveContext.
+ */
+ public synchronized void submitTask(final ActiveContext context) {
+ final TaskEntity task = taskQueue.poll();
+ final Integer taskId = task.getId();
+ final String command = task.getCommand();
+
+ final Configuration taskConf = TaskConfiguration.CONF
+ .set(TaskConfiguration.TASK, ShellTask.class)
+ .set(TaskConfiguration.IDENTIFIER, taskId.toString())
+ .build();
+ final Configuration commandConf = Tang.Factory.getTang().newConfigurationBuilder()
+ .bindNamedParameter(Command.class, command)
+ .build();
+
+ final Configuration merged = Configurations.merge(taskConf, commandConf);
+ context.submitTask(merged);
+ runningTasks.add(task);
+ }
+
+ /**
+ * Update the record of task to mark it as canceled.
+ */
+ public synchronized SchedulerResponse cancelTask(final int taskId) {
+ if (getTask(taskId, runningTasks) != null) {
+ return SchedulerResponse.FORBIDDEN("The task " + taskId + " is running");
+ } else if (getTask(taskId, finishedTasks) != null) {
+ return SchedulerResponse.FORBIDDEN("The task " + taskId + " has been finished");
+ }
+
+ final TaskEntity task = getTask(taskId, taskQueue);
+ if (task == null) {
+ final String message = new StringBuilder().append("Task with ID ").append(taskId).append(" is not found").toString();
+ return SchedulerResponse.NOT_FOUND(message);
+ } else {
+ taskQueue.remove(task);
+ canceledTasks.add(task);
+ return SchedulerResponse.OK("Canceled " + taskId);
+ }
+ }
+
+ /**
+ * Clear the pending list
+ */
+ public synchronized SchedulerResponse clear() {
+ final int count = taskQueue.size();
+ for (final TaskEntity task : taskQueue) {
+ canceledTasks.add(task);
+ }
+ taskQueue.clear();
+ return SchedulerResponse.OK(count + " tasks removed.");
+ }
+
+ /**
+ * Get the list of Tasks, which are grouped by the states.
+ */
+ public synchronized SchedulerResponse getList() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("Running :");
+ for (final TaskEntity running : runningTasks) {
+ sb.append(" ").append(running.getId());
+ }
+
+ sb.append("\nWaiting :");
+ for (final TaskEntity waiting : taskQueue) {
+ sb.append(" ").append(waiting.getId());
+ }
+
+ sb.append("\nFinished :");
+ for (final TaskEntity finished : finishedTasks) {
+ sb.append(" ").append(finished.getId());
+ }
+
+ sb.append("\nCanceled :");
+ for (final TaskEntity canceled : canceledTasks) {
+ sb.append(" ").append(canceled.getId());
+ }
+ return SchedulerResponse.OK(sb.toString());
+ }
+
+ /**
+ * Get the status of a Task.
+ */
+ public synchronized SchedulerResponse getTaskStatus(final int taskId) {
+
+ for (final TaskEntity running : runningTasks) {
+ if (taskId == running.getId()) {
+ return SchedulerResponse.OK("Running : " + running.toString());
+ }
+ }
+
+ for (final TaskEntity waiting : taskQueue) {
+ if (taskId == waiting.getId()) {
+ return SchedulerResponse.OK("Waiting : " + waiting.toString());
+ }
+ }
+
+ for (final TaskEntity finished : finishedTasks) {
+ if (taskId == finished.getId()) {
+ return SchedulerResponse.OK("Finished : " + finished.toString());
+ }
+ }
+
+ for (final TaskEntity finished : canceledTasks) {
+ if (taskId == finished.getId()) {
+ return SchedulerResponse.OK("Canceled: " + finished.toString());
+ }
+ }
+ return SchedulerResponse.NOT_FOUND(new StringBuilder().append("Task with ID ").append(taskId).append(" is not found").toString());
+ }
+
+ /**
+ * Assigns a TaskId to submit.
+ */
+ public synchronized int assignTaskId() {
+ return taskCount.incrementAndGet();
+ }
+
+ /**
+ * Add a task to the queue.
+ */
+ public synchronized void addTask(TaskEntity task) {
+ taskQueue.add(task);
+ }
+
+ /**
+ * Check whether there are tasks waiting to be submitted.
+ */
+ public synchronized boolean hasPendingTasks() {
+ return !taskQueue.isEmpty();
+ }
+
+ /**
+ * Get the number of pending tasks in the queue.
+ */
+ public synchronized int getNumPendingTasks() {
+ return taskQueue.size();
+ }
+
+ /**
+ * Update the record of task to mark it as finished.
+ */
+ public synchronized void setFinished(final int taskId) {
+ final TaskEntity task = getTask(taskId, runningTasks);
+ runningTasks.remove(task);
+ finishedTasks.add(task);
+ }
+
+ /**
+ * Iterate over the collection to find a TaskEntity with ID.
+ */
+ private TaskEntity getTask(final int taskId, final Collection<TaskEntity> tasks) {
+ TaskEntity result = null;
+ for (final TaskEntity task : tasks) {
+ if (taskId == task.getId()) {
+ result = task;
+ break;
+ }
+ }
+ return result;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d2ab481/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java
index 1f70e8d..890d872 100644
--- a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java
+++ b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java
@@ -24,13 +24,6 @@ import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.EvaluatorRequest;
import org.apache.reef.driver.evaluator.EvaluatorRequestor;
import org.apache.reef.driver.task.CompletedTask;
-import org.apache.reef.driver.task.TaskConfiguration;
-import org.apache.reef.examples.library.Command;
-import org.apache.reef.examples.library.ShellTask;
-import org.apache.reef.io.network.util.Pair;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Configurations;
-import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.wake.EventHandler;
@@ -39,8 +32,7 @@ import org.apache.reef.wake.time.event.StartTime;
import javax.annotation.concurrent.GuardedBy;
import javax.inject.Inject;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -57,14 +49,14 @@ public final class SchedulerDriver {
/**
* Possible states of the job driver. Can be one of:
* <dl>
- * <du><code>INIT</code></du><dd>Initial state. Ready to request an evaluator</dd>
- * <du><code>WAIT_EVALUATORS</code></du><dd>Wait for requested evaluators to be ready.</dd>
- * <du><code>READY</code></du><dd>Wait for the commands. When new Tasks arrive, enqueue the tasks and transit to RUNNING status.</dd>
+ * <du><code>INIT</code></du><dd>Initial state. Ready to request an evaluator.</dd>
+ * <du><code>WAIT_EVALUATORS</code></du><dd>Waiting for an evaluator allocated with no active evaluators.</dd>
+ * <du><code>READY</code></du><dd>Wait for the commands. Reactivated when a new Task arrives.</dd>
* <du><code>RUNNING</code></du><dd>Run commands in the queue. Go back to READY state when the queue is empty.</dd>
* </dl>
*/
private enum State {
- INIT, WAIT_EVALUATOR, READY, RUNNING
+ INIT, WAIT_EVALUATORS, READY, RUNNING
}
/**
@@ -72,36 +64,23 @@ public final class SchedulerDriver {
*/
private boolean retainable;
- private Object lock = new Object();
-
- @GuardedBy("lock")
+ @GuardedBy("SchedulerDriver.this")
private State state = State.INIT;
- @GuardedBy("lock")
- private final Queue<Pair<Integer, String>> taskQueue;
-
- @GuardedBy("lock")
- private Integer runningTaskId = null;
-
- @GuardedBy("lock")
- private Set<Integer> finishedTaskId = new HashSet<>();
+ @GuardedBy("SchedulerDriver.this")
+ private Scheduler scheduler;
- @GuardedBy("lock")
- private Set<Integer> canceledTaskId = new HashSet<>();
+ @GuardedBy("SchedulerDriver.this")
+ private int nMaxEval = 3, nActiveEval = 0, nRequestedEval = 0;
private final EvaluatorRequestor requestor;
-
- /**
- * Counts how many tasks have been scheduled.
- */
- private AtomicInteger taskCount = new AtomicInteger(0);
-
@Inject
public SchedulerDriver(final EvaluatorRequestor requestor,
- @Parameter(SchedulerREEF.Retain.class) boolean retainable) {
+ @Parameter(SchedulerREEF.Retain.class) boolean retainable,
+ final Scheduler scheduler) {
this.requestor = requestor;
- this.taskQueue = new LinkedList<>();
+ this.scheduler = scheduler;
this.retainable = retainable;
}
@@ -113,8 +92,9 @@ public final class SchedulerDriver {
public void onNext(final StartTime startTime) {
LOG.log(Level.INFO, "Driver started at {0}", startTime);
assert (state == State.INIT);
+ state = State.WAIT_EVALUATORS;
- requestEvaluator();
+ requestEvaluator(1); // Allocate an initial evaluator to avoid idle state.
}
}
@@ -126,7 +106,10 @@ public final class SchedulerDriver {
@Override
public void onNext(final AllocatedEvaluator evaluator) {
LOG.log(Level.INFO, "Evaluator is ready");
- assert (state == State.WAIT_EVALUATOR);
+ synchronized (SchedulerDriver.this) {
+ nActiveEval++;
+ nRequestedEval--;
+ }
evaluator.submitContext(ContextConfiguration.CONF
.set(ContextConfiguration.IDENTIFIER, "SchedulerContext")
@@ -137,246 +120,220 @@ public final class SchedulerDriver {
/**
* Now it is ready to schedule tasks. But if the queue is empty,
* wait until commands coming up.
+ *
+ * If there is no pending task, having more than 1 evaluators must be redundant.
+ * It may happen, for example, when tasks are canceled during allocation.
+ * In these cases, the new evaluator may be abandoned.
*/
final class ActiveContextHandler implements EventHandler<ActiveContext> {
@Override
public void onNext(ActiveContext context) {
- synchronized (lock) {
+ synchronized (SchedulerDriver.this) {
LOG.log(Level.INFO, "Context available : {0}", context.getId());
- assert (state == State.WAIT_EVALUATOR);
- state = State.READY;
- waitForCommands(context);
+ if (scheduler.hasPendingTasks()) {
+ state = State.RUNNING;
+ scheduler.submitTask(context);
+ } else if (nActiveEval > 1) {
+ nActiveEval--;
+ context.close();
+ } else {
+ state = State.READY;
+ waitForCommands(context);
+ }
}
}
}
/**
- * Get the list of Tasks. They are classified as their states.
- * @return
+ * Non-retainable version of CompletedTaskHandler.
+ * When Task completes, it closes the active context to deallocate the evaluator
+ * and if there is outstanding commands, allocate another evaluator.
*/
- public String getList() {
- synchronized (lock) {
- final StringBuilder sb = new StringBuilder("Running : ");
- if (runningTaskId != null) {
- sb.append(runningTaskId);
- }
+ final class CompletedTaskHandler implements EventHandler<CompletedTask> {
+ @Override
+ public void onNext(final CompletedTask task) {
+ final int taskId = Integer.valueOf(task.getId());
- sb.append("\nWaiting :");
- for (final Pair<Integer, String> entity : taskQueue) {
- sb.append(" ").append(entity.first);
- }
+ synchronized (SchedulerDriver.this) {
+ scheduler.setFinished(taskId);
- sb.append("\nFinished :");
- for (final int taskIds : finishedTaskId) {
- sb.append(" ").append(taskIds);
- }
+ LOG.log(Level.INFO, "Task completed. Reuse the evaluator : {0}", String.valueOf(retainable));
+ final ActiveContext context = task.getActiveContext();
- sb.append("\nCanceled :");
- for (final int taskIds : canceledTaskId) {
- sb.append(" ").append(taskIds);
+ if (retainable) {
+ retainEvaluator(context);
+ } else {
+ reallocateEvaluator(context);
+ }
}
- return sb.toString();
}
}
/**
- * Get the status of a Task.
- * @return
+ * Get the list of tasks in the scheduler.
*/
- public String getStatus(final List<String> args) {
- if (args.size() != 1) {
- return getResult(false, "Usage : only one ID at a time");
- }
-
- final Integer taskId = Integer.valueOf(args.get(0));
-
- synchronized (lock) {
- if (taskId.equals(runningTaskId)) {
- return getResult(true, "Running");
- } else if (finishedTaskId.contains(taskId)) {
- return getResult(true, "Finished");
- } else if (canceledTaskId.contains(taskId)) {
- return getResult(true, "Canceled");
- }
+ public synchronized SchedulerResponse getList() {
+ return scheduler.getList();
+ }
- for (final Pair<Integer, String> entity : taskQueue) {
- if (taskId == entity.first) {
- return getResult(true, "Waiting");
- }
- }
- return getResult(false, "Not found");
- }
+ /**
+ * Clear all the Tasks from the waiting queue.
+ */
+ public synchronized SchedulerResponse clearList() {
+ return scheduler.clear();
}
/**
- * Submit a command to schedule.
- * @return
+ * Get the status of a task.
*/
- public String submitCommands(final List<String> args) {
+ public SchedulerResponse getTaskStatus(List<String> args) {
if (args.size() != 1) {
- return getResult(false, "Usage : only one ID at a time");
+ return SchedulerResponse.BAD_REQUEST("Usage : only one ID at a time");
}
- final String command = args.get(0);
-
- synchronized (lock) {
- final Integer id = taskCount.incrementAndGet();
- taskQueue.add(new Pair(id, command));
+ final Integer taskId = Integer.valueOf(args.get(0));
- if (readyToRun()) {
- state = State.RUNNING;
- lock.notify();
- }
- return getResult(true, "Task ID : "+id);
+ synchronized (SchedulerDriver.this) {
+ return scheduler.getTaskStatus(taskId);
}
}
/**
* Cancel a Task waiting on the queue. A task cannot be canceled
* once it is running.
- * @return
*/
- public String cancelTask(final List<String> args) {
+ public SchedulerResponse cancelTask(final List<String> args) {
if (args.size() != 1) {
- return getResult(false, "Usage : only one ID at a time");
+ return SchedulerResponse.BAD_REQUEST("Usage : only one ID at a time");
}
final Integer taskId = Integer.valueOf(args.get(0));
- synchronized (lock) {
- if (taskId.equals(runningTaskId)) {
- return getResult(false, "The task is running");
- } else if (finishedTaskId.contains(taskId)) {
- return getResult(false, "Already finished");
- }
-
- for (final Pair<Integer, String> entity : taskQueue) {
- if (taskId == entity.first) {
- taskQueue.remove(entity);
- canceledTaskId.add(taskId);
- return getResult(true, "Canceled");
- }
- }
- return getResult(false, "Not found");
+ synchronized (SchedulerDriver.this) {
+ return scheduler.cancelTask(taskId);
}
}
/**
- * Clear all the Tasks from the waiting queue.
- * @return
+ * Submit a command to schedule.
*/
- public String clearList() {
- final int count;
- synchronized (lock) {
- count = taskQueue.size();
- for (Pair<Integer, String> entity : taskQueue) {
- canceledTaskId.add(entity.first);
- }
- taskQueue.clear();
+ public SchedulerResponse submitCommands(final List<String> args) {
+ if (args.size() != 1) {
+ return SchedulerResponse.BAD_REQUEST("Usage : only one command at a time");
}
- return getResult(true, count + " tasks removed.");
- }
+ final String command = args.get(0);
+ final Integer id;
+ synchronized (SchedulerDriver.this) {
+ id = scheduler.assignTaskId();
+ scheduler.addTask(new TaskEntity(id, command));
+
+ if (state == State.READY) {
+ SchedulerDriver.this.notify(); // Wake up at {waitForCommands}
+ } else if (state == State.RUNNING && nMaxEval > nActiveEval + nRequestedEval) {
+ requestEvaluator(1);
+ }
+ }
+ return SchedulerResponse.OK("Task ID : " + id);
+ }
/**
- * Non-retainable version of CompletedTaskHandler.
- * When Task completes, it closes the active context to deallocate the evaluator
- * and if there is outstanding commands, allocate another evaluator.
+ * Update the maximum number of evaluators to hold.
+ * Request more evaluators in case there are pending tasks
+ * in the queue and the number of evaluators is less than the limit.
*/
- final class CompletedTaskHandler implements EventHandler<CompletedTask> {
- @Override
- public void onNext(final CompletedTask task) {
- synchronized (lock) {
- finishedTaskId.add(runningTaskId);
- runningTaskId = null;
+ public SchedulerResponse setMaxEvaluators(final List<String> args) {
+ if (args.size() != 1) {
+ return SchedulerResponse.BAD_REQUEST("Usage : Only one value can be used");
+ }
- LOG.log(Level.INFO, "Task completed. Reuse the evaluator :", String.valueOf(retainable));
+ final int nTarget = Integer.valueOf(args.get(0));
- if (retainable) {
- if (taskQueue.isEmpty()) {
- state = State.READY;
- }
- waitForCommands(task.getActiveContext());
- } else {
- task.getActiveContext().close();
- state = State.WAIT_EVALUATOR;
- requestEvaluator();
- }
+ synchronized (SchedulerDriver.this) {
+ if (nTarget < nActiveEval + nRequestedEval) {
+ return SchedulerResponse.FORBIDDEN(nActiveEval + nRequestedEval +
+ " evaluators are used now. Should be larger than that.");
+ }
+ nMaxEval = nTarget;
+
+ if (scheduler.hasPendingTasks()) {
+ final int nToRequest =
+ Math.min(scheduler.getNumPendingTasks(), nMaxEval - nActiveEval) - nRequestedEval;
+ requestEvaluator(nToRequest);
}
+ return SchedulerResponse.OK("You can use evaluators up to " + nMaxEval + " evaluators.");
}
}
/**
- * Request an evaluator
+ * Request evaluators. Passing a non positive number is illegal,
+ * so it does not make a trial for that situation.
*/
- private synchronized void requestEvaluator() {
- requestor.submit(EvaluatorRequest.newBuilder()
- .setMemory(128)
- .setNumber(1)
- .build());
- }
+ private void requestEvaluator(final int numToRequest) {
+ if (numToRequest <= 0) {
+ throw new IllegalArgumentException("The number of evaluator request should be a positive integer");
+ }
- /**
- * @param command The command to execute
- */
- private void submit(final ActiveContext context, final Integer taskId, final String command) {
- final Configuration taskConf = TaskConfiguration.CONF
- .set(TaskConfiguration.TASK, ShellTask.class)
- .set(TaskConfiguration.IDENTIFIER, "ShellTask"+taskId)
- .build();
- final Configuration commandConf = Tang.Factory.getTang().newConfigurationBuilder()
- .bindNamedParameter(Command.class, command)
- .build();
-
- LOG.log(Level.INFO, "Submitting command : {0}", command);
- final Configuration merged = Configurations.merge(taskConf, commandConf);
- context.submitTask(merged);
+ synchronized (SchedulerDriver.this) {
+ nRequestedEval += numToRequest;
+ requestor.submit(EvaluatorRequest.newBuilder()
+ .setMemory(32)
+ .setNumber(numToRequest)
+ .build());
+ }
}
/**
* Pick up a command from the queue and run it. Wait until
* any command coming up if no command exists.
- * @param context
*/
private void waitForCommands(final ActiveContext context) {
- synchronized (lock) {
- while (taskQueue.isEmpty()) {
- // Wait until any commands enter in the queue
+ synchronized (SchedulerDriver.this) {
+ while (!scheduler.hasPendingTasks()) {
+ // Wait until any command enters in the queue
try {
- lock.wait();
+ SchedulerDriver.this.wait();
} catch (InterruptedException e) {
LOG.log(Level.WARNING, "InterruptedException occurred in SchedulerDriver", e);
}
}
-
- // Run the first command from the queue.
- final Pair<Integer, String> task = taskQueue.poll();
- runningTaskId = task.first;
- final String command = task.second;
- submit(context, runningTaskId, command);
+ // When wakes up, run the first command from the queue.
+ state = State.RUNNING;
+ scheduler.submitTask(context);
}
}
/**
- * @return {@code true} if it is possible to run commands.
+ * Retain the complete evaluators submitting another task
+ * until there is no need to reuse them.
*/
- private boolean readyToRun() {
- synchronized (lock) {
- return state == State.READY && taskQueue.size() > 0;
+ private synchronized void retainEvaluator(final ActiveContext context) {
+ if (scheduler.hasPendingTasks()) {
+ scheduler.submitTask(context);
+ } else if (nActiveEval > 1) {
+ nActiveEval--;
+ context.close();
+ } else {
+ state = State.READY;
+ waitForCommands(context);
}
}
/**
- * Return the result including status and message
- * @param success
- * @param message
- * @return
+ * Always close the complete evaluators and
+ * allocate a new evaluator if necessary.
*/
- private static String getResult(final boolean success, final String message) {
- final StringBuilder sb = new StringBuilder();
- final String status = success ? "Success" : "Error";
- return sb.append("[").append(status).append("] ").append(message).toString();
+ private synchronized void reallocateEvaluator(final ActiveContext context) {
+ nActiveEval--;
+ context.close();
+
+ if (scheduler.hasPendingTasks()) {
+ requestEvaluator(1);
+ } else if (nActiveEval <= 0) {
+ state = State.WAIT_EVALUATORS;
+ requestEvaluator(1);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d2ab481/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerHttpHandler.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerHttpHandler.java b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerHttpHandler.java
new file mode 100644
index 0000000..c2f6090
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerHttpHandler.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.webserver.HttpHandler;
+import org.apache.reef.webserver.ParsedHttpRequest;
+
+import javax.inject.Inject;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Receive HttpRequest so that it can handle the command list
+ */
+final class SchedulerHttpHandler implements HttpHandler {
+ final InjectionFuture<SchedulerDriver> schedulerDriver;
+
+ private String uriSpecification = "reef-example-scheduler";
+
+ @Inject
+ public SchedulerHttpHandler(final InjectionFuture<SchedulerDriver> schedulerDriver) {
+ this.schedulerDriver = schedulerDriver;
+ }
+
+ @Override
+ public String getUriSpecification() {
+ return uriSpecification;
+ }
+
+ @Override
+ public void setUriSpecification(String s) {
+ uriSpecification = s;
+ }
+
+ /**
+ * HttpRequest handler. You must specify UriSpecification and REST API version.
+ * The request url is http://{address}:{port}/reef-example-scheduler/v1
+ *
+ * APIs
+ * /list to get the status list for all tasks
+ * /status?id={id} to query the status of such a task, given id
+ * /submit?cmd={cmd} to submit a Task, which returns its id
+ * /cancel?id={id} to cancel the task's execution
+ * /num-eval?num={num} to set the maximum number of evaluators
+ * /clear to clear the waiting queue
+ */
+ @Override
+ public void onHttpRequest(ParsedHttpRequest request, HttpServletResponse response)
+ throws IOException, ServletException {
+ final String target = request.getTargetEntity().toLowerCase();
+ final Map<String, List<String>> queryMap = request.getQueryMap();
+
+ final SchedulerResponse result;
+ switch (target) {
+ case "list":
+ result = schedulerDriver.get().getList();
+ break;
+ case "clear":
+ result = schedulerDriver.get().clearList();
+ break;
+ case "status":
+ result = schedulerDriver.get().getTaskStatus(queryMap.get("id"));
+ break;
+ case "submit":
+ result = schedulerDriver.get().submitCommands(queryMap.get("cmd"));
+ break;
+ case "cancel":
+ result = schedulerDriver.get().cancelTask(queryMap.get("id"));
+ break;
+ case "max-eval":
+ result = schedulerDriver.get().setMaxEvaluators(queryMap.get("num"));
+ break;
+ default:
+ result = SchedulerResponse.NOT_FOUND("Unsupported operation");
+ }
+
+ // Send response to the http client
+ final int status = result.getStatus();
+ final String message= result.getMessage();
+
+ if (result.isOK()) {
+ response.getOutputStream().println(message);
+ } else {
+ response.sendError(status, message);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d2ab481/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java
index 24f8ed5..2911905 100644
--- a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java
+++ b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java
@@ -20,20 +20,17 @@ package org.apache.reef.examples.scheduler;
import org.apache.commons.cli.ParseException;
import org.apache.reef.client.DriverConfiguration;
-import org.apache.reef.client.DriverServiceConfiguration;
import org.apache.reef.client.REEF;
import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
-import org.apache.reef.util.EnvironmentUtils;
-import org.apache.reef.webserver.HttpHandlerConfiguration;
-import org.apache.reef.webserver.ReefEventStateManager;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Configurations;
-import org.apache.reef.tang.JavaConfigurationBuilder;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.tang.formats.CommandLine;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.webserver.HttpHandlerConfiguration;
import java.io.IOException;
@@ -43,7 +40,8 @@ import java.io.IOException;
public final class SchedulerREEF {
/**
- * Command line parameter = true to reuse evaluators, or false allocate/close for each iteration
+ * Command line parameter = true to reuse evaluators,
+ * or false to allocate/close for each iteration
*/
@NamedParameter(doc = "Whether or not to reuse evaluators",
short_name = "retain", default_value = "true")
@@ -55,10 +53,9 @@ public final class SchedulerREEF {
*/
private final static Configuration getHttpConf() {
final Configuration httpHandlerConf = HttpHandlerConfiguration.CONF
- .set(HttpHandlerConfiguration.HTTP_HANDLERS, HttpServerShellCmdHandler.class)
+ .set(HttpHandlerConfiguration.HTTP_HANDLERS, SchedulerHttpHandler.class)
.build();
return httpHandlerConf;
-
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d2ab481/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java
index 99a593b..c42cba5 100644
--- a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java
+++ b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java
@@ -37,8 +37,10 @@ public final class SchedulerREEFYarn {
* @throws InjectionException
* @throws java.io.IOException
*/
- public final static void main(String[] args) throws InjectionException, IOException, ParseException {
- final Configuration runtimeConfiguration = YarnClientConfiguration.CONF.build();
+ public final static void main(String[] args)
+ throws InjectionException, IOException, ParseException {
+ final Configuration runtimeConfiguration =
+ YarnClientConfiguration.CONF.build();
runTaskScheduler(runtimeConfiguration, args);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d2ab481/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerResponse.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerResponse.java b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerResponse.java
new file mode 100644
index 0000000..50293e6
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerResponse.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+/**
+ * This class specifies the response from the Scheduler.
+ * It includes the status code and message.
+ */
+final class SchedulerResponse {
+ /**
+ * 200 OK : The request succeeded normally.
+ */
+ private static final int SC_OK = 200;
+
+ /**
+ * 400 BAD REQUEST : The request is syntactically incorrect.
+ */
+ private static final int SC_BAD_REQUEST = 400;
+
+ /**
+ * 403 FORBIDDEN : Syntactically okay but refused to process.
+ */
+ private static final int SC_FORBIDDEN = 403;
+
+ /**
+ * 404 NOT FOUND : The resource is not available.
+ */
+ private static final int SC_NOT_FOUND = 404;
+
+ /**
+ * Create a response with OK status
+ */
+ public static SchedulerResponse OK(final String message){
+ return new SchedulerResponse (SC_OK, message);
+ }
+
+ /**
+ * Create a response with BAD_REQUEST status
+ */
+ public static SchedulerResponse BAD_REQUEST(final String message){
+ return new SchedulerResponse (SC_BAD_REQUEST, message);
+ }
+
+ /**
+ * Create a response with FORBIDDEN status
+ */
+ public static SchedulerResponse FORBIDDEN(final String message){
+ return new SchedulerResponse (SC_FORBIDDEN, message);
+ }
+
+ /**
+ * Create a response with NOT FOUND status
+ */
+ public static SchedulerResponse NOT_FOUND(final String message){
+ return new SchedulerResponse (SC_NOT_FOUND, message);
+ }
+
+ /**
+ * Return {@code true} if the response is OK.
+ */
+ public boolean isOK(){
+ return this.status == SC_OK;
+ }
+
+ /**
+ * Status code of the request based on RFC 2068.
+ */
+ private int status;
+
+ /**
+ * Message to send.
+ */
+ private String message;
+
+ /**
+ * Constructor using status code and message.
+ * @param status
+ * @param message
+ */
+ private SchedulerResponse(final int status, final String message) {
+ this.status = status;
+ this.message = message;
+ }
+
+ /**
+ * Return the status code of this response.
+ */
+ int getStatus() {
+ return status;
+ }
+
+ /**
+ * Return the message of this response.
+ */
+ String getMessage() {
+ return message;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d2ab481/reef-examples/src/main/java/org/apache/reef/examples/scheduler/TaskEntity.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/TaskEntity.java b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/TaskEntity.java
new file mode 100644
index 0000000..fe777ff
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/TaskEntity.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+/**
+ * TaskEntity represent a single entry of task queue used in
+ * scheduler. Since REEF already has the class named {Task},
+ * a different name is used for this class.
+ */
+final class TaskEntity {
+ private final int taskId;
+ private final String command;
+
+ public TaskEntity(final int taskId, final String command) {
+ this.taskId = taskId;
+ this.command = command;
+ }
+
+ /**
+ * Return the TaskID assigned to this Task.
+ */
+ int getId() {
+ return taskId;
+ }
+
+ String getCommand() {
+ return command;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TaskEntity that = (TaskEntity) o;
+
+ if (taskId != that.taskId) return false;
+ if (!command.equals(that.command)) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = taskId;
+ result = 31 * result + command.hashCode();
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder().append("<Id=").append(taskId).
+ append(", Command=").append(command).append(">").toString();
+ }
+}