You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2014/11/09 17:50:23 UTC

incubator-reef git commit: [REEF-3]: A basic Task Scheduler This implements REEF-3: https://issues.apache.org/jira/browse/REEF-3

Repository: incubator-reef
Updated Branches:
  refs/heads/master 7c734fd32 -> 10f965143


[REEF-3]: A basic Task Scheduler
This implements REEF-3: https://issues.apache.org/jira/browse/REEF-3

It is a basic Task Scheduler example using Reef-webserver. The application
receives a list of tasks (shell commands) from the user and executes the tasks
in FIFO order.

Users can send HTTP requests to the server via the URL:
 http://{address}:{port}/reef-example-scheduler/v1

Users can send the following requests

  * /list               lists all the tasks' status.
  * /clear              clears all the tasks waiting on the queue. Returns how
                        many tasks are removed.
  * /submit?cmd=COMMAND submits a COMMAND to be executed and returns the task ID.
  * /status?id=ID       returns the status of task whose id is ID.
  * /cancel?id=ID       cancels the task whose id is ID.

The results of the tasks are written to the log files of both the driver and
the evaluators.

By default, the application reuses the existing evaluator to run Tasks.
With the -retain false option, the driver allocates an evaluator for each command.

The code was contributed by Yunseong Lee <yu...@me.com>

Closes #3


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/10f96514
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/10f96514
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/10f96514

Branch: refs/heads/master
Commit: 10f96514379179b8b2cf11fcf350418787302078
Parents: 7c734fd
Author: Yunseong Lee <yu...@me.com>
Authored: Fri Oct 24 13:27:14 2014 +0900
Committer: Markus Weimer <we...@apache.org>
Committed: Sun Nov 9 08:25:25 2014 -0800

----------------------------------------------------------------------
 .../scheduler/HttpServerShellCmdHandler.java    |  99 +++++
 .../examples/scheduler/SchedulerDriver.java     | 382 +++++++++++++++++++
 .../reef/examples/scheduler/SchedulerREEF.java  | 112 ++++++
 .../examples/scheduler/SchedulerREEFYarn.java   |  44 +++
 .../reef/examples/scheduler/package-info.java   |  22 ++
 5 files changed, 659 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/10f96514/reef-examples/src/main/java/org/apache/reef/examples/scheduler/HttpServerShellCmdHandler.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/HttpServerShellCmdHandler.java b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/HttpServerShellCmdHandler.java
new file mode 100644
index 0000000..4320bcd
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/HttpServerShellCmdHandler.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.webserver.HttpHandler;
+import org.apache.reef.webserver.ParsedHttpRequest;
+
+import javax.inject.Inject;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * HTTP handler of the Task Scheduler example. It receives the parsed HTTP requests
+ * and dispatches each of them to the matching operation of {@link SchedulerDriver}.
+ */
+public class HttpServerShellCmdHandler implements HttpHandler {
+  final InjectionFuture<SchedulerDriver> schedulerDriver;
+
+  private String uriSpecification = "reef-example-scheduler";
+
+  /**
+   * @param schedulerDriver future of the driver; it is resolved with {@code get()}
+   *                        each time a supported request is served.
+   */
+  @Inject
+  public HttpServerShellCmdHandler(final InjectionFuture<SchedulerDriver> schedulerDriver) {
+    this.schedulerDriver = schedulerDriver;
+  }
+
+  /**
+   * @return the URI specification this handler currently reports.
+   */
+  @Override
+  public String getUriSpecification() {
+    return uriSpecification;
+  }
+
+  /**
+   * @param s the new URI specification to report.
+   */
+  @Override
+  public void setUriSpecification(String s) {
+    uriSpecification = s;
+  }
+
+  /**
+   * HttpRequest handler. You must specify UriSpecification and REST API version.
+   * The request url is http://{address}:{port}/reef-example-scheduler/v1
+   *
+   * APIs
+   *   /list              to get the status list for all tasks
+   *   /status?id={id}    to query the status of such a task, given id
+   *   /submit?cmd={cmd}  to submit a Task, which returns its id
+   *   /cancel?id={id}    to cancel the task's execution
+   *   /clear             to clear the waiting queue
+   *
+   * Any other target is answered with "Unsupported operation". A query parameter
+   * that is absent from the request is passed to the driver as an empty list,
+   * never as {@code null}.
+   *
+   * @param request the parsed request; its target entity selects the operation.
+   * @param response the response whose output stream receives the textual result.
+   * @throws IOException if writing the result to the response fails.
+   */
+  @Override
+  public void onHttpRequest(ParsedHttpRequest request, HttpServletResponse response) throws IOException, ServletException {
+    // Lower-case with a fixed locale: under e.g. a Turkish default locale "LIST"
+    // would otherwise not become "list" and the request would be rejected.
+    final String target = request.getTargetEntity().toLowerCase(java.util.Locale.ENGLISH);
+    final Map<String, List<String>> queryMap = request.getQueryMap();
+
+    final String result;
+    switch (target) {
+      case "list":
+        result = schedulerDriver.get().getList();
+        break;
+      case "clear":
+        result = schedulerDriver.get().clearList();
+        break;
+      case "status":
+        result = schedulerDriver.get().getStatus(getParameter(queryMap, "id"));
+        break;
+      case "submit":
+        result = schedulerDriver.get().submitCommands(getParameter(queryMap, "cmd"));
+        break;
+      case "cancel":
+        result = schedulerDriver.get().cancelTask(getParameter(queryMap, "id"));
+        break;
+      default:
+        result = "Unsupported operation";
+    }
+
+    // Send response to the http client
+    response.getOutputStream().println(result);
+  }
+
+  /**
+   * Looks up the values of a query parameter.
+   *
+   * @param queryMap the query map of the request, may be {@code null}.
+   * @param name the name of the parameter.
+   * @return the values of the parameter, or an empty list if the parameter is missing.
+   */
+  private static List<String> getParameter(final Map<String, List<String>> queryMap, final String name) {
+    final List<String> values = queryMap == null ? null : queryMap.get(name);
+    return values == null ? java.util.Collections.<String>emptyList() : values;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/10f96514/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java
new file mode 100644
index 0000000..1f70e8d
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.task.CompletedTask;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.examples.library.Command;
+import org.apache.reef.examples.library.ShellTask;
+import org.apache.reef.io.network.util.Pair;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.inject.Inject;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver for TaskScheduler. It receives the commands by HttpRequest and
+ * execute them in a FIFO(First In First Out) order.
+ */
+@Unit
+public final class SchedulerDriver {
+
+  public static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
+  private static final Logger LOG = Logger.getLogger(SchedulerDriver.class.getName());
+
+  /**
+   * Possible states of the job driver. Can be one of:
+   * <dl>
+   * <dt><code>INIT</code></dt><dd>Initial state. Ready to request an evaluator</dd>
+   * <dt><code>WAIT_EVALUATOR</code></dt><dd>Wait for the requested evaluator to be ready.</dd>
+   * <dt><code>READY</code></dt><dd>Wait for the commands. When new Tasks arrive, enqueue the tasks and transit to RUNNING status.</dd>
+   * <dt><code>RUNNING</code></dt><dd>Run commands in the queue. Go back to READY state when the queue is empty.</dd>
+   * </dl>
+   */
+  private enum State {
+    INIT, WAIT_EVALUATOR, READY, RUNNING
+  }
+
+  /**
+   * If true, it reuses evaluators when Tasks done.
+   */
+  private boolean retainable;
+
+  private Object lock = new Object();
+
+  @GuardedBy("lock")
+  private State state = State.INIT;
+
+  @GuardedBy("lock")
+  private final Queue<Pair<Integer, String>> taskQueue;
+
+  @GuardedBy("lock")
+  private Integer runningTaskId = null;
+
+  @GuardedBy("lock")
+  private Set<Integer> finishedTaskId = new HashSet<>();
+
+  @GuardedBy("lock")
+  private Set<Integer> canceledTaskId = new HashSet<>();
+
+  private final EvaluatorRequestor requestor;
+
+
+  /**
+   * Counts how many tasks have been scheduled.
+   */
+  private AtomicInteger taskCount = new AtomicInteger(0);
+
+  /**
+   * Creates the driver with an empty waiting queue.
+   *
+   * @param requestor used to request evaluators.
+   * @param retainable value of the {@code Retain} parameter; if true, the evaluator
+   *                   is reused when a Task is done.
+   */
+  @Inject
+  public SchedulerDriver(final EvaluatorRequestor requestor,
+                         @Parameter(SchedulerREEF.Retain.class) boolean retainable) {
+    this.retainable = retainable;
+    this.requestor = requestor;
+    this.taskQueue = new LinkedList<>();
+  }
+
+  /**
+   * The driver is ready to run. Requests an evaluator and moves the driver
+   * from the INIT state to the WAIT_EVALUATOR state.
+   */
+  final class StartHandler implements EventHandler<StartTime> {
+    @Override
+    public void onNext(final StartTime startTime) {
+      LOG.log(Level.INFO, "Driver started at {0}", startTime);
+      // state is guarded by lock, so it is checked and updated while holding it.
+      synchronized (lock) {
+        assert (state == State.INIT);
+
+        // Record that an evaluator request is outstanding. Without this transition the
+        // state stays INIT and the WAIT_EVALUATOR assertions of the evaluator and
+        // context handlers fail as soon as assertions are enabled.
+        state = State.WAIT_EVALUATOR;
+        requestEvaluator();
+      }
+    }
+  }
+
+  /**
+   * An evaluator has been allocated: submit the "SchedulerContext" context to it.
+   * In the Non-retainable version this happens every time a command is run,
+   * in the Retainable version only once.
+   */
+  final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
+    @Override
+    public void onNext(final AllocatedEvaluator evaluator) {
+      LOG.log(Level.INFO, "Evaluator is ready");
+      assert (state == State.WAIT_EVALUATOR);
+
+      final Configuration contextConf = ContextConfiguration.CONF
+        .set(ContextConfiguration.IDENTIFIER, "SchedulerContext")
+        .build();
+      evaluator.submitContext(contextConf);
+    }
+  }
+
+  /**
+   * Handles a context that became active. Marks the driver as READY and hands
+   * the context over to {@code waitForCommands}, which runs the first queued
+   * command or, if the queue is empty, waits until a command is submitted.
+   */
+  final class ActiveContextHandler implements EventHandler<ActiveContext> {
+    @Override
+    public void onNext(ActiveContext context) {
+      synchronized (lock) {
+        LOG.log(Level.INFO, "Context available : {0}", context.getId());
+        assert (state == State.WAIT_EVALUATOR);
+
+        state = State.READY;
+        // May block in lock.wait() (which releases the lock) until a command is queued.
+        waitForCommands(context);
+      }
+    }
+  }
+
+  /**
+   * Lists the ids of all Tasks, grouped by their states: the running one,
+   * the waiting ones, the finished ones and the canceled ones.
+   * @return a multi-line text with one line per state.
+   */
+  public String getList() {
+    synchronized (lock) {
+      final StringBuilder out = new StringBuilder();
+      out.append("Running : ");
+      if (runningTaskId != null) {
+        out.append(runningTaskId);
+      }
+
+      out.append("\nWaiting :");
+      for (final Pair<Integer, String> queued : taskQueue) {
+        out.append(" ").append(queued.first);
+      }
+
+      appendTaskIds(out, "\nFinished :", finishedTaskId);
+      appendTaskIds(out, "\nCanceled :", canceledTaskId);
+      return out.toString();
+    }
+  }
+
+  /**
+   * Appends the header followed by every id of the set, each preceded by a space.
+   */
+  private static void appendTaskIds(final StringBuilder out, final String header, final Set<Integer> ids) {
+    out.append(header);
+    for (final int id : ids) {
+      out.append(" ").append(id);
+    }
+  }
+
+  /**
+   * Get the status of a Task.
+   * @param args the values of the "id" query parameter; exactly one integer id is expected.
+   * @return a "[Success]" message with the state of the Task (Running, Finished,
+   * Canceled or Waiting), or an "[Error]" message if the arguments are invalid
+   * or no Task has the given id.
+   */
+  public String getStatus(final List<String> args) {
+    if (args == null || args.size() != 1) {
+      return getResult(false, "Usage : only one ID at a time");
+    }
+
+    final Integer taskId;
+    try {
+      taskId = Integer.valueOf(args.get(0));
+    } catch (final NumberFormatException e) {
+      // The id comes straight from the HTTP request; report it instead of failing the request.
+      return getResult(false, "The ID must be an integer");
+    }
+
+    synchronized (lock) {
+      if (taskId.equals(runningTaskId)) {
+        return getResult(true, "Running");
+      } else if (finishedTaskId.contains(taskId)) {
+        return getResult(true, "Finished");
+      } else if (canceledTaskId.contains(taskId)) {
+        return getResult(true, "Canceled");
+      }
+
+      for (final Pair<Integer, String> entity : taskQueue) {
+        // equals(), not ==: boxed Integers are only guaranteed to be the same
+        // object for small values, so == misses ids above 127.
+        if (taskId.equals(entity.first)) {
+          return getResult(true, "Waiting");
+        }
+      }
+      return getResult(false, "Not found");
+    }
+  }
+
+  /**
+   * Submit a command to schedule. The command is appended to the waiting queue;
+   * if the driver is READY, it moves to RUNNING and a thread waiting for commands is notified.
+   * @param args the values of the "cmd" query parameter; exactly one command is expected.
+   * @return a "[Success]" message carrying the id assigned to the Task, or an
+   * "[Error]" message if not exactly one command was given.
+   */
+  public String submitCommands(final List<String> args) {
+    if (args == null || args.size() != 1) {
+      return getResult(false, "Usage : only one command at a time");
+    }
+
+    final String command = args.get(0);
+
+    synchronized (lock) {
+      final Integer id = taskCount.incrementAndGet();
+      taskQueue.add(new Pair<>(id, command));
+
+      if (readyToRun()) {
+        state = State.RUNNING;
+        lock.notify();
+      }
+      return getResult(true, "Task ID : "+id);
+    }
+  }
+
+  /**
+   * Cancel a Task waiting on the queue. A task cannot be canceled
+   * once it is running.
+   * @param args the values of the "id" query parameter; exactly one integer id is expected.
+   * @return a "[Success]" message if the Task was removed from the waiting queue,
+   * or an "[Error]" message if the arguments are invalid, the Task is running
+   * or already finished, or no waiting Task has the given id.
+   */
+  public String cancelTask(final List<String> args) {
+    if (args == null || args.size() != 1) {
+      return getResult(false, "Usage : only one ID at a time");
+    }
+
+    final Integer taskId;
+    try {
+      taskId = Integer.valueOf(args.get(0));
+    } catch (final NumberFormatException e) {
+      // The id comes straight from the HTTP request; report it instead of failing the request.
+      return getResult(false, "The ID must be an integer");
+    }
+
+    synchronized (lock) {
+      if (taskId.equals(runningTaskId)) {
+        return getResult(false, "The task is running");
+      } else if (finishedTaskId.contains(taskId)) {
+        return getResult(false, "Already finished");
+      }
+
+      // Remove through the iterator, so the queue is never modified behind the
+      // back of the iteration and no second scan of the queue is needed.
+      final Iterator<Pair<Integer, String>> it = taskQueue.iterator();
+      while (it.hasNext()) {
+        // equals(), not ==: boxed Integers are only guaranteed to be the same
+        // object for small values, so == misses ids above 127.
+        if (taskId.equals(it.next().first)) {
+          it.remove();
+          canceledTaskId.add(taskId);
+          return getResult(true, "Canceled");
+        }
+      }
+      return getResult(false, "Not found");
+    }
+  }
+
+  /**
+   * Removes every Task from the waiting queue and records each of them as canceled.
+   * @return a "[Success]" message telling how many Tasks were removed.
+   */
+  public String clearList() {
+    synchronized (lock) {
+      final int removed = taskQueue.size();
+      for (final Pair<Integer, String> waiting : taskQueue) {
+        canceledTaskId.add(waiting.first);
+      }
+      taskQueue.clear();
+      return getResult(true, removed + " tasks removed.");
+    }
+  }
+
+
+
+  /**
+   * Handles the completion of a Task, for both versions of the scheduler.
+   * The completed Task is recorded as finished. In the Retainable version the
+   * active context is reused to run the next command (waiting for one if the
+   * queue is empty). In the Non-retainable version the active context is closed
+   * and another evaluator is requested.
+   */
+  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
+    @Override
+    public void onNext(final CompletedTask task) {
+      synchronized (lock) {
+        finishedTaskId.add(runningTaskId);
+        runningTaskId = null;
+
+        // The {0} placeholder is required: without it the logger drops the parameter.
+        LOG.log(Level.INFO, "Task completed. Reuse the evaluator : {0}", String.valueOf(retainable));
+
+        if (retainable) {
+          if (taskQueue.isEmpty()) {
+            state = State.READY;
+          }
+          // May block in lock.wait() (which releases the lock) until a command is queued.
+          waitForCommands(task.getActiveContext());
+        } else {
+          task.getActiveContext().close();
+          state = State.WAIT_EVALUATOR;
+          requestEvaluator();
+        }
+      }
+    }
+  }
+
+  /**
+   * Submits a request for a single evaluator with a memory setting of 128.
+   */
+  private synchronized void requestEvaluator() {
+    final EvaluatorRequest request = EvaluatorRequest.newBuilder()
+      .setMemory(128)
+      .setNumber(1)
+      .build();
+    requestor.submit(request);
+  }
+
+  /**
+   * Submits a ShellTask that executes the command to the given context.
+   * @param context The context to submit the Task to
+   * @param taskId The id of the Task; it is appended to "ShellTask" to form the Task identifier
+   * @param command The command to execute
+   */
+  private void submit(final ActiveContext context, final Integer taskId, final String command) {
+    final Configuration shellTaskConf = TaskConfiguration.CONF
+      .set(TaskConfiguration.TASK, ShellTask.class)
+      .set(TaskConfiguration.IDENTIFIER, "ShellTask"+taskId)
+      .build();
+    final Configuration shellCommandConf = Tang.Factory.getTang().newConfigurationBuilder()
+      .bindNamedParameter(Command.class, command)
+      .build();
+
+    LOG.log(Level.INFO, "Submitting command : {0}", command);
+    context.submitTask(Configurations.merge(shellTaskConf, shellCommandConf));
+  }
+
+  /**
+   * Pick up a command from the queue and run it. Wait until
+   * any command coming up if no command exists.
+   * An interruption does not end the wait; it is logged and the interrupt
+   * status of the thread is restored before this method returns.
+   * @param context the context to submit the command to.
+   */
+  private void waitForCommands(final ActiveContext context) {
+    synchronized (lock) {
+      boolean interrupted = false;
+      try {
+        while (taskQueue.isEmpty()) {
+          // Wait until any commands enter in the queue
+          try {
+            lock.wait();
+          } catch (InterruptedException e) {
+            LOG.log(Level.WARNING, "InterruptedException occurred in SchedulerDriver", e);
+            // Only remember the interruption here: re-interrupting right away
+            // would make the next wait() throw immediately and spin this loop.
+            interrupted = true;
+          }
+        }
+
+        // Run the first command from the queue.
+        final Pair<Integer, String> task = taskQueue.poll();
+        runningTaskId = task.first;
+        final String command = task.second;
+        submit(context, runningTaskId, command);
+      } finally {
+        if (interrupted) {
+          // Restore the interrupt status, so that the caller can still observe it.
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
+
+  /**
+   * @return {@code true} if the driver is in the READY state and at least
+   * one command is waiting on the queue.
+   */
+  private boolean readyToRun() {
+    synchronized (lock) {
+      if (state != State.READY) {
+        return false;
+      }
+      return !taskQueue.isEmpty();
+    }
+  }
+
+  /**
+   * Formats a result as the status in square brackets followed by the message.
+   * @param success selects the status: "Success" if true, "Error" otherwise
+   * @param message the message to put after the status
+   * @return the formatted result, e.g. "[Success] Canceled"
+   */
+  private static String getResult(final boolean success, final String message) {
+    final String status;
+    if (success) {
+      status = "Success";
+    } else {
+      status = "Error";
+    }
+    return "[" + status + "] " + message;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/10f96514/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java
new file mode 100644
index 0000000..24f8ed5
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.DriverServiceConfiguration;
+import org.apache.reef.client.REEF;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.webserver.HttpHandlerConfiguration;
+import org.apache.reef.webserver.ReefEventStateManager;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.CommandLine;
+
+import java.io.IOException;
+
+/**
+ * REEF TaskScheduler client. It builds the configuration of the scheduler
+ * Driver and submits it to REEF.
+ */
+public final class SchedulerREEF {
+
+  /**
+   * Command line parameter: true to reuse evaluators, false to allocate/close one for each iteration.
+   */
+  @NamedParameter(doc = "Whether or not to reuse evaluators",
+    short_name = "retain", default_value = "true")
+  public static final class Retain implements Name<Boolean> {
+  }
+
+  /**
+   * @return The http configuration that registers HttpServerShellCmdHandler with reef-webserver
+   */
+  private static Configuration getHttpConf() {
+    return HttpHandlerConfiguration.CONF
+      .set(HttpHandlerConfiguration.HTTP_HANDLERS, HttpServerShellCmdHandler.class)
+      .build();
+  }
+
+  /**
+   * @return The Driver configuration, which binds the event handlers of SchedulerDriver.
+   */
+  private static Configuration getDriverConf() {
+    return DriverConfiguration.CONF
+      .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(SchedulerDriver.class))
+      .set(DriverConfiguration.DRIVER_IDENTIFIER, "TaskScheduler")
+      .set(DriverConfiguration.ON_DRIVER_STARTED, SchedulerDriver.StartHandler.class)
+      .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, SchedulerDriver.EvaluatorAllocatedHandler.class)
+      .set(DriverConfiguration.ON_CONTEXT_ACTIVE, SchedulerDriver.ActiveContextHandler.class)
+      .set(DriverConfiguration.ON_TASK_COMPLETED, SchedulerDriver.CompletedTaskHandler.class)
+      .build();
+  }
+
+  /**
+   * Run the Task scheduler. The '-retain' command line option (true by default)
+   * tells whether the scheduler reuses evaluators to submit new Tasks.
+   * @param runtimeConf The runtime configuration (e.g. Local, YARN, etc)
+   * @param args Command line arguments.
+   * @throws InjectionException
+   * @throws java.io.IOException
+   * @throws ParseException
+   */
+  public static void runTaskScheduler(final Configuration runtimeConf, final String[] args)
+    throws InjectionException, IOException, ParseException {
+    final Tang tang = Tang.Factory.getTang();
+
+    final Configuration retainConf = CommandLine.parseToConfiguration(args, Retain.class);
+
+    // The Driver runs with the driver, http and command line configurations merged.
+    final Configuration mergedConf = Configurations.merge(getDriverConf(), getHttpConf(), retainConf);
+
+    tang.newInjector(runtimeConf).getInstance(REEF.class).submit(mergedConf);
+  }
+
+  /**
+   * Main program. Runs the Task scheduler on the local runtime.
+   * @param args Command line arguments.
+   * @throws InjectionException
+   * @throws java.io.IOException
+   * @throws ParseException
+   */
+  public static void main(String[] args) throws InjectionException, IOException, ParseException {
+    final Configuration localRuntimeConf = LocalRuntimeConfiguration.CONF
+      .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, 3)
+      .build();
+    runTaskScheduler(localRuntimeConf, args);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/10f96514/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java
new file mode 100644
index 0000000..99a593b
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+import java.io.IOException;
+
+import static org.apache.reef.examples.scheduler.SchedulerREEF.runTaskScheduler;
+
+/**
+ * REEF TaskScheduler on YARN runtime.
+ */
+public final class SchedulerREEFYarn {
+  /**
+   * Runs the Task scheduler with the YARN client configuration as its runtime configuration.
+   * @param args Command line arguments.
+   * @throws InjectionException
+   * @throws java.io.IOException
+   * @throws ParseException
+   */
+  public static void main(String[] args) throws InjectionException, IOException, ParseException {
+    runTaskScheduler(YarnClientConfiguration.CONF.build(), args);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/10f96514/reef-examples/src/main/java/org/apache/reef/examples/scheduler/package-info.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/package-info.java b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/package-info.java
new file mode 100644
index 0000000..728806c
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Task scheduler example based on reef-webserver
+ */
+package org.apache.reef.examples.scheduler;
\ No newline at end of file