You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2014/11/24 22:54:43 UTC

incubator-reef git commit: [REEF-3] Create basic Task scheduler as an example

Repository: incubator-reef
Updated Branches:
  refs/heads/master b5214eebe -> 1d2ab4817


[REEF-3] Create basic Task scheduler as an example

  The current Task scheduler example only supports one evaluator
  at a time. This pull request addresses this issue by running multiple
  evaluators simultaneously and introduces a new REST interface to
  dynamically set the maximum number of evaluators.

  * The new API : `/reef-example-scheduler/v1/max-eval?num={num}`.
  * Quite a lot of code rearrangement and refactoring has been done.

JIRA:
  [REEF-3] https://issues.apache.org/jira/browse/REEF-3

Pull Request
  #23 https://github.com/apache/incubator-reef/pull/23

Closes #23


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/1d2ab481
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/1d2ab481
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/1d2ab481

Branch: refs/heads/master
Commit: 1d2ab48176950615618af9fb5a8f846b75a2cea0
Parents: b5214ee
Author: Yunseong Lee <yu...@apache.org>
Authored: Thu Nov 20 02:34:16 2014 +0900
Committer: Markus Weimer <we...@apache.org>
Committed: Mon Nov 24 13:38:57 2014 -0800

----------------------------------------------------------------------
 .../scheduler/HttpServerShellCmdHandler.java    |  99 ------
 .../reef/examples/scheduler/Scheduler.java      | 226 ++++++++++++
 .../examples/scheduler/SchedulerDriver.java     | 343 ++++++++-----------
 .../scheduler/SchedulerHttpHandler.java         | 107 ++++++
 .../reef/examples/scheduler/SchedulerREEF.java  |  13 +-
 .../examples/scheduler/SchedulerREEFYarn.java   |   6 +-
 .../examples/scheduler/SchedulerResponse.java   | 114 ++++++
 .../reef/examples/scheduler/TaskEntity.java     |  71 ++++
 8 files changed, 677 insertions(+), 302 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d2ab481/reef-examples/src/main/java/org/apache/reef/examples/scheduler/HttpServerShellCmdHandler.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/HttpServerShellCmdHandler.java b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/HttpServerShellCmdHandler.java
deleted file mode 100644
index 4320bcd..0000000
--- a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/HttpServerShellCmdHandler.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.examples.scheduler;
-
-import org.apache.reef.tang.InjectionFuture;
-import org.apache.reef.tang.annotations.Unit;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.webserver.HttpHandler;
-import org.apache.reef.webserver.ParsedHttpRequest;
-
-import javax.inject.Inject;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletResponse;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Receive HttpRequest so that it can handle the command list
- */
-public class HttpServerShellCmdHandler implements HttpHandler {
-  final InjectionFuture<SchedulerDriver> schedulerDriver;
-
-  private String uriSpecification = "reef-example-scheduler";
-
-  @Inject
-  public HttpServerShellCmdHandler(final InjectionFuture<SchedulerDriver> schedulerDriver) {
-    this.schedulerDriver = schedulerDriver;
-  }
-
-  @Override
-  public String getUriSpecification() {
-    return uriSpecification;
-  }
-
-  @Override
-  public void setUriSpecification(String s) {
-    uriSpecification = s;
-  }
-
-  /**
-   * HttpRequest handler. You must specify UriSpecification and REST API version.
-   * The request url is http://{address}:{port}/reef-example-scheduler/v1
-   *
-   * APIs
-   *   /list              to get the status list for all tasks
-   *   /status?id={id}    to query the status of such a task, given id
-   *   /submit?cmd={cmd}  to submit a Task, which returns its id
-   *   /cancel?id={id}    to cancel the task's execution
-   *   /clear             to clear the waiting queue
-   */
-  @Override
-  public void onHttpRequest(ParsedHttpRequest request, HttpServletResponse response) throws IOException, ServletException {
-    final String target = request.getTargetEntity().toLowerCase();
-    final Map<String, List<String>> queryMap = request.getQueryMap();
-
-    final String result;
-    switch (target) {
-      case "list":
-        result = schedulerDriver.get().getList();
-        break;
-      case "clear":
-        result = schedulerDriver.get().clearList();
-        break;
-      case "status":
-        result = schedulerDriver.get().getStatus(queryMap.get("id"));
-        break;
-      case "submit":
-        result = schedulerDriver.get().submitCommands(queryMap.get("cmd"));
-        break;
-      case "cancel":
-        result = schedulerDriver.get().cancelTask(queryMap.get("id"));
-        break;
-      default:
-        result = "Unsupported operation";
-    }
-
-    // Send response to the http client
-    response.getOutputStream().println(result);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d2ab481/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java
new file mode 100644
index 0000000..960f297
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.examples.library.Command;
+import org.apache.reef.examples.library.ShellTask;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Tang;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The body of Task scheduler. It owns a task queue
+ * and tracks the record of scheduled tasks.
+ */
+@ThreadSafe
+final class Scheduler {
+  /**
+   * Tasks are waiting to be scheduled in the queue.
+   */
+  private final Queue<TaskEntity> taskQueue;
+
+  /**
+   * Lists of {@link TaskEntity} for different states - Running / Finished / Canceled.
+   */
+  private final List<TaskEntity> runningTasks = new ArrayList<>();
+  private final List<TaskEntity> finishedTasks = new ArrayList<>();
+  private final List<TaskEntity> canceledTasks = new ArrayList<>();
+
+  /**
+   * Counts how many tasks have been scheduled.
+   */
+  private final AtomicInteger taskCount = new AtomicInteger(0);
+
+  @Inject
+  public Scheduler() {
+    taskQueue = new LinkedBlockingQueue<>();
+  }
+
+  /**
+   * Submit a task to the ActiveContext.
+   */
+  public synchronized void submitTask(final ActiveContext context) {
+    final TaskEntity task = taskQueue.poll();
+    final Integer taskId = task.getId();
+    final String command = task.getCommand();
+
+    final Configuration taskConf = TaskConfiguration.CONF
+      .set(TaskConfiguration.TASK, ShellTask.class)
+      .set(TaskConfiguration.IDENTIFIER, taskId.toString())
+      .build();
+    final Configuration commandConf = Tang.Factory.getTang().newConfigurationBuilder()
+      .bindNamedParameter(Command.class, command)
+      .build();
+
+    final Configuration merged = Configurations.merge(taskConf, commandConf);
+    context.submitTask(merged);
+    runningTasks.add(task);
+  }
+
+  /**
+   * Update the record of task to mark it as canceled.
+   */
+  public synchronized SchedulerResponse cancelTask(final int taskId) {
+    if (getTask(taskId, runningTasks) != null) {
+      return SchedulerResponse.FORBIDDEN("The task " + taskId + " is running");
+    } else if (getTask(taskId, finishedTasks) != null) {
+      return SchedulerResponse.FORBIDDEN("The task " + taskId + " has been finished");
+    }
+
+    final TaskEntity task = getTask(taskId, taskQueue);
+    if (task == null) {
+      final String message = new StringBuilder().append("Task with ID ").append(taskId).append(" is not found").toString();
+      return SchedulerResponse.NOT_FOUND(message);
+    } else {
+      taskQueue.remove(task);
+      canceledTasks.add(task);
+      return SchedulerResponse.OK("Canceled " + taskId);
+    }
+  }
+
+  /**
+   * Clear the pending list
+   */
+  public synchronized SchedulerResponse clear() {
+    final int count = taskQueue.size();
+    for (final TaskEntity task : taskQueue) {
+      canceledTasks.add(task);
+    }
+    taskQueue.clear();
+    return SchedulerResponse.OK(count + " tasks removed.");
+  }
+
+  /**
+   * Get the list of Tasks, which are grouped by the states.
+   */
+  public synchronized SchedulerResponse getList() {
+    final StringBuilder sb = new StringBuilder();
+    sb.append("Running :");
+    for (final TaskEntity running : runningTasks) {
+      sb.append(" ").append(running.getId());
+    }
+
+    sb.append("\nWaiting :");
+    for (final TaskEntity waiting : taskQueue) {
+      sb.append(" ").append(waiting.getId());
+    }
+
+    sb.append("\nFinished :");
+    for (final TaskEntity finished : finishedTasks) {
+      sb.append(" ").append(finished.getId());
+    }
+
+    sb.append("\nCanceled :");
+    for (final TaskEntity canceled : canceledTasks) {
+      sb.append(" ").append(canceled.getId());
+    }
+    return SchedulerResponse.OK(sb.toString());
+  }
+
+  /**
+   * Get the status of a Task.
+   */
+  public synchronized SchedulerResponse getTaskStatus(final int taskId) {
+
+    for (final TaskEntity running : runningTasks) {
+      if (taskId == running.getId()) {
+        return SchedulerResponse.OK("Running : " + running.toString());
+      }
+    }
+
+    for (final TaskEntity waiting : taskQueue) {
+      if (taskId == waiting.getId()) {
+        return SchedulerResponse.OK("Waiting : " + waiting.toString());
+      }
+    }
+
+    for (final TaskEntity finished : finishedTasks) {
+      if (taskId == finished.getId()) {
+        return SchedulerResponse.OK("Finished : " + finished.toString());
+      }
+    }
+
+    for (final TaskEntity finished : canceledTasks) {
+      if (taskId == finished.getId()) {
+        return SchedulerResponse.OK("Canceled: " + finished.toString());
+      }
+    }
+    return SchedulerResponse.NOT_FOUND(new StringBuilder().append("Task with ID ").append(taskId).append(" is not found").toString());
+  }
+
+  /**
+   * Assigns a TaskId to submit.
+   */
+  public synchronized int assignTaskId() {
+    return taskCount.incrementAndGet();
+  }
+
+  /**
+   * Add a task to the queue.
+   */
+  public synchronized void addTask(TaskEntity task) {
+    taskQueue.add(task);
+  }
+
+  /**
+   * Check whether there are tasks waiting to be submitted.
+   */
+  public synchronized boolean hasPendingTasks() {
+    return !taskQueue.isEmpty();
+  }
+
+  /**
+   * Get the number of pending tasks in the queue.
+   */
+  public synchronized int getNumPendingTasks() {
+    return taskQueue.size();
+  }
+
+  /**
+   * Update the record of task to mark it as finished.
+   */
+  public synchronized void setFinished(final int taskId) {
+    final TaskEntity task = getTask(taskId, runningTasks);
+    runningTasks.remove(task);
+    finishedTasks.add(task);
+  }
+
+  /**
+   * Iterate over the collection to find a TaskEntity with ID.
+   */
+  private TaskEntity getTask(final int taskId, final Collection<TaskEntity> tasks) {
+    TaskEntity result = null;
+    for (final TaskEntity task : tasks) {
+      if (taskId == task.getId()) {
+        result = task;
+        break;
+      }
+    }
+    return result;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d2ab481/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java
index 1f70e8d..890d872 100644
--- a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java
+++ b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java
@@ -24,13 +24,6 @@ import org.apache.reef.driver.evaluator.AllocatedEvaluator;
 import org.apache.reef.driver.evaluator.EvaluatorRequest;
 import org.apache.reef.driver.evaluator.EvaluatorRequestor;
 import org.apache.reef.driver.task.CompletedTask;
-import org.apache.reef.driver.task.TaskConfiguration;
-import org.apache.reef.examples.library.Command;
-import org.apache.reef.examples.library.ShellTask;
-import org.apache.reef.io.network.util.Pair;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Configurations;
-import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.tang.annotations.Unit;
 import org.apache.reef.wake.EventHandler;
@@ -39,8 +32,7 @@ import org.apache.reef.wake.time.event.StartTime;
 
 import javax.annotation.concurrent.GuardedBy;
 import javax.inject.Inject;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -57,14 +49,14 @@ public final class SchedulerDriver {
   /**
    * Possible states of the job driver. Can be one of:
    * <dl>
-   * <du><code>INIT</code></du><dd>Initial state. Ready to request an evaluator</dd>
-   * <du><code>WAIT_EVALUATORS</code></du><dd>Wait for requested evaluators to be ready.</dd>
-   * <du><code>READY</code></du><dd>Wait for the commands. When new Tasks arrive, enqueue the tasks and transit to RUNNING status.</dd>
+   * <du><code>INIT</code></du><dd>Initial state. Ready to request an evaluator.</dd>
+   * <du><code>WAIT_EVALUATORS</code></du><dd>Waiting for an evaluator allocated with no active evaluators.</dd>
+   * <du><code>READY</code></du><dd>Wait for the commands. Reactivated when a new Task arrives.</dd>
    * <du><code>RUNNING</code></du><dd>Run commands in the queue. Go back to READY state when the queue is empty.</dd>
    * </dl>
    */
   private enum State {
-    INIT, WAIT_EVALUATOR, READY, RUNNING
+    INIT, WAIT_EVALUATORS, READY, RUNNING
   }
 
   /**
@@ -72,36 +64,23 @@ public final class SchedulerDriver {
    */
   private boolean retainable;
 
-  private Object lock = new Object();
-
-  @GuardedBy("lock")
+  @GuardedBy("SchedulerDriver.this")
   private State state = State.INIT;
 
-  @GuardedBy("lock")
-  private final Queue<Pair<Integer, String>> taskQueue;
-
-  @GuardedBy("lock")
-  private Integer runningTaskId = null;
-
-  @GuardedBy("lock")
-  private Set<Integer> finishedTaskId = new HashSet<>();
+  @GuardedBy("SchedulerDriver.this")
+  private Scheduler scheduler;
 
-  @GuardedBy("lock")
-  private Set<Integer> canceledTaskId = new HashSet<>();
+  @GuardedBy("SchedulerDriver.this")
+  private int nMaxEval = 3, nActiveEval = 0, nRequestedEval = 0;
 
   private final EvaluatorRequestor requestor;
 
-
-  /**
-   * Counts how many tasks have been scheduled.
-   */
-  private AtomicInteger taskCount = new AtomicInteger(0);
-
   @Inject
   public SchedulerDriver(final EvaluatorRequestor requestor,
-                         @Parameter(SchedulerREEF.Retain.class) boolean retainable) {
+                         @Parameter(SchedulerREEF.Retain.class) boolean retainable,
+                         final Scheduler scheduler) {
     this.requestor = requestor;
-    this.taskQueue = new LinkedList<>();
+    this.scheduler = scheduler;
     this.retainable = retainable;
   }
 
@@ -113,8 +92,9 @@ public final class SchedulerDriver {
     public void onNext(final StartTime startTime) {
       LOG.log(Level.INFO, "Driver started at {0}", startTime);
       assert (state == State.INIT);
+      state = State.WAIT_EVALUATORS;
 
-      requestEvaluator();
+      requestEvaluator(1); // Allocate an initial evaluator to avoid idle state.
     }
   }
 
@@ -126,7 +106,10 @@ public final class SchedulerDriver {
     @Override
     public void onNext(final AllocatedEvaluator evaluator) {
       LOG.log(Level.INFO, "Evaluator is ready");
-      assert (state == State.WAIT_EVALUATOR);
+      synchronized (SchedulerDriver.this) {
+        nActiveEval++;
+        nRequestedEval--;
+      }
 
       evaluator.submitContext(ContextConfiguration.CONF
         .set(ContextConfiguration.IDENTIFIER, "SchedulerContext")
@@ -137,246 +120,220 @@ public final class SchedulerDriver {
   /**
    * Now it is ready to schedule tasks. But if the queue is empty,
    * wait until commands coming up.
+   *
+   * If there is no pending task, having more than 1 evaluators must be redundant.
+   * It may happen, for example, when tasks are canceled during allocation.
+   * In these cases, the new evaluator may be abandoned.
    */
   final class ActiveContextHandler implements EventHandler<ActiveContext> {
     @Override
     public void onNext(ActiveContext context) {
-      synchronized (lock) {
+      synchronized (SchedulerDriver.this) {
         LOG.log(Level.INFO, "Context available : {0}", context.getId());
-        assert (state == State.WAIT_EVALUATOR);
 
-        state = State.READY;
-        waitForCommands(context);
+        if (scheduler.hasPendingTasks()) {
+          state = State.RUNNING;
+          scheduler.submitTask(context);
+        } else if (nActiveEval > 1) {
+          nActiveEval--;
+          context.close();
+        } else {
+          state = State.READY;
+          waitForCommands(context);
+        }
       }
     }
   }
 
   /**
-   * Get the list of Tasks. They are classified as their states.
-   * @return
+   * Non-retainable version of CompletedTaskHandler.
+   * When Task completes, it closes the active context to deallocate the evaluator
+   * and if there is outstanding commands, allocate another evaluator.
    */
-  public String getList() {
-    synchronized (lock) {
-      final StringBuilder sb = new StringBuilder("Running : ");
-      if (runningTaskId != null) {
-        sb.append(runningTaskId);
-      }
+  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
+    @Override
+    public void onNext(final CompletedTask task) {
+      final int taskId = Integer.valueOf(task.getId());
 
-      sb.append("\nWaiting :");
-      for (final Pair<Integer, String> entity : taskQueue) {
-        sb.append(" ").append(entity.first);
-      }
+      synchronized (SchedulerDriver.this) {
+        scheduler.setFinished(taskId);
 
-      sb.append("\nFinished :");
-      for (final int taskIds : finishedTaskId) {
-        sb.append(" ").append(taskIds);
-      }
+        LOG.log(Level.INFO, "Task completed. Reuse the evaluator : {0}", String.valueOf(retainable));
+        final ActiveContext context = task.getActiveContext();
 
-      sb.append("\nCanceled :");
-      for (final int taskIds : canceledTaskId) {
-        sb.append(" ").append(taskIds);
+        if (retainable) {
+          retainEvaluator(context);
+        } else {
+          reallocateEvaluator(context);
+        }
       }
-      return sb.toString();
     }
   }
 
   /**
-   * Get the status of a Task.
-   * @return
+   * Get the list of tasks in the scheduler.
    */
-  public String getStatus(final List<String> args) {
-    if (args.size() != 1) {
-      return getResult(false, "Usage : only one ID at a time");
-    }
-
-    final Integer taskId = Integer.valueOf(args.get(0));
-
-    synchronized (lock) {
-      if (taskId.equals(runningTaskId)) {
-        return getResult(true, "Running");
-      } else if (finishedTaskId.contains(taskId)) {
-        return getResult(true, "Finished");
-      } else if (canceledTaskId.contains(taskId)) {
-        return getResult(true, "Canceled");
-      }
+  public synchronized SchedulerResponse getList() {
+    return scheduler.getList();
+  }
 
-      for (final Pair<Integer, String> entity : taskQueue) {
-        if (taskId == entity.first) {
-          return getResult(true, "Waiting");
-        }
-      }
-      return getResult(false, "Not found");
-    }
+  /**
+   * Clear all the Tasks from the waiting queue.
+   */
+  public synchronized SchedulerResponse clearList() {
+    return scheduler.clear();
   }
 
   /**
-   * Submit a command to schedule.
-   * @return
+   * Get the status of a task.
    */
-  public String submitCommands(final List<String> args) {
+  public SchedulerResponse getTaskStatus(List<String> args) {
     if (args.size() != 1) {
-      return getResult(false, "Usage : only one ID at a time");
+      return SchedulerResponse.BAD_REQUEST("Usage : only one ID at a time");
     }
 
-    final String command = args.get(0);
-
-    synchronized (lock) {
-      final Integer id = taskCount.incrementAndGet();
-      taskQueue.add(new Pair(id, command));
+    final Integer taskId = Integer.valueOf(args.get(0));
 
-      if (readyToRun()) {
-        state = State.RUNNING;
-        lock.notify();
-      }
-      return getResult(true, "Task ID : "+id);
+    synchronized (SchedulerDriver.this) {
+      return scheduler.getTaskStatus(taskId);
     }
   }
 
   /**
    * Cancel a Task waiting on the queue. A task cannot be canceled
    * once it is running.
-   * @return
    */
-  public String cancelTask(final List<String> args) {
+  public SchedulerResponse cancelTask(final List<String> args) {
     if (args.size() != 1) {
-      return getResult(false, "Usage : only one ID at a time");
+      return SchedulerResponse.BAD_REQUEST("Usage : only one ID at a time");
     }
 
     final Integer taskId = Integer.valueOf(args.get(0));
 
-    synchronized (lock) {
-      if (taskId.equals(runningTaskId)) {
-        return getResult(false, "The task is running");
-      } else if (finishedTaskId.contains(taskId)) {
-        return getResult(false, "Already finished");
-      }
-
-      for (final Pair<Integer, String> entity : taskQueue) {
-        if (taskId == entity.first) {
-          taskQueue.remove(entity);
-          canceledTaskId.add(taskId);
-          return getResult(true, "Canceled");
-        }
-      }
-      return getResult(false, "Not found");
+    synchronized (SchedulerDriver.this) {
+      return scheduler.cancelTask(taskId);
     }
   }
 
   /**
-   * Clear all the Tasks from the waiting queue.
-   * @return
+   * Submit a command to schedule.
    */
-  public String clearList() {
-    final int count;
-    synchronized (lock) {
-      count = taskQueue.size();
-      for (Pair<Integer, String> entity : taskQueue) {
-        canceledTaskId.add(entity.first);
-      }
-      taskQueue.clear();
+  public SchedulerResponse submitCommands(final List<String> args) {
+    if (args.size() != 1) {
+      return SchedulerResponse.BAD_REQUEST("Usage : only one command at a time");
     }
-    return getResult(true, count + " tasks removed.");
-  }
 
+    final String command = args.get(0);
+    final Integer id;
 
+    synchronized (SchedulerDriver.this) {
+      id = scheduler.assignTaskId();
+      scheduler.addTask(new TaskEntity(id, command));
+
+      if (state == State.READY) {
+        SchedulerDriver.this.notify(); // Wake up at {waitForCommands}
+      } else if (state == State.RUNNING && nMaxEval > nActiveEval + nRequestedEval) {
+        requestEvaluator(1);
+      }
+    }
+    return SchedulerResponse.OK("Task ID : " + id);
+  }
 
   /**
-   * Non-retainable version of CompletedTaskHandler.
-   * When Task completes, it closes the active context to deallocate the evaluator
-   * and if there is outstanding commands, allocate another evaluator.
+   * Update the maximum number of evaluators to hold.
+   * Request more evaluators in case there are pending tasks
+   * in the queue and the number of evaluators is less than the limit.
    */
-  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
-    @Override
-    public void onNext(final CompletedTask task) {
-      synchronized (lock) {
-        finishedTaskId.add(runningTaskId);
-        runningTaskId = null;
+  public SchedulerResponse setMaxEvaluators(final List<String> args) {
+    if (args.size() != 1) {
+      return SchedulerResponse.BAD_REQUEST("Usage : Only one value can be used");
+    }
 
-        LOG.log(Level.INFO, "Task completed. Reuse the evaluator :", String.valueOf(retainable));
+    final int nTarget = Integer.valueOf(args.get(0));
 
-        if (retainable) {
-          if (taskQueue.isEmpty()) {
-            state = State.READY;
-          }
-          waitForCommands(task.getActiveContext());
-        } else {
-          task.getActiveContext().close();
-          state = State.WAIT_EVALUATOR;
-          requestEvaluator();
-        }
+    synchronized (SchedulerDriver.this) {
+      if (nTarget < nActiveEval + nRequestedEval) {
+        return SchedulerResponse.FORBIDDEN(nActiveEval + nRequestedEval +
+          " evaluators are used now. Should be larger than that.");
+      }
+      nMaxEval = nTarget;
+
+      if (scheduler.hasPendingTasks()) {
+        final int nToRequest =
+          Math.min(scheduler.getNumPendingTasks(), nMaxEval - nActiveEval) - nRequestedEval;
+        requestEvaluator(nToRequest);
       }
+      return SchedulerResponse.OK("You can use evaluators up to " + nMaxEval + " evaluators.");
     }
   }
 
   /**
-   * Request an evaluator
+   * Request evaluators. Passing a non positive number is illegal,
+   * so it does not make a trial for that situation.
    */
-  private synchronized void requestEvaluator() {
-    requestor.submit(EvaluatorRequest.newBuilder()
-      .setMemory(128)
-      .setNumber(1)
-      .build());
-  }
+  private void requestEvaluator(final int numToRequest) {
+    if (numToRequest <= 0) {
+      throw new IllegalArgumentException("The number of evaluator request should be a positive integer");
+    }
 
-  /**
-   * @param command The command to execute
-   */
-  private void submit(final ActiveContext context, final Integer taskId, final String command) {
-    final Configuration taskConf = TaskConfiguration.CONF
-      .set(TaskConfiguration.TASK, ShellTask.class)
-      .set(TaskConfiguration.IDENTIFIER, "ShellTask"+taskId)
-      .build();
-    final Configuration commandConf = Tang.Factory.getTang().newConfigurationBuilder()
-      .bindNamedParameter(Command.class, command)
-      .build();
-
-    LOG.log(Level.INFO, "Submitting command : {0}", command);
-    final Configuration merged = Configurations.merge(taskConf, commandConf);
-    context.submitTask(merged);
+    synchronized (SchedulerDriver.this) {
+      nRequestedEval += numToRequest;
+      requestor.submit(EvaluatorRequest.newBuilder()
+        .setMemory(32)
+        .setNumber(numToRequest)
+        .build());
+    }
   }
 
   /**
    * Pick up a command from the queue and run it. Wait until
    * any command coming up if no command exists.
-   * @param context
    */
   private void waitForCommands(final ActiveContext context) {
-    synchronized (lock) {
-      while (taskQueue.isEmpty()) {
-        // Wait until any commands enter in the queue
+    synchronized (SchedulerDriver.this) {
+      while (!scheduler.hasPendingTasks()) {
+        // Wait until any command enters in the queue
         try {
-          lock.wait();
+          SchedulerDriver.this.wait();
         } catch (InterruptedException e) {
           LOG.log(Level.WARNING, "InterruptedException occurred in SchedulerDriver", e);
         }
       }
-
-      // Run the first command from the queue.
-      final Pair<Integer, String> task = taskQueue.poll();
-      runningTaskId = task.first;
-      final String command = task.second;
-      submit(context, runningTaskId, command);
+      // When wakes up, run the first command from the queue.
+      state = State.RUNNING;
+      scheduler.submitTask(context);
     }
   }
 
   /**
-   * @return {@code true} if it is possible to run commands.
+   * Retain the complete evaluators submitting another task
+   * until there is no need to reuse them.
    */
-  private boolean readyToRun() {
-    synchronized (lock) {
-      return state == State.READY && taskQueue.size() > 0;
+  private synchronized void retainEvaluator(final ActiveContext context) {
+    if (scheduler.hasPendingTasks()) {
+      scheduler.submitTask(context);
+    } else if (nActiveEval > 1) {
+      nActiveEval--;
+      context.close();
+    } else {
+      state = State.READY;
+      waitForCommands(context);
     }
   }
 
   /**
-   * Return the result including status and message
-   * @param success
-   * @param message
-   * @return
+   * Always close the complete evaluators and
+   * allocate a new evaluator if necessary.
    */
-  private static String getResult(final boolean success, final String message) {
-    final StringBuilder sb = new StringBuilder();
-    final String status = success ? "Success" : "Error";
-    return sb.append("[").append(status).append("] ").append(message).toString();
+  private synchronized void reallocateEvaluator(final ActiveContext context) {
+    nActiveEval--;
+    context.close();
+
+    if (scheduler.hasPendingTasks()) {
+      requestEvaluator(1);
+    } else if (nActiveEval <= 0) {
+      state = State.WAIT_EVALUATORS;
+      requestEvaluator(1);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d2ab481/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerHttpHandler.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerHttpHandler.java b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerHttpHandler.java
new file mode 100644
index 0000000..c2f6090
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerHttpHandler.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.webserver.HttpHandler;
+import org.apache.reef.webserver.ParsedHttpRequest;
+
+import javax.inject.Inject;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Receives HTTP requests and dispatches each command to the SchedulerDriver.
+ */
+final class SchedulerHttpHandler implements HttpHandler {
+  final InjectionFuture<SchedulerDriver> schedulerDriver;
+
+  private String uriSpecification = "reef-example-scheduler";
+
+  @Inject
+  public SchedulerHttpHandler(final InjectionFuture<SchedulerDriver> schedulerDriver) {
+    this.schedulerDriver = schedulerDriver;
+  }
+
+  @Override
+  public String getUriSpecification() {
+    return uriSpecification;
+  }
+
+  @Override
+  public void setUriSpecification(String s) {
+    uriSpecification = s;
+  }
+
+  /**
+   * HttpRequest handler. You must specify UriSpecification and REST API version.
+   * The request url is http://{address}:{port}/reef-example-scheduler/v1
+   *
+   * APIs
+   *   /list                to get the status list for all tasks
+   *   /status?id={id}      to query the status of such a task, given id
+   *   /submit?cmd={cmd}    to submit a Task, which returns its id
+   *   /cancel?id={id}      to cancel the task's execution
+   *   /max-eval?num={num}  to set the maximum number of evaluators
+   *   /clear               to clear the waiting queue
+   */
+  @Override
+  public void onHttpRequest(ParsedHttpRequest request, HttpServletResponse response)
+    throws IOException, ServletException {
+    final String target = request.getTargetEntity().toLowerCase();
+    final Map<String, List<String>> queryMap = request.getQueryMap();
+
+    final SchedulerResponse result;
+    switch (target) {
+      case "list":
+        result = schedulerDriver.get().getList();
+        break;
+      case "clear":
+        result = schedulerDriver.get().clearList();
+        break;
+      case "status":
+        result = schedulerDriver.get().getTaskStatus(queryMap.get("id"));
+        break;
+      case "submit":
+        result = schedulerDriver.get().submitCommands(queryMap.get("cmd"));
+        break;
+      case "cancel":
+        result = schedulerDriver.get().cancelTask(queryMap.get("id"));
+        break;
+      case "max-eval":
+        result = schedulerDriver.get().setMaxEvaluators(queryMap.get("num"));
+        break;
+      default:
+        result = SchedulerResponse.NOT_FOUND("Unsupported operation");
+    }
+
+    // Send response to the http client
+    final int status = result.getStatus();
+    final String message= result.getMessage();
+
+    if (result.isOK()) {
+      response.getOutputStream().println(message);
+    } else {
+      response.sendError(status, message);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d2ab481/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java
index 24f8ed5..2911905 100644
--- a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java
+++ b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java
@@ -20,20 +20,17 @@ package org.apache.reef.examples.scheduler;
 
 import org.apache.commons.cli.ParseException;
 import org.apache.reef.client.DriverConfiguration;
-import org.apache.reef.client.DriverServiceConfiguration;
 import org.apache.reef.client.REEF;
 import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
-import org.apache.reef.util.EnvironmentUtils;
-import org.apache.reef.webserver.HttpHandlerConfiguration;
-import org.apache.reef.webserver.ReefEventStateManager;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.Configurations;
-import org.apache.reef.tang.JavaConfigurationBuilder;
 import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.annotations.Name;
 import org.apache.reef.tang.annotations.NamedParameter;
 import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.tang.formats.CommandLine;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.webserver.HttpHandlerConfiguration;
 
 import java.io.IOException;
 
@@ -43,7 +40,8 @@ import java.io.IOException;
 public final class SchedulerREEF {
 
   /**
-   * Command line parameter = true to reuse evaluators, or false allocate/close for each iteration
+   * Command line parameter = true to reuse evaluators,
+   * or false to allocate/close for each iteration
    */
   @NamedParameter(doc = "Whether or not to reuse evaluators",
     short_name = "retain", default_value = "true")
@@ -55,10 +53,9 @@ public final class SchedulerREEF {
    */
   private final static Configuration getHttpConf() {
     final Configuration httpHandlerConf = HttpHandlerConfiguration.CONF
-      .set(HttpHandlerConfiguration.HTTP_HANDLERS, HttpServerShellCmdHandler.class)
+      .set(HttpHandlerConfiguration.HTTP_HANDLERS, SchedulerHttpHandler.class)
       .build();
     return httpHandlerConf;
-
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d2ab481/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java
index 99a593b..c42cba5 100644
--- a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java
+++ b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java
@@ -37,8 +37,10 @@ public final class SchedulerREEFYarn {
    * @throws InjectionException
    * @throws java.io.IOException
    */
-  public final static void main(String[] args) throws InjectionException, IOException, ParseException {
-    final Configuration runtimeConfiguration = YarnClientConfiguration.CONF.build();
+  public final static void main(String[] args)
+    throws InjectionException, IOException, ParseException {
+    final Configuration runtimeConfiguration =
+      YarnClientConfiguration.CONF.build();
     runTaskScheduler(runtimeConfiguration, args);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d2ab481/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerResponse.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerResponse.java b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerResponse.java
new file mode 100644
index 0000000..50293e6
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerResponse.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+/**
+ * This class specifies the response from the Scheduler.
+ * It includes the status code and message.
+ */
+final class SchedulerResponse {
+  /**
+   * 200 OK : The request succeeded normally.
+   */
+  private static final int SC_OK = 200;
+
+  /**
+   * 400 BAD REQUEST : The request is syntactically incorrect.
+   */
+  private static final int SC_BAD_REQUEST = 400;
+
+  /**
+   * 403 FORBIDDEN : Syntactically okay but refused to process.
+   */
+  private static final int SC_FORBIDDEN = 403;
+
+  /**
+   * 404 NOT FOUND :  The resource is not available.
+   */
+  private static final int SC_NOT_FOUND = 404;
+
+  /**
+   * Create a response with OK status
+   */
+  public static SchedulerResponse OK(final String message){
+    return new SchedulerResponse (SC_OK, message);
+  }
+
+  /**
+   * Create a response with BAD_REQUEST status
+   */
+  public static SchedulerResponse BAD_REQUEST(final String message){
+    return new SchedulerResponse (SC_BAD_REQUEST, message);
+  }
+
+  /**
+   * Create a response with FORBIDDEN status
+   */
+  public static SchedulerResponse FORBIDDEN(final String message){
+    return new SchedulerResponse (SC_FORBIDDEN, message);
+  }
+
+  /**
+   * Create a response with NOT FOUND status
+   */
+  public static SchedulerResponse NOT_FOUND(final String message){
+    return new SchedulerResponse (SC_NOT_FOUND, message);
+  }
+
+  /**
+   * Return {@code true} if the response is OK.
+   */
+  public boolean isOK(){
+    return this.status == SC_OK;
+  }
+
+  /**
+   * Status code of the request based on RFC 2068.
+   */
+  private int status;
+
+  /**
+   * Message to send.
+   */
+  private String message;
+
+  /**
+   * Constructor using status code and message.
+   * @param status HTTP status code of the response (e.g. 200, 400, 403, 404)
+   * @param message message to send with the response
+   */
+  private SchedulerResponse(final int status, final String message) {
+    this.status = status;
+    this.message = message;
+  }
+
+  /**
+   * Return the status code of this response.
+   */
+  int getStatus() {
+    return status;
+  }
+
+  /**
+   * Return the message of this response.
+   */
+  String getMessage() {
+    return message;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d2ab481/reef-examples/src/main/java/org/apache/reef/examples/scheduler/TaskEntity.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/TaskEntity.java b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/TaskEntity.java
new file mode 100644
index 0000000..fe777ff
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/TaskEntity.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+/**
+ * TaskEntity represents a single entry of the task queue used in
+ * the scheduler. Since REEF already has a class named {@code Task},
+ * a different name is used for this class.
+ */
+final class TaskEntity {
+  private final int taskId;
+  private final String command;
+
+  public TaskEntity(final int taskId, final String command) {
+    this.taskId = taskId;
+    this.command = command;
+  }
+
+  /**
+   * Return the TaskID assigned to this Task.
+   */
+  int getId() {
+    return taskId;
+  }
+
+  String getCommand() {
+    return command;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    TaskEntity that = (TaskEntity) o;
+
+    if (taskId != that.taskId) return false;
+    if (!command.equals(that.command)) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = taskId;
+    result = 31 * result + command.hashCode();
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder().append("<Id=").append(taskId).
+      append(", Command=").append(command).append(">").toString();
+  }
+}