You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2014/11/09 17:50:23 UTC
incubator-reef git commit: [REEF-3]: A basic Task Scheduler This
implements REEF-3: https://issues.apache.org/jira/browse/REEF-3
Repository: incubator-reef
Updated Branches:
refs/heads/master 7c734fd32 -> 10f965143
[REEF-3]: A basic Task Scheduler
This implements REEF-3: https://issues.apache.org/jira/browse/REEF-3
It is a basic Task Scheduler example using Reef-webserver. The application
receives the task (shell command) list from user and execute the tasks in a
FIFO order.
Users can send the HTTP request to the server via URL:
http://{address}:{port}/reef-example-scheduler/v1
Users can send the following requests
* /list lists all the tasks' status.
* /clear clears all the tasks waiting on the queue. Returns how
many tasks are removed.
* /submit?cmd=COMMAND submits a COMMAND to be executed and return the taskID.
* /status?id=ID returns the status of task whose id is ID.
* /cancel?id=ID cancels the task whose id is ID.
The results of tasks are written in the log files - both in driver's and
evaluators'.
By default, the application reuses existing evaluator to run Tasks.
Using -retain false option, the driver allocates an evaluator for each command.
The code was contributed by Yunseong Lee <yu...@me.com>
Closes #3
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/10f96514
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/10f96514
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/10f96514
Branch: refs/heads/master
Commit: 10f96514379179b8b2cf11fcf350418787302078
Parents: 7c734fd
Author: Yunseong Lee <yu...@me.com>
Authored: Fri Oct 24 13:27:14 2014 +0900
Committer: Markus Weimer <we...@apache.org>
Committed: Sun Nov 9 08:25:25 2014 -0800
----------------------------------------------------------------------
.../scheduler/HttpServerShellCmdHandler.java | 99 +++++
.../examples/scheduler/SchedulerDriver.java | 382 +++++++++++++++++++
.../reef/examples/scheduler/SchedulerREEF.java | 112 ++++++
.../examples/scheduler/SchedulerREEFYarn.java | 44 +++
.../reef/examples/scheduler/package-info.java | 22 ++
5 files changed, 659 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/10f96514/reef-examples/src/main/java/org/apache/reef/examples/scheduler/HttpServerShellCmdHandler.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/HttpServerShellCmdHandler.java b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/HttpServerShellCmdHandler.java
new file mode 100644
index 0000000..4320bcd
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/HttpServerShellCmdHandler.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.webserver.HttpHandler;
+import org.apache.reef.webserver.ParsedHttpRequest;
+
+import javax.inject.Inject;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Receive HttpRequest so that it can handle the command list
+ */
+public class HttpServerShellCmdHandler implements HttpHandler {
+ final InjectionFuture<SchedulerDriver> schedulerDriver;
+
+ private String uriSpecification = "reef-example-scheduler";
+
+ @Inject
+ public HttpServerShellCmdHandler(final InjectionFuture<SchedulerDriver> schedulerDriver) {
+ this.schedulerDriver = schedulerDriver;
+ }
+
+ @Override
+ public String getUriSpecification() {
+ return uriSpecification;
+ }
+
+ @Override
+ public void setUriSpecification(String s) {
+ uriSpecification = s;
+ }
+
+ /**
+ * HttpRequest handler. You must specify UriSpecification and REST API version.
+ * The request url is http://{address}:{port}/reef-example-scheduler/v1
+ *
+ * APIs
+ * /list to get the status list for all tasks
+ * /status?id={id} to query the status of such a task, given id
+ * /submit?cmd={cmd} to submit a Task, which returns its id
+ * /cancel?id={id} to cancel the task's execution
+ * /clear to clear the waiting queue
+ */
+ @Override
+ public void onHttpRequest(ParsedHttpRequest request, HttpServletResponse response) throws IOException, ServletException {
+ final String target = request.getTargetEntity().toLowerCase();
+ final Map<String, List<String>> queryMap = request.getQueryMap();
+
+ final String result;
+ switch (target) {
+ case "list":
+ result = schedulerDriver.get().getList();
+ break;
+ case "clear":
+ result = schedulerDriver.get().clearList();
+ break;
+ case "status":
+ result = schedulerDriver.get().getStatus(queryMap.get("id"));
+ break;
+ case "submit":
+ result = schedulerDriver.get().submitCommands(queryMap.get("cmd"));
+ break;
+ case "cancel":
+ result = schedulerDriver.get().cancelTask(queryMap.get("id"));
+ break;
+ default:
+ result = "Unsupported operation";
+ }
+
+ // Send response to the http client
+ response.getOutputStream().println(result);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/10f96514/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java
new file mode 100644
index 0000000..1f70e8d
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.task.CompletedTask;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.examples.library.Command;
+import org.apache.reef.examples.library.ShellTask;
+import org.apache.reef.io.network.util.Pair;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.inject.Inject;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver for TaskScheduler. It receives the commands by HttpRequest and
+ * execute them in a FIFO(First In First Out) order.
+ */
+@Unit
+public final class SchedulerDriver {
+
+ public static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
+ private static final Logger LOG = Logger.getLogger(SchedulerDriver.class.getName());
+
+ /**
+ * Possible states of the job driver. Can be one of:
+ * <dl>
+ * <du><code>INIT</code></du><dd>Initial state. Ready to request an evaluator</dd>
+ * <du><code>WAIT_EVALUATORS</code></du><dd>Wait for requested evaluators to be ready.</dd>
+ * <du><code>READY</code></du><dd>Wait for the commands. When new Tasks arrive, enqueue the tasks and transit to RUNNING status.</dd>
+ * <du><code>RUNNING</code></du><dd>Run commands in the queue. Go back to READY state when the queue is empty.</dd>
+ * </dl>
+ */
+ private enum State {
+ INIT, WAIT_EVALUATOR, READY, RUNNING
+ }
+
+ /**
+ * If true, it reuses evaluators when Tasks done.
+ */
+ private boolean retainable;
+
+ private Object lock = new Object();
+
+ @GuardedBy("lock")
+ private State state = State.INIT;
+
+ @GuardedBy("lock")
+ private final Queue<Pair<Integer, String>> taskQueue;
+
+ @GuardedBy("lock")
+ private Integer runningTaskId = null;
+
+ @GuardedBy("lock")
+ private Set<Integer> finishedTaskId = new HashSet<>();
+
+ @GuardedBy("lock")
+ private Set<Integer> canceledTaskId = new HashSet<>();
+
+ private final EvaluatorRequestor requestor;
+
+
+ /**
+ * Counts how many tasks have been scheduled.
+ */
+ private AtomicInteger taskCount = new AtomicInteger(0);
+
+ @Inject
+ public SchedulerDriver(final EvaluatorRequestor requestor,
+ @Parameter(SchedulerREEF.Retain.class) boolean retainable) {
+ this.requestor = requestor;
+ this.taskQueue = new LinkedList<>();
+ this.retainable = retainable;
+ }
+
+ /**
+ * The driver is ready to run.
+ */
+ final class StartHandler implements EventHandler<StartTime> {
+ @Override
+ public void onNext(final StartTime startTime) {
+ LOG.log(Level.INFO, "Driver started at {0}", startTime);
+ assert (state == State.INIT);
+
+ requestEvaluator();
+ }
+ }
+
+ /**
+ * Evaluator is allocated. This occurs every time to run commands in Non-retainable version,
+ * while occurs only once in the Retainable version
+ */
+ final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
+ @Override
+ public void onNext(final AllocatedEvaluator evaluator) {
+ LOG.log(Level.INFO, "Evaluator is ready");
+ assert (state == State.WAIT_EVALUATOR);
+
+ evaluator.submitContext(ContextConfiguration.CONF
+ .set(ContextConfiguration.IDENTIFIER, "SchedulerContext")
+ .build());
+ }
+ }
+
+ /**
+ * Now it is ready to schedule tasks. But if the queue is empty,
+ * wait until commands coming up.
+ */
+ final class ActiveContextHandler implements EventHandler<ActiveContext> {
+ @Override
+ public void onNext(ActiveContext context) {
+ synchronized (lock) {
+ LOG.log(Level.INFO, "Context available : {0}", context.getId());
+ assert (state == State.WAIT_EVALUATOR);
+
+ state = State.READY;
+ waitForCommands(context);
+ }
+ }
+ }
+
+ /**
+ * Get the list of Tasks. They are classified as their states.
+ * @return
+ */
+ public String getList() {
+ synchronized (lock) {
+ final StringBuilder sb = new StringBuilder("Running : ");
+ if (runningTaskId != null) {
+ sb.append(runningTaskId);
+ }
+
+ sb.append("\nWaiting :");
+ for (final Pair<Integer, String> entity : taskQueue) {
+ sb.append(" ").append(entity.first);
+ }
+
+ sb.append("\nFinished :");
+ for (final int taskIds : finishedTaskId) {
+ sb.append(" ").append(taskIds);
+ }
+
+ sb.append("\nCanceled :");
+ for (final int taskIds : canceledTaskId) {
+ sb.append(" ").append(taskIds);
+ }
+ return sb.toString();
+ }
+ }
+
+ /**
+ * Get the status of a Task.
+ * @return
+ */
+ public String getStatus(final List<String> args) {
+ if (args.size() != 1) {
+ return getResult(false, "Usage : only one ID at a time");
+ }
+
+ final Integer taskId = Integer.valueOf(args.get(0));
+
+ synchronized (lock) {
+ if (taskId.equals(runningTaskId)) {
+ return getResult(true, "Running");
+ } else if (finishedTaskId.contains(taskId)) {
+ return getResult(true, "Finished");
+ } else if (canceledTaskId.contains(taskId)) {
+ return getResult(true, "Canceled");
+ }
+
+ for (final Pair<Integer, String> entity : taskQueue) {
+ if (taskId == entity.first) {
+ return getResult(true, "Waiting");
+ }
+ }
+ return getResult(false, "Not found");
+ }
+ }
+
+ /**
+ * Submit a command to schedule.
+ * @return
+ */
+ public String submitCommands(final List<String> args) {
+ if (args.size() != 1) {
+ return getResult(false, "Usage : only one ID at a time");
+ }
+
+ final String command = args.get(0);
+
+ synchronized (lock) {
+ final Integer id = taskCount.incrementAndGet();
+ taskQueue.add(new Pair(id, command));
+
+ if (readyToRun()) {
+ state = State.RUNNING;
+ lock.notify();
+ }
+ return getResult(true, "Task ID : "+id);
+ }
+ }
+
+ /**
+ * Cancel a Task waiting on the queue. A task cannot be canceled
+ * once it is running.
+ * @return
+ */
+ public String cancelTask(final List<String> args) {
+ if (args.size() != 1) {
+ return getResult(false, "Usage : only one ID at a time");
+ }
+
+ final Integer taskId = Integer.valueOf(args.get(0));
+
+ synchronized (lock) {
+ if (taskId.equals(runningTaskId)) {
+ return getResult(false, "The task is running");
+ } else if (finishedTaskId.contains(taskId)) {
+ return getResult(false, "Already finished");
+ }
+
+ for (final Pair<Integer, String> entity : taskQueue) {
+ if (taskId == entity.first) {
+ taskQueue.remove(entity);
+ canceledTaskId.add(taskId);
+ return getResult(true, "Canceled");
+ }
+ }
+ return getResult(false, "Not found");
+ }
+ }
+
+ /**
+ * Clear all the Tasks from the waiting queue.
+ * @return
+ */
+ public String clearList() {
+ final int count;
+ synchronized (lock) {
+ count = taskQueue.size();
+ for (Pair<Integer, String> entity : taskQueue) {
+ canceledTaskId.add(entity.first);
+ }
+ taskQueue.clear();
+ }
+ return getResult(true, count + " tasks removed.");
+ }
+
+
+
+ /**
+ * Non-retainable version of CompletedTaskHandler.
+ * When Task completes, it closes the active context to deallocate the evaluator
+ * and if there is outstanding commands, allocate another evaluator.
+ */
+ final class CompletedTaskHandler implements EventHandler<CompletedTask> {
+ @Override
+ public void onNext(final CompletedTask task) {
+ synchronized (lock) {
+ finishedTaskId.add(runningTaskId);
+ runningTaskId = null;
+
+ LOG.log(Level.INFO, "Task completed. Reuse the evaluator :", String.valueOf(retainable));
+
+ if (retainable) {
+ if (taskQueue.isEmpty()) {
+ state = State.READY;
+ }
+ waitForCommands(task.getActiveContext());
+ } else {
+ task.getActiveContext().close();
+ state = State.WAIT_EVALUATOR;
+ requestEvaluator();
+ }
+ }
+ }
+ }
+
+ /**
+ * Request an evaluator
+ */
+ private synchronized void requestEvaluator() {
+ requestor.submit(EvaluatorRequest.newBuilder()
+ .setMemory(128)
+ .setNumber(1)
+ .build());
+ }
+
+ /**
+ * @param command The command to execute
+ */
+ private void submit(final ActiveContext context, final Integer taskId, final String command) {
+ final Configuration taskConf = TaskConfiguration.CONF
+ .set(TaskConfiguration.TASK, ShellTask.class)
+ .set(TaskConfiguration.IDENTIFIER, "ShellTask"+taskId)
+ .build();
+ final Configuration commandConf = Tang.Factory.getTang().newConfigurationBuilder()
+ .bindNamedParameter(Command.class, command)
+ .build();
+
+ LOG.log(Level.INFO, "Submitting command : {0}", command);
+ final Configuration merged = Configurations.merge(taskConf, commandConf);
+ context.submitTask(merged);
+ }
+
+ /**
+ * Pick up a command from the queue and run it. Wait until
+ * any command coming up if no command exists.
+ * @param context
+ */
+ private void waitForCommands(final ActiveContext context) {
+ synchronized (lock) {
+ while (taskQueue.isEmpty()) {
+ // Wait until any commands enter in the queue
+ try {
+ lock.wait();
+ } catch (InterruptedException e) {
+ LOG.log(Level.WARNING, "InterruptedException occurred in SchedulerDriver", e);
+ }
+ }
+
+ // Run the first command from the queue.
+ final Pair<Integer, String> task = taskQueue.poll();
+ runningTaskId = task.first;
+ final String command = task.second;
+ submit(context, runningTaskId, command);
+ }
+ }
+
+ /**
+ * @return {@code true} if it is possible to run commands.
+ */
+ private boolean readyToRun() {
+ synchronized (lock) {
+ return state == State.READY && taskQueue.size() > 0;
+ }
+ }
+
+ /**
+ * Return the result including status and message
+ * @param success
+ * @param message
+ * @return
+ */
+ private static String getResult(final boolean success, final String message) {
+ final StringBuilder sb = new StringBuilder();
+ final String status = success ? "Success" : "Error";
+ return sb.append("[").append(status).append("] ").append(message).toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/10f96514/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java
new file mode 100644
index 0000000..24f8ed5
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.DriverServiceConfiguration;
+import org.apache.reef.client.REEF;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.webserver.HttpHandlerConfiguration;
+import org.apache.reef.webserver.ReefEventStateManager;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.CommandLine;
+
+import java.io.IOException;
+
+/**
+ * REEF TaskScheduler.
+ */
+public final class SchedulerREEF {
+
+ /**
+ * Command line parameter = true to reuse evaluators, or false allocate/close for each iteration
+ */
+ @NamedParameter(doc = "Whether or not to reuse evaluators",
+ short_name = "retain", default_value = "true")
+ public static final class Retain implements Name<Boolean> {
+ }
+
+ /**
+ * @return The http configuration to use reef-webserver
+ */
+ private final static Configuration getHttpConf() {
+ final Configuration httpHandlerConf = HttpHandlerConfiguration.CONF
+ .set(HttpHandlerConfiguration.HTTP_HANDLERS, HttpServerShellCmdHandler.class)
+ .build();
+ return httpHandlerConf;
+
+ }
+
+ /**
+ * @return The Driver configuration.
+ */
+ private final static Configuration getDriverConf() {
+ final Configuration driverConf = DriverConfiguration.CONF
+ .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(SchedulerDriver.class))
+ .set(DriverConfiguration.DRIVER_IDENTIFIER, "TaskScheduler")
+ .set(DriverConfiguration.ON_DRIVER_STARTED, SchedulerDriver.StartHandler.class)
+ .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, SchedulerDriver.EvaluatorAllocatedHandler.class)
+ .set(DriverConfiguration.ON_CONTEXT_ACTIVE, SchedulerDriver.ActiveContextHandler.class)
+ .set(DriverConfiguration.ON_TASK_COMPLETED, SchedulerDriver.CompletedTaskHandler.class)
+ .build();
+
+ return driverConf;
+ }
+
+ /**
+ * Run the Task scheduler. If '-retain true' option is passed via command line,
+ * the scheduler reuses evaluators to submit new Tasks.
+ * @param runtimeConf The runtime configuration (e.g. Local, YARN, etc)
+ * @param args Command line arguments.
+ * @throws InjectionException
+ * @throws java.io.IOException
+ */
+ public static void runTaskScheduler(final Configuration runtimeConf, final String[] args)
+ throws InjectionException, IOException, ParseException {
+ final Tang tang = Tang.Factory.getTang();
+
+ final Configuration commandLineConf = CommandLine.parseToConfiguration(args, Retain.class);
+
+ // Merge the configurations to run Driver
+ final Configuration driverConf = Configurations.merge(getDriverConf(), getHttpConf(), commandLineConf);
+
+ final REEF reef = tang.newInjector(runtimeConf).getInstance(REEF.class);
+ reef.submit(driverConf);
+ }
+
+ /**
+ * Main program
+ * @param args
+ * @throws InjectionException
+ */
+ public final static void main(String[] args) throws InjectionException, IOException, ParseException {
+ final Configuration runtimeConfiguration = LocalRuntimeConfiguration.CONF
+ .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, 3)
+ .build();
+ runTaskScheduler(runtimeConfiguration, args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/10f96514/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java
new file mode 100644
index 0000000..99a593b
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+import java.io.IOException;
+
+import static org.apache.reef.examples.scheduler.SchedulerREEF.runTaskScheduler;
+
+/**
+ * REEF TaskScheduler on YARN runtime.
+ */
+public final class SchedulerREEFYarn {
+ /**
+ * Launch the scheduler with YARN client configuration
+ * @param args
+ * @throws InjectionException
+ * @throws java.io.IOException
+ */
+ public final static void main(String[] args) throws InjectionException, IOException, ParseException {
+ final Configuration runtimeConfiguration = YarnClientConfiguration.CONF.build();
+ runTaskScheduler(runtimeConfiguration, args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/10f96514/reef-examples/src/main/java/org/apache/reef/examples/scheduler/package-info.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/scheduler/package-info.java b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/package-info.java
new file mode 100644
index 0000000..728806c
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/scheduler/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Task scheduler example based on reef-webserver
+ */
+package org.apache.reef.examples.scheduler;
\ No newline at end of file