You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:47:03 UTC

[30/51] [partial] incubator-reef git commit: [REEF-93] Move java sources to lang/java

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/JobDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/JobDriver.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/JobDriver.java
new file mode 100644
index 0000000..653b240
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/JobDriver.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.pool;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.task.CompletedTask;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Allocate N evaluators, submit M tasks to them, and measure the time.
+ * Each task does nothing but sleep for D seconds.
+ */
+@Unit
+public final class JobDriver {
+
+  /**
+   * Standard Java logger.
+   */
+  private static final Logger LOG = Logger.getLogger(JobDriver.class.getName());
+
+  /**
+   * Job driver uses EvaluatorRequestor to request Evaluators that will run the Tasks.
+   */
+  private final EvaluatorRequestor evaluatorRequestor;
+
+  /**
+   * If true, submit context and task in one request.
+   */
+  private final boolean isPiggyback;
+
+  /**
+   * Number of Evaluators to request.
+   */
+  private final int numEvaluators;
+
+  /**
+   * Number of Tasks to run.
+   */
+  private final int numTasks;
+  /**
+   * Number of seconds to sleep in each Task.
+   * (kept as a String because it is bound as text into the Context configuration).
+   */
+  private final String delayStr;
+  /**
+   * Number of Evaluators started.
+   */
+  private int numEvaluatorsStarted = 0;
+  /**
+   * Number of Tasks launched.
+   */
+  private int numTasksStarted = 0;
+
+  /**
+   * Job driver constructor.
+   * All parameters are injected from TANG automatically.
+   *
+   * @param evaluatorRequestor is used to request Evaluators.
+   */
+  @Inject
+  JobDriver(final EvaluatorRequestor evaluatorRequestor,
+            final @Parameter(Launch.Piggyback.class) Boolean isPiggyback,
+            final @Parameter(Launch.NumEvaluators.class) Integer numEvaluators,
+            final @Parameter(Launch.NumTasks.class) Integer numTasks,
+            final @Parameter(Launch.Delay.class) Integer delay) {
+    this.evaluatorRequestor = evaluatorRequestor;
+    this.isPiggyback = isPiggyback;
+    this.numEvaluators = numEvaluators;
+    this.numTasks = numTasks;
+    this.delayStr = "" + delay;
+  }
+
+  /**
+   * Build a new Task configuration for a given task ID.
+   *
+   * @param taskId Unique string ID of the task
+   * @return Immutable task configuration object, ready to be submitted to REEF.
+   * @throws RuntimeException that wraps BindException if unable to build the configuration.
+   */
+  private Configuration getTaskConfiguration(final String taskId) {
+    try {
+      return TaskConfiguration.CONF
+          .set(TaskConfiguration.IDENTIFIER, taskId)
+          .set(TaskConfiguration.TASK, SleepTask.class)
+          .build();
+    } catch (final BindException ex) {
+      LOG.log(Level.SEVERE, "Failed to create  Task Configuration: " + taskId, ex);
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * Job Driver is ready and the clock is set up: request the evaluators.
+   */
+  final class StartHandler implements EventHandler<StartTime> {
+    @Override
+    public void onNext(final StartTime startTime) {
+      LOG.log(Level.INFO, "TIME: Start Driver with {0} Evaluators", numEvaluators);
+      evaluatorRequestor.submit(
+          EvaluatorRequest.newBuilder()
+              .setMemory(128)
+              .setNumberOfCores(1)
+              .setNumber(numEvaluators).build()
+      );
+    }
+  }
+
+  /**
+   * Job Driver is shutting down: write to the log.
+   */
+  final class StopHandler implements EventHandler<StopTime> {
+    @Override
+    public void onNext(final StopTime stopTime) {
+      LOG.log(Level.INFO, "TIME: Stop Driver");
+    }
+  }
+
+  /**
+   * Receive notification that an Evaluator has been allocated, and submit a new
+   * Context (plus a Task, in piggyback mode) to that Evaluator, or close it if no Tasks are left.
+   */
+  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
+    @Override
+    public void onNext(final AllocatedEvaluator eval) {
+
+      LOG.log(Level.INFO, "TIME: Allocated Evaluator {0}", eval.getId());
+
+      final boolean runTask;
+      final int nEval;
+      final int nTask;
+
+      synchronized (JobDriver.this) {
+        runTask = numTasksStarted < numTasks;
+        if (runTask) {
+          ++numEvaluatorsStarted;
+          if (isPiggyback) {
+            ++numTasksStarted;
+          }
+        }
+        nEval = numEvaluatorsStarted;
+        nTask = numTasksStarted;
+      }
+
+      if (runTask) {
+
+        final String contextId = String.format("Context_%06d", nEval);
+        LOG.log(Level.INFO, "TIME: Submit Context {0} to Evaluator {1}",
+            new Object[]{contextId, eval.getId()});
+
+        try {
+
+          final JavaConfigurationBuilder contextConfigBuilder =
+              Tang.Factory.getTang().newConfigurationBuilder();
+
+          contextConfigBuilder.addConfiguration(ContextConfiguration.CONF
+              .set(ContextConfiguration.IDENTIFIER, contextId)
+              .build());
+
+          contextConfigBuilder.bindNamedParameter(Launch.Delay.class, delayStr);
+
+          if (isPiggyback) {
+
+            final String taskId = String.format("StartTask_%08d", nTask);
+            final Configuration taskConfig = getTaskConfiguration(taskId);
+
+            LOG.log(Level.INFO, "TIME: Submit Task {0} to Evaluator {1}",
+                new Object[]{taskId, eval.getId()});
+
+            eval.submitContextAndTask(contextConfigBuilder.build(), taskConfig);
+
+          } else {
+            eval.submitContext(contextConfigBuilder.build());
+          }
+
+        } catch (final BindException ex) {
+          LOG.log(Level.SEVERE, "Failed to submit Context to Evaluator: " + eval.getId(), ex);
+          throw new RuntimeException(ex);
+        }
+      } else {
+        LOG.log(Level.INFO, "TIME: Close Evaluator {0}", eval.getId());
+        eval.close();
+      }
+    }
+  }
+
+  /**
+   * Receive notification that the Context is active.
+   */
+  final class ActiveContextHandler implements EventHandler<ActiveContext> {
+    @Override
+    public void onNext(final ActiveContext context) {
+
+      LOG.log(Level.INFO, "TIME: Active Context {0}", context.getId());
+
+      if (isPiggyback) return; // Task already submitted
+
+      final boolean runTask;
+      final int nTask;
+
+      synchronized (JobDriver.this) {
+        runTask = numTasksStarted < numTasks;
+        if (runTask) {
+          ++numTasksStarted;
+        }
+        nTask = numTasksStarted;
+      }
+
+      if (runTask) {
+        final String taskId = String.format("StartTask_%08d", nTask);
+        LOG.log(Level.INFO, "TIME: Submit Task {0} to Evaluator {1}",
+            new Object[]{taskId, context.getEvaluatorId()});
+        context.submitTask(getTaskConfiguration(taskId));
+      } else {
+        context.close();
+      }
+    }
+  }
+
+  /**
+   * Receive notification that the Task is running.
+   */
+  final class RunningTaskHandler implements EventHandler<RunningTask> {
+    @Override
+    public void onNext(final RunningTask task) {
+      LOG.log(Level.INFO, "TIME: Running Task {0}", task.getId());
+    }
+  }
+
+  /**
+   * Receive notification that the Task has completed successfully.
+   */
+  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
+    @Override
+    public void onNext(final CompletedTask task) {
+
+      final ActiveContext context = task.getActiveContext();
+      LOG.log(Level.INFO, "TIME: Completed Task {0} on Evaluator {1}",
+          new Object[]{task.getId(), context.getEvaluatorId()});
+
+      final boolean runTask;
+      final int nTask;
+      synchronized (JobDriver.this) {
+        runTask = numTasksStarted < numTasks;
+        if (runTask) {
+          ++numTasksStarted;
+        }
+        nTask = numTasksStarted;
+      }
+
+      if (runTask) {
+        final String taskId = String.format("Task_%08d", nTask);
+        LOG.log(Level.INFO, "TIME: Submit Task {0} to Evaluator {1}",
+            new Object[]{taskId, context.getEvaluatorId()});
+        context.submitTask(getTaskConfiguration(taskId));
+      } else {
+        LOG.log(Level.INFO, "TIME: Close Evaluator {0}", context.getEvaluatorId());
+        context.close();
+      }
+    }
+  }
+
+  /**
+   * Receive notification that the Evaluator has been shut down.
+   */
+  final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> {
+    @Override
+    public void onNext(final CompletedEvaluator eval) {
+      LOG.log(Level.INFO, "TIME: Completed Evaluator {0}", eval.getId());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/Launch.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/Launch.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/Launch.java
new file mode 100644
index 0000000..af8f455
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/Launch.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.pool;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.DriverLauncher;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.AvroConfigurationSerializer;
+import org.apache.reef.tang.formats.CommandLine;
+import org.apache.reef.util.EnvironmentUtils;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Pool of Evaluators example - main class.
+ */
+public final class Launch {
+
+  /**
+   * Number of REEF worker threads in local mode.
+   */
+  private static final int NUM_LOCAL_THREADS = 4;
+  /**
+   * Standard Java logger
+   */
+  private static final Logger LOG = Logger.getLogger(Launch.class.getName());
+
+  /**
+   * This class should not be instantiated.
+   */
+  private Launch() {
+    throw new RuntimeException("Do not instantiate this class!");
+  }
+
+  /**
+   * Parse the command line arguments.
+   *
+   * @param args command line arguments, as passed to main()
+   * @return Configuration object.
+   * @throws BindException configuration error.
+   * @throws IOException   error reading the configuration.
+   */
+  private static Configuration parseCommandLine(final String[] args)
+      throws BindException, IOException {
+    final JavaConfigurationBuilder confBuilder = Tang.Factory.getTang().newConfigurationBuilder();
+    final CommandLine cl = new CommandLine(confBuilder);
+    cl.registerShortNameOfClass(Local.class);
+    cl.registerShortNameOfClass(Piggyback.class);
+    cl.registerShortNameOfClass(NumEvaluators.class);
+    cl.registerShortNameOfClass(NumTasks.class);
+    cl.registerShortNameOfClass(Delay.class);
+    cl.registerShortNameOfClass(JobId.class);
+    cl.processCommandLine(args);
+    return confBuilder.build();
+  }
+
+  private static Configuration cloneCommandLineConfiguration(final Configuration commandLineConf)
+      throws InjectionException, BindException {
+    final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf);
+    final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
+    cb.bindNamedParameter(Piggyback.class, String.valueOf(injector.getNamedInstance(Piggyback.class)));
+    cb.bindNamedParameter(NumEvaluators.class, String.valueOf(injector.getNamedInstance(NumEvaluators.class)));
+    cb.bindNamedParameter(NumTasks.class, String.valueOf(injector.getNamedInstance(NumTasks.class)));
+    cb.bindNamedParameter(Delay.class, String.valueOf(injector.getNamedInstance(Delay.class)));
+    return cb.build();
+  }
+
+  /**
+   * Build the client configuration: the local or YARN runtime (chosen by isLocal) plus the command line parameters.
+   *
+   * @param commandLineConf Parsed command line arguments, as passed into main().
+   * @return (immutable) TANG Configuration object.
+   * @throws BindException      if building the configuration fails.
+   * @throws InjectionException if the command line parameters cannot be injected.
+   */
+  private static Configuration getClientConfiguration(
+      final Configuration commandLineConf, final boolean isLocal)
+      throws BindException, InjectionException {
+    final Configuration runtimeConfiguration;
+    if (isLocal) {
+      LOG.log(Level.FINE, "Running on the local runtime");
+      runtimeConfiguration = LocalRuntimeConfiguration.CONF
+          .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, NUM_LOCAL_THREADS)
+          .build();
+    } else {
+      LOG.log(Level.FINE, "Running on YARN");
+      runtimeConfiguration = YarnClientConfiguration.CONF.build();
+    }
+    return Tang.Factory.getTang().newConfigurationBuilder(
+        runtimeConfiguration, cloneCommandLineConfiguration(commandLineConf))
+        .build();
+  }
+
+  /**
+   * Main method: parse the command line, build the driver configuration, and run the job via DriverLauncher.
+   *
+   * @param args command line parameters.
+   */
+  public static void main(final String[] args) {
+
+    try {
+
+      final Configuration commandLineConf = parseCommandLine(args);
+      final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf);
+
+      final boolean isLocal = injector.getNamedInstance(Local.class);
+      final int numEvaluators = injector.getNamedInstance(NumEvaluators.class);
+      final int numTasks = injector.getNamedInstance(NumTasks.class);
+      final int delay = injector.getNamedInstance(Delay.class);
+      final int jobNum = injector.getNamedInstance(JobId.class);
+
+      final String jobId = String.format("pool.e_%d.a_%d.d_%d.%d",
+          numEvaluators, numTasks, delay, jobNum < 0 ? System.currentTimeMillis() : jobNum);
+
+      // Timeout, msec: (delay + 6 sec) per Task per Evaluator + 2 min per Evaluator; long math avoids int overflow.
+      final long timeout = (long) numTasks * (delay + 6) * 1000 / numEvaluators + numEvaluators * 120000L;
+
+      final Configuration runtimeConfig = getClientConfiguration(commandLineConf, isLocal);
+      LOG.log(Level.INFO, "TIME: Start Client {0} with timeout {1} sec. Configuration:\n--\n{2}--",
+          new Object[]{jobId, timeout / 1000, new AvroConfigurationSerializer().toString(runtimeConfig)});
+
+      final Configuration driverConfig = DriverConfiguration.CONF
+          .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(JobDriver.class))
+          .set(DriverConfiguration.DRIVER_IDENTIFIER, jobId)
+          .set(DriverConfiguration.ON_DRIVER_STARTED, JobDriver.StartHandler.class)
+          .set(DriverConfiguration.ON_DRIVER_STOP, JobDriver.StopHandler.class)
+          .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, JobDriver.AllocatedEvaluatorHandler.class)
+          .set(DriverConfiguration.ON_CONTEXT_ACTIVE, JobDriver.ActiveContextHandler.class)
+          .set(DriverConfiguration.ON_TASK_RUNNING, JobDriver.RunningTaskHandler.class)
+          .set(DriverConfiguration.ON_TASK_COMPLETED, JobDriver.CompletedTaskHandler.class)
+          .set(DriverConfiguration.ON_EVALUATOR_COMPLETED, JobDriver.CompletedEvaluatorHandler.class)
+          .build();
+
+      final Configuration submittedConfiguration = Tang.Factory.getTang()
+          .newConfigurationBuilder(driverConfig, commandLineConf).build();
+      DriverLauncher.getLauncher(runtimeConfig)
+          .run(submittedConfiguration, timeout);
+
+      LOG.log(Level.INFO, "TIME: Stop Client {0}", jobId);
+
+    } catch (final BindException | InjectionException | IOException ex) {
+      LOG.log(Level.SEVERE, "Job configuration error", ex);
+    }
+  }
+
+  /**
+   * Command line parameter: number of Evaluators to request.
+   */
+  @NamedParameter(doc = "Number of evaluators to request", short_name = "evaluators")
+  public static final class NumEvaluators implements Name<Integer> {
+  }
+
+  /**
+   * Command line parameter: number of Tasks to run.
+   */
+  @NamedParameter(doc = "Number of tasks to run", short_name = "tasks")
+  public static final class NumTasks implements Name<Integer> {
+  }
+
+  /**
+   * Command line parameter: number of seconds to sleep in each task.
+   */
+  @NamedParameter(doc = "Number of seconds to sleep in each task", short_name = "delay")
+  public static final class Delay implements Name<Integer> {
+  }
+
+  /**
+   * Command line parameter = true to submit task and context in one request.
+   */
+  @NamedParameter(doc = "Submit task and context together",
+      short_name = "piggyback", default_value = "true")
+  public static final class Piggyback implements Name<Boolean> {
+  }
+
+  /**
+   * Command line parameter = true to run locally, or false to run on YARN.
+   */
+  @NamedParameter(doc = "Whether or not to run on the local runtime",
+      short_name = "local", default_value = "true")
+  public static final class Local implements Name<Boolean> {
+  }
+
+  /**
+   * Command line parameter = Numeric ID for the job.
+   */
+  @NamedParameter(doc = "Numeric ID for the job", short_name = "id", default_value = "-1")
+  public static final class JobId implements Name<Integer> {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/SleepTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/SleepTask.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/SleepTask.java
new file mode 100644
index 0000000..1bd2059
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/SleepTask.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.pool;
+
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.task.Task;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Sleep for delay seconds and quit.
+ */
+public class SleepTask implements Task {
+
+  /**
+   * Standard java logger.
+   */
+  private static final Logger LOG = Logger.getLogger(SleepTask.class.getName());
+
+  /**
+   * Number of milliseconds to sleep.
+   */
+  private final int delay;
+
+  /**
+   * Task constructor. Parameters are injected automatically by TANG.
+   *
+   * @param delay number of seconds to sleep.
+   */
+  @Inject
+  private SleepTask(final @Parameter(Launch.Delay.class) Integer delay) {
+    this.delay = delay * 1000;
+  }
+
+  /**
+   * Sleep for delay milliseconds and return. An interrupt does not shorten the sleep:
+   * after each wake-up the remaining time is recomputed as delay minus the time elapsed since the start.
+   * @param memento ignored.
+   * @return null.
+   */
+  @Override
+  public byte[] call(final byte[] memento) {
+    LOG.log(Level.FINE, "Task started: sleep for: {0} msec.", this.delay);
+    final long ts = System.currentTimeMillis();
+    for (long left = this.delay; left > 0; left = this.delay - (System.currentTimeMillis() - ts)) {
+      try {
+        Thread.sleep(left);
+      } catch (final InterruptedException ex) {
+        LOG.log(Level.FINEST, "Interrupted while sleeping", ex); // Throwable overload does not expand {0}
+      }
+    }
+    LOG.log(Level.FINE, "Task finished after {0} msec.", System.currentTimeMillis() - ts);
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/package-info.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/package-info.java
new file mode 100644
index 0000000..e67c862
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Allocate N evaluators, submit M tasks to them, and measure the time.
+ * Each task does nothing but sleep for D seconds.
+ */
+package org.apache.reef.examples.pool;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobClient.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobClient.java
new file mode 100644
index 0000000..61d549d
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobClient.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.retained_eval;
+
+import org.apache.reef.client.*;
+import org.apache.reef.examples.library.Command;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+
+import javax.inject.Inject;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Retained Evaluator Shell Client.
+ */
+@Unit
+public class JobClient {
+
+  /**
+   * Standard java logger.
+   */
+  private static final Logger LOG = Logger.getLogger(JobClient.class.getName());
+
+  /**
+   * Codec to translate messages to and from the job driver
+   */
+  private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
+
+  /**
+   * Reference to the REEF framework.
+   * This variable is injected automatically in the constructor.
+   */
+  private final REEF reef;
+
+  /**
+   * Shell command to submit to the job driver.
+   */
+  private final String command;
+
+  /**
+   * Job Driver configuration.
+   */
+  private final Configuration driverConfiguration;
+
+  /**
+   * If true, take commands from stdin; otherwise, use -cmd parameter in batch mode.
+   */
+  private final boolean isInteractive;
+
+  /**
+   * Total number of experiments to run.
+   */
+  private final int maxRuns;
+
+  /**
+   * Command prompt reader for the interactive mode (stdin).
+   */
+  private final BufferedReader prompt;
+
+  /**
+   * A reference to the running job that allows client to send messages back to the job driver
+   */
+  private RunningJob runningJob;
+
+  /**
+   * Start timestamp of the current task.
+   */
+  private long startTime = 0;
+
+  /**
+   * Total time spent performing tasks in Evaluators.
+   */
+  private long totalTime = 0;
+
+  /**
+   * Number of experiments run so far.
+   */
+  private int numRuns = 0;
+
+  /**
+   * Set to false when job driver is done.
+   */
+  private boolean isBusy = true;
+
+  /**
+   * Last result returned from the job driver.
+   */
+  private String lastResult;
+
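+  /**
+   * Retained Evaluator client. Parameters are injected automatically by TANG.
+   * @param reef          Reference to the REEF framework.
+   * @param command       Shell command to run on each Evaluator.
+   * @param numRuns       Total number of experiments to run.
+   * @param numEvaluators Number of evaluators to request; passed on in the driver configuration.
+   */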
+  @Inject
+  JobClient(final REEF reef,
+            @Parameter(Command.class) final String command,
+            @Parameter(Launch.NumRuns.class) final Integer numRuns,
+            @Parameter(Launch.NumEval.class) final Integer numEvaluators) throws BindException {
+
+    this.reef = reef;
+    this.command = command;
+    this.maxRuns = numRuns;
+
+    // If command is not set, switch to interactive mode. (Yes, we compare pointers here)
+    this.isInteractive = this.command ==
+        Command.class.getAnnotation(NamedParameter.class).default_value();
+
+    this.prompt = this.isInteractive ?
+        new BufferedReader(new InputStreamReader(System.in)) : null;
+
+    final JavaConfigurationBuilder configBuilder = Tang.Factory.getTang().newConfigurationBuilder();
+    configBuilder.addConfiguration(
+        DriverConfiguration.CONF
+            .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(JobDriver.class))
+            .set(DriverConfiguration.DRIVER_IDENTIFIER, "eval-" + System.currentTimeMillis())
+            .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, JobDriver.AllocatedEvaluatorHandler.class)
+            .set(DriverConfiguration.ON_EVALUATOR_FAILED, JobDriver.FailedEvaluatorHandler.class)
+            .set(DriverConfiguration.ON_CONTEXT_ACTIVE, JobDriver.ActiveContextHandler.class)
+            .set(DriverConfiguration.ON_CONTEXT_CLOSED, JobDriver.ClosedContextHandler.class)
+            .set(DriverConfiguration.ON_CONTEXT_FAILED, JobDriver.FailedContextHandler.class)
+            .set(DriverConfiguration.ON_TASK_COMPLETED, JobDriver.CompletedTaskHandler.class)
+            .set(DriverConfiguration.ON_CLIENT_MESSAGE, JobDriver.ClientMessageHandler.class)
+            .set(DriverConfiguration.ON_DRIVER_STARTED, JobDriver.StartHandler.class)
+            .set(DriverConfiguration.ON_DRIVER_STOP, JobDriver.StopHandler.class)
+            .build()
+    );
+    configBuilder.bindNamedParameter(Launch.NumEval.class, "" + numEvaluators);
+    this.driverConfiguration = configBuilder.build();
+  }
+
+  /**
+   * @return a Configuration binding the ClientConfiguration.* event handlers to this Client.
+   */
+  public static Configuration getClientConfiguration() {
+    return ClientConfiguration.CONF
+        .set(ClientConfiguration.ON_JOB_RUNNING, JobClient.RunningJobHandler.class)
+        .set(ClientConfiguration.ON_JOB_MESSAGE, JobClient.JobMessageHandler.class)
+        .set(ClientConfiguration.ON_JOB_COMPLETED, JobClient.CompletedJobHandler.class)
+        .set(ClientConfiguration.ON_JOB_FAILED, JobClient.FailedJobHandler.class)
+        .set(ClientConfiguration.ON_RUNTIME_ERROR, JobClient.RuntimeErrorHandler.class)
+        .build();
+  }
+
+  /**
+   * Launch the job driver by submitting the driver configuration to REEF.
+   *
+   * Call waitForCompletion() afterwards to block until the job is done.
+   */
+  public void submit() {
+    this.reef.submit(this.driverConfiguration);
+  }
+
+  /**
+   * Send the next command to the job driver: this.command in batch mode, or a non-blank line
+   * read from stdin in interactive mode. End of input, a read error, or "exit" closes the job.
+   */
+  private synchronized void submitTask() {
+    if (!this.isInteractive) {
+      // non-interactive batch mode:
+      this.submitTask(this.command);
+      return;
+    }
+    String line;
+    try {
+      do {
+        System.out.print("\nRE> ");
+        line = this.prompt.readLine();
+      } while (line != null && line.trim().isEmpty());
+    } catch (final IOException ex) {
+      LOG.log(Level.FINE, "Error reading from stdin: {0}", ex);
+      line = null;
+    }
+    if (line == null || "exit".equals(line)) {
+      this.runningJob.close();
+      stopAndNotify();
+    } else {
+      this.submitTask(line);
+    }
+  }
+
+  /**
+   * Send command to the job driver. Record timestamp when the command was sent.
+   *
+   * @param cmd shell command to execute in all Evaluators.
+   */
+  private synchronized void submitTask(final String cmd) {
+    LOG.log(Level.FINE, "Submit task {0} \"{1}\" to {2}",
+        new Object[]{this.numRuns + 1, cmd, this.runningJob});
+    this.startTime = System.currentTimeMillis();
+    this.runningJob.send(CODEC.encode(cmd));
+  }
+
+  /**
+   * Notify the process in waitForCompletion() method that the main process has finished.
+   */
+  private synchronized void stopAndNotify() {
+    this.runningJob = null;
+    this.isBusy = false;
+    this.notify();
+  }
+
+  /**
+   * Wait for the job driver to complete; called from Launch.run(). isBusy is tested while
+   * holding the monitor used by stopAndNotify(), so its notify() cannot be missed.
+   * @return last result received from the job driver, or null if there was none.
+   */
+  public synchronized String waitForCompletion() {
+    while (this.isBusy) {
+      LOG.log(Level.FINE, "Waiting for the Job Driver to complete.");
+      try {
+        this.wait();
+      } catch (final InterruptedException ex) {
+        LOG.log(Level.WARNING, "Waiting for result interrupted.", ex);
+      }
+    }
+    return this.lastResult;
+  }
+
+  public void close() {
+    this.reef.close();
+  }
+
+  /**
+   * Receive notification from the job driver that the job is running.
+   */
+  final class RunningJobHandler implements EventHandler<RunningJob> {
+    @Override
+    public void onNext(final RunningJob job) {
+      LOG.log(Level.FINE, "Running job: {0}", job.getId());
+      synchronized (JobClient.this) {
+        JobClient.this.runningJob = job;
+        JobClient.this.submitTask();
+      }
+    }
+  }
+
+  /**
+   * Receive message from the job driver.
+   * Each message carries the shell command output gathered for one run;
+   * the handler then either sends the next command or closes the job.
+   */
+  final class JobMessageHandler implements EventHandler<JobMessage> {
+    @Override
+    public void onNext(final JobMessage message) {
+      synchronized (JobClient.this) {
+        lastResult = CODEC.decode(message.get());
+        final long elapsed = System.currentTimeMillis() - startTime;
+        totalTime += elapsed;
+        numRuns++;
+        LOG.log(Level.FINE, "TIME: Task {0} completed in {1} msec.:\n{2}",
+            new Object[]{"" + numRuns, "" + elapsed, lastResult});
+        System.out.println(lastResult);
+
+        if (runningJob == null) {
+          // The job has already been stopped: there is nobody to talk to.
+          return;
+        }
+        if (isInteractive || numRuns < maxRuns) {
+          submitTask();
+          return;
+        }
+        final double averageTime = totalTime / (double) maxRuns;
+        LOG.log(Level.INFO,
+            "All {0} tasks complete; Average task time: {1}. Closing the job driver.",
+            new Object[]{maxRuns, averageTime});
+        runningJob.close();
+        stopAndNotify();
+      }
+    }
+  }
+
+  /**
+   * Receive notification from the job driver that the job had failed.
+   */
+  final class FailedJobHandler implements EventHandler<FailedJob> {
+    @Override
+    public void onNext(final FailedJob job) {
+      LOG.log(Level.SEVERE, "Failed job: " + job.getId(), job.getReason().orElse(null));
+      stopAndNotify();
+    }
+  }
+
+  /**
+   * Receive notification from the job driver that the job had completed successfully.
+   */
+  final class CompletedJobHandler implements EventHandler<CompletedJob> {
+    @Override
+    public void onNext(final CompletedJob job) {
+      LOG.log(Level.FINE, "Completed job: {0}", job.getId());
+      stopAndNotify();
+    }
+  }
+
+  /**
+   * Receive notification that there was an exception thrown from the job driver.
+   */
+  final class RuntimeErrorHandler implements EventHandler<FailedRuntime> {
+    @Override
+    public void onNext(final FailedRuntime error) {
+      LOG.log(Level.SEVERE, "Error in job driver: " + error, error.getReason().orElse(null));
+      stopAndNotify();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobDriver.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobDriver.java
new file mode 100644
index 0000000..b2b2055
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobDriver.java
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.retained_eval;
+
+import org.apache.reef.driver.client.JobMessageObserver;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.task.CompletedTask;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.examples.library.Command;
+import org.apache.reef.examples.library.ShellTask;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Retained Evaluator example job driver. Execute shell command on all evaluators,
+ * capture stdout, and return concatenated results back to the client.
+ */
+@Unit
+public final class JobDriver {
+  /**
+   * Standard Java logger.
+   */
+  private static final Logger LOG = Logger.getLogger(JobDriver.class.getName());
+
+  /**
+   * Duration of one clock interval.
+   */
+  private static final int CHECK_UP_INTERVAL = 1000; // 1 sec.
+
+  /**
+   * String codec is used to encode the results
+   * before passing them back to the client.
+   */
+  private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
+  /**
+   * Job observer on the client.
+   * We use it to send results from the driver back to the client.
+   */
+  private final JobMessageObserver jobMessageObserver;
+  /**
+   * Job driver uses EvaluatorRequestor
+   * to request Evaluators that will run the Tasks.
+   */
+  private final EvaluatorRequestor evaluatorRequestor;
+  /**
+   * Number of Evaluators to request (default is 1).
+   */
+  private final int numEvaluators;
+  /**
+   * Shell execution results from each Evaluator.
+   */
+  private final List<String> results = new ArrayList<>();
+  /**
+   * Map from context ID to running evaluator context.
+   */
+  private final Map<String, ActiveContext> contexts = new HashMap<>();
+  /**
+   * Job driver state.
+   */
+  private State state = State.INIT;
+  /**
+   * First command to execute. Sometimes client can send us the first command
+   * before Evaluators are available; we need to store this command here.
+   */
+  private String cmd;
+  /**
+   * Number of evaluators/tasks to complete.
+   */
+  private int expectCount = 0;
+
+  /**
+   * Job driver constructor.
+   * All parameters are injected from TANG automatically.
+   *
+   * @param jobMessageObserver is used to send messages back to the client.
+   * @param evaluatorRequestor is used to request Evaluators.
+   */
+  @Inject
+  JobDriver(final JobMessageObserver jobMessageObserver,
+            final EvaluatorRequestor evaluatorRequestor,
+            final @Parameter(Launch.NumEval.class) Integer numEvaluators) {
+    this.jobMessageObserver = jobMessageObserver;
+    this.evaluatorRequestor = evaluatorRequestor;
+    this.numEvaluators = numEvaluators;
+  }
+
+  /**
+   * Construct the final result and forward it to the Client.
+   */
+  private void returnResults() {
+    final StringBuilder sb = new StringBuilder();
+    for (final String result : this.results) {
+      sb.append(result);
+    }
+    this.results.clear();
+    LOG.log(Level.INFO, "Return results to the client:\n{0}", sb);
+    this.jobMessageObserver.sendMessageToClient(CODEC.encode(sb.toString()));
+  }
+
+  /**
+   * Submit command to all available evaluators.
+   *
+   * @param command shell command to execute.
+   */
+  private void submit(final String command) {
+    LOG.log(Level.INFO, "Submit command {0} to {1} evaluators. state: {2}",
+        new Object[]{command, this.contexts.size(), this.state});
+    assert (this.state == State.READY);
+    this.expectCount = this.contexts.size();
+    this.state = State.WAIT_TASKS;
+    this.cmd = null;
+    for (final ActiveContext context : this.contexts.values()) {
+      this.submit(context, command);
+    }
+  }
+
+  /**
+   * Submit a Task that executes the command to a single Evaluator.
+   * This method is called from <code>submit(command)</code>.
+   */
+  private void submit(final ActiveContext context, final String command) {
+    try {
+      LOG.log(Level.INFO, "Send command {0} to context: {1}", new Object[]{command, context});
+      final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
+      cb.addConfiguration(
+          TaskConfiguration.CONF
+              .set(TaskConfiguration.IDENTIFIER, context.getId() + "_task")
+              .set(TaskConfiguration.TASK, ShellTask.class)
+              .build()
+      );
+      cb.bindNamedParameter(Command.class, command);
+      context.submitTask(cb.build());
+    } catch (final BindException ex) {
+      LOG.log(Level.SEVERE, "Bad Task configuration for context: " + context.getId(), ex);
+      context.close();
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * Request the evaluators.
+   */
+  private synchronized void requestEvaluators() {
+    assert (this.state == State.INIT);
+    LOG.log(Level.INFO, "Schedule on {0} Evaluators.", this.numEvaluators);
+    this.evaluatorRequestor.submit(
+        EvaluatorRequest.newBuilder()
+            .setMemory(128)
+            .setNumberOfCores(1)
+            .setNumber(this.numEvaluators).build()
+    );
+    this.state = State.WAIT_EVALUATORS;
+    this.expectCount = this.numEvaluators;
+  }
+
+  /**
+   * Possible states of the job driver. Can be one of:
+   * <dl>
+   * <dt><code>INIT</code></dt><dd>initial state, ready to request the evaluators.</dd>
+   * <dt><code>WAIT_EVALUATORS</code></dt><dd>Wait for requested evaluators to initialize.</dd>
+   * <dt><code>READY</code></dt><dd>Ready to submit a new task.</dd>
+   * <dt><code>WAIT_TASKS</code></dt><dd>Wait for tasks to complete.</dd>
+   * </dl>
+   */
+  private enum State {
+    INIT, WAIT_EVALUATORS, READY, WAIT_TASKS
+  }
+
+  /**
+   * Receive notification that an Evaluator had been allocated,
+   * and submit a new Context to that Evaluator.
+   */
+  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
+    @Override
+    public void onNext(final AllocatedEvaluator eval) {
+      synchronized (JobDriver.this) {
+        LOG.log(Level.INFO, "Allocated Evaluator: {0} expect {1} running {2}",
+            new Object[]{eval.getId(), JobDriver.this.expectCount, JobDriver.this.contexts.size()});
+        assert (JobDriver.this.state == State.WAIT_EVALUATORS);
+        try {
+          eval.submitContext(ContextConfiguration.CONF.set(
+              ContextConfiguration.IDENTIFIER, eval.getId() + "_context").build());
+        } catch (final BindException ex) {
+          LOG.log(Level.SEVERE, "Failed to submit a context to evaluator: " + eval.getId(), ex);
+          throw new RuntimeException(ex);
+        }
+      }
+    }
+  }
+
+  /**
+   * Receive notification that the entire Evaluator had failed.
+   * Remove its failed contexts from the map and rethrow the failure as a RuntimeException.
+   */
+  final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
+    @Override
+    public void onNext(final FailedEvaluator eval) {
+      synchronized (JobDriver.this) {
+        LOG.log(Level.SEVERE, "FailedEvaluator: {0}", eval); // {0} is required to print eval
+        for (final FailedContext failedContext : eval.getFailedContextList()) {
+          JobDriver.this.contexts.remove(failedContext.getId());
+        }
+        throw new RuntimeException("Failed Evaluator: ", eval.getEvaluatorException());
+      }
+    }
+  }
+
+  /**
+   * Receive notification that a new Context is available and register it.
+   * Once all expected Contexts are active, submit the pending command, if any.
+   */
+  final class ActiveContextHandler implements EventHandler<ActiveContext> {
+    @Override
+    public void onNext(final ActiveContext context) {
+      synchronized (JobDriver.this) {
+        LOG.log(Level.INFO, "Context available: {0} expect {1} state {2}",
+            new Object[]{context.getId(), JobDriver.this.expectCount, JobDriver.this.state});
+        assert (JobDriver.this.state == State.WAIT_EVALUATORS);
+        JobDriver.this.contexts.put(context.getId(), context);
+        if (--JobDriver.this.expectCount <= 0) {
+          JobDriver.this.state = State.READY;
+          if (JobDriver.this.cmd == null) {
+            LOG.log(Level.INFO, "All evaluators ready; waiting for command. State: {0}",
+                JobDriver.this.state);
+          } else {
+            JobDriver.this.submit(JobDriver.this.cmd);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Receive notification that the Context had completed.
+   * Remove context from the list of active context.
+   */
+  final class ClosedContextHandler implements EventHandler<ClosedContext> {
+    @Override
+    public void onNext(final ClosedContext context) {
+      LOG.log(Level.INFO, "Completed Context: {0}", context.getId());
+      synchronized (JobDriver.this) {
+        JobDriver.this.contexts.remove(context.getId());
+      }
+    }
+  }
+
+  /**
+   * Receive notification that the Context had failed.
+   * Remove the context from the map of active contexts and rethrow the failure.
+   */
+  final class FailedContextHandler implements EventHandler<FailedContext> {
+    @Override
+    public void onNext(final FailedContext context) {
+      LOG.log(Level.SEVERE, "FailedContext: {0}", context); // {0} is required to print context
+      synchronized (JobDriver.this) {
+        JobDriver.this.contexts.remove(context.getId());
+      }
+      throw new RuntimeException("Failed context: ", context.asError());
+    }
+  }
+
+  /**
+   * Receive notification that the Task has completed successfully.
+   */
+  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
+    @Override
+    public void onNext(final CompletedTask task) {
+      LOG.log(Level.INFO, "Completed task: {0}", task.getId());
+      // Take the message returned by the task and add it to the running result.
+      final String result = CODEC.decode(task.get());
+      synchronized (JobDriver.this) {
+        JobDriver.this.results.add(task.getId() + " :: " + result);
+        LOG.log(Level.INFO, "Task {0} result {1}: {2} state: {3}", new Object[]{
+            task.getId(), JobDriver.this.results.size(), result, JobDriver.this.state});
+        if (--JobDriver.this.expectCount <= 0) {
+          JobDriver.this.returnResults();
+          JobDriver.this.state = State.READY;
+          if (JobDriver.this.cmd != null) {
+            JobDriver.this.submit(JobDriver.this.cmd);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Receive notification from the client.
+   */
+  final class ClientMessageHandler implements EventHandler<byte[]> {
+    @Override
+    public void onNext(final byte[] message) {
+      synchronized (JobDriver.this) {
+        final String command = CODEC.decode(message);
+        LOG.log(Level.INFO, "Client message: {0} state: {1}",
+            new Object[]{command, JobDriver.this.state});
+        assert (JobDriver.this.cmd == null);
+        if (JobDriver.this.state == State.READY) {
+          JobDriver.this.submit(command);
+        } else {
+          // not ready yet - save the command for better times.
+          assert (JobDriver.this.state == State.WAIT_EVALUATORS);
+          JobDriver.this.cmd = command;
+        }
+      }
+    }
+  }
+
+  /**
+   * Job Driver is ready and the clock is set up: request the evaluators.
+   */
+  final class StartHandler implements EventHandler<StartTime> {
+    @Override
+    public void onNext(final StartTime startTime) {
+      LOG.log(Level.INFO, "{0} StartTime: {1}", new Object[]{state, startTime});
+      assert (state == State.INIT);
+      requestEvaluators();
+    }
+  }
+
+  /** Shutting down the job driver: close every context that is still active. */
+  final class StopHandler implements EventHandler<StopTime> {
+    @Override
+    public void onNext(final StopTime time) {
+      synchronized (JobDriver.this) { // same lock that guards contexts in the other handlers
+        LOG.log(Level.INFO, "{0} StopTime: {1}", new Object[]{state, time});
+        for (final ActiveContext context : new ArrayList<ActiveContext>(contexts.values())) {
+          context.close(); // iterating a snapshot, so the map may change while we close
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/Launch.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/Launch.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/Launch.java
new file mode 100644
index 0000000..cf230bc
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/Launch.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.retained_eval;
+
+import org.apache.reef.client.ClientConfiguration;
+import org.apache.reef.examples.library.Command;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.AvroConfigurationSerializer;
+import org.apache.reef.tang.formats.CommandLine;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Retained Evaluators example - main class.
+ */
+public final class Launch {
+
+  /**
+   * Number of REEF worker threads in local mode.
+   */
+  private static final int NUM_LOCAL_THREADS = 4;
+  /**
+   * Standard Java logger
+   */
+  private static final Logger LOG = Logger.getLogger(Launch.class.getName());
+
+  /**
+   * This class should not be instantiated.
+   */
+  private Launch() {
+    throw new RuntimeException("Do not instantiate this class!");
+  }
+
+  /**
+   * Parse the command line arguments.
+   *
+   * @param args command line arguments, as passed to main()
+   * @return Configuration object.
+   * @throws BindException configuration error.
+   * @throws IOException   error reading the configuration.
+   */
+  private static Configuration parseCommandLine(final String[] args)
+      throws BindException, IOException {
+    final JavaConfigurationBuilder confBuilder = Tang.Factory.getTang().newConfigurationBuilder();
+    final CommandLine cl = new CommandLine(confBuilder);
+    cl.registerShortNameOfClass(Local.class);
+    cl.registerShortNameOfClass(Command.class);
+    cl.registerShortNameOfClass(NumRuns.class);
+    cl.registerShortNameOfClass(NumEval.class);
+    cl.processCommandLine(args);
+    return confBuilder.build();
+  }
+
+  private static Configuration cloneCommandLineConfiguration(final Configuration commandLineConf)
+      throws InjectionException, BindException {
+    final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf);
+    final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
+    cb.bindNamedParameter(Command.class, injector.getNamedInstance(Command.class));
+    cb.bindNamedParameter(NumRuns.class, String.valueOf(injector.getNamedInstance(NumRuns.class)));
+    cb.bindNamedParameter(NumEval.class, String.valueOf(injector.getNamedInstance(NumEval.class)));
+    return cb.build();
+  }
+
+  /**
+   * Parse command line arguments and create TANG configuration ready to be submitted to REEF.
+   * The result merges the runtime configuration (local or YARN), the client event handlers
+   * declared in JobClient, and the job parameters read from the command line.
+   *
+   * @param args Command line arguments, as passed into main().
+   * @return (immutable) TANG Configuration object.
+   * @throws BindException      if configuration commandLineInjector fails.
+   * @throws InjectionException if configuration commandLineInjector fails.
+   * @throws IOException        error reading the configuration.
+   */
+  private static Configuration getClientConfiguration(final String[] args)
+      throws BindException, InjectionException, IOException {
+
+    final Configuration commandLineConf = parseCommandLine(args);
+
+    // Reuse the handler bindings published by JobClient instead of repeating them here.
+    final Configuration clientConfiguration = JobClient.getClientConfiguration();
+
+    // TODO: Remove the injector, have stuff injected.
+    final Injector commandLineInjector = Tang.Factory.getTang().newInjector(commandLineConf);
+    final boolean isLocal = commandLineInjector.getNamedInstance(Local.class);
+    // Choose the runtime to submit to, as requested by the Local parameter.
+    final Configuration runtimeConfiguration;
+    if (isLocal) {
+      LOG.log(Level.INFO, "Running on the local runtime");
+      runtimeConfiguration = LocalRuntimeConfiguration.CONF
+          .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, NUM_LOCAL_THREADS)
+          .build();
+    } else {
+      LOG.log(Level.INFO, "Running on YARN");
+      runtimeConfiguration = YarnClientConfiguration.CONF.build();
+    }
+
+    // Only Command, NumRuns and NumEval are carried over from the command line;
+    // the Local flag has already been consumed above.
+    return Tang.Factory.getTang()
+        .newConfigurationBuilder(runtimeConfiguration, clientConfiguration,
+            cloneCommandLineConfiguration(commandLineConf))
+        .build();
+  }
+
+  /**
+   * Instantiate the JobClient from the configuration, submit the job, and wait for it.
+   * @param config client configuration, e.g. as produced by getClientConfiguration().
+   * @return a string that contains last results from all evaluators.
+   * @throws InjectionException if the injection of JobClient fails.
+   */
+  public static String run(final Configuration config) throws InjectionException {
+    final JobClient client = Tang.Factory.getTang().newInjector(config).getInstance(JobClient.class);
+    client.submit();
+    return client.waitForCompletion();
+  }
+
+  /**
+   * Main method that starts the Retained Evaluators job.
+   *
+   * @param args command line parameters.
+   */
+  public static void main(final String[] args) {
+    try {
+      final Configuration config = getClientConfiguration(args);
+      LOG.log(Level.FINEST, "Configuration:\n--\n{0}--",
+          new AvroConfigurationSerializer().toString(config));
+      run(config);
+      LOG.info("Done!");
+    } catch (final BindException | InjectionException | IOException ex) {
+      LOG.log(Level.SEVERE, "Job configuration error", ex);
+    }
+  }
+
+  /**
+   * Command line parameter: number of experiments to run.
+   */
+  @NamedParameter(doc = "Number of times to run the command",
+      short_name = "num_runs", default_value = "1")
+  public static final class NumRuns implements Name<Integer> {
+  }
+
+  /**
+   * Command line parameter: number of evaluators to allocate.
+   */
+  @NamedParameter(doc = "Number of evaluators to request",
+      short_name = "num_eval", default_value = "1")
+  public static final class NumEval implements Name<Integer> {
+  }
+
+  /**
+   * Command line parameter = true to run locally, or false to run on YARN.
+   */
+  @NamedParameter(doc = "Whether or not to run on the local runtime",
+      short_name = "local", default_value = "true")
+  public static final class Local implements Name<Boolean> {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/package-info.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/package-info.java
new file mode 100644
index 0000000..842b659
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * The Retained Evaluators example.
+ */
+package org.apache.reef.examples.retained_eval;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java
new file mode 100644
index 0000000..960f297
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.examples.library.Command;
+import org.apache.reef.examples.library.ShellTask;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Tang;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The body of the Task scheduler. It owns the queue of tasks waiting to run
+ * and keeps a record of the tasks that are running, finished, or canceled.
+ *
+ * Every non-private method is synchronized on this instance.
+ */
+@ThreadSafe
+final class Scheduler {
+  /**
+   * Tasks waiting to be scheduled, in the order they were added.
+   */
+  private final Queue<TaskEntity> taskQueue;
+
+  /**
+   * Lists of {@link TaskEntity} for different states - Running / Finished / Canceled.
+   */
+  private final List<TaskEntity> runningTasks = new ArrayList<>();
+  private final List<TaskEntity> finishedTasks = new ArrayList<>();
+  private final List<TaskEntity> canceledTasks = new ArrayList<>();
+
+  /**
+   * Number of task IDs handed out so far by {@link #assignTaskId()}.
+   */
+  private final AtomicInteger taskCount = new AtomicInteger(0);
+
+  @Inject
+  public Scheduler() {
+    taskQueue = new LinkedBlockingQueue<>();
+  }
+
+  /**
+   * Take the task at the head of the queue, submit it to the given context
+   * as a {@link ShellTask}, and record it as running.
+   *
+   * @param context the context to run the task on.
+   * @throws IllegalStateException if there is no pending task in the queue.
+   */
+  public synchronized void submitTask(final ActiveContext context) {
+    final TaskEntity task = taskQueue.poll();
+    if (task == null) {
+      // poll() yields null on an empty queue; report that clearly instead of
+      // failing with a NullPointerException a few lines further down.
+      throw new IllegalStateException("There is no pending task to submit");
+    }
+    final Integer taskId = task.getId();
+    final String command = task.getCommand();
+
+    final Configuration taskConf = TaskConfiguration.CONF
+      .set(TaskConfiguration.TASK, ShellTask.class)
+      .set(TaskConfiguration.IDENTIFIER, taskId.toString())
+      .build();
+    final Configuration commandConf = Tang.Factory.getTang().newConfigurationBuilder()
+      .bindNamedParameter(Command.class, command)
+      .build();
+
+    context.submitTask(Configurations.merge(taskConf, commandConf));
+    runningTasks.add(task);
+  }
+
+  /**
+   * Cancel a task that is still waiting in the queue.
+   *
+   * @param taskId ID of the task to cancel.
+   * @return FORBIDDEN if the task is running or has finished, NOT_FOUND if it
+   * is not in the queue, and OK once it has been moved to the canceled list.
+   */
+  public synchronized SchedulerResponse cancelTask(final int taskId) {
+    if (getTask(taskId, runningTasks) != null) {
+      return SchedulerResponse.FORBIDDEN("The task " + taskId + " is running");
+    }
+    if (getTask(taskId, finishedTasks) != null) {
+      return SchedulerResponse.FORBIDDEN("The task " + taskId + " has been finished");
+    }
+
+    final TaskEntity task = getTask(taskId, taskQueue);
+    if (task == null) {
+      return notFound(taskId);
+    }
+    taskQueue.remove(task);
+    canceledTasks.add(task);
+    return SchedulerResponse.OK("Canceled " + taskId);
+  }
+
+  /**
+   * Clear the pending list. All the waiting tasks are recorded as canceled.
+   *
+   * @return OK with the number of tasks removed from the queue.
+   */
+  public synchronized SchedulerResponse clear() {
+    final int count = taskQueue.size();
+    canceledTasks.addAll(taskQueue);
+    taskQueue.clear();
+    return SchedulerResponse.OK(count + " tasks removed.");
+  }
+
+  /**
+   * Get the list of Tasks, which are grouped by the states.
+   *
+   * @return OK with one line of task IDs per state.
+   */
+  public synchronized SchedulerResponse getList() {
+    final StringBuilder sb = new StringBuilder();
+    appendIds(sb, "Running :", runningTasks);
+    appendIds(sb, "\nWaiting :", taskQueue);
+    appendIds(sb, "\nFinished :", finishedTasks);
+    appendIds(sb, "\nCanceled :", canceledTasks);
+    return SchedulerResponse.OK(sb.toString());
+  }
+
+  /**
+   * Get the status of a Task. The states are searched in the order
+   * running, waiting, finished, canceled.
+   *
+   * @param taskId ID of the task to look up.
+   * @return OK with the state and the task, or NOT_FOUND for an unknown ID.
+   */
+  public synchronized SchedulerResponse getTaskStatus(final int taskId) {
+    final TaskEntity running = getTask(taskId, runningTasks);
+    if (running != null) {
+      return SchedulerResponse.OK("Running : " + running);
+    }
+
+    final TaskEntity waiting = getTask(taskId, taskQueue);
+    if (waiting != null) {
+      return SchedulerResponse.OK("Waiting : " + waiting);
+    }
+
+    final TaskEntity finished = getTask(taskId, finishedTasks);
+    if (finished != null) {
+      return SchedulerResponse.OK("Finished : " + finished);
+    }
+
+    final TaskEntity canceled = getTask(taskId, canceledTasks);
+    if (canceled != null) {
+      return SchedulerResponse.OK("Canceled: " + canceled);
+    }
+    return notFound(taskId);
+  }
+
+  /**
+   * Assigns a TaskId to submit.
+   *
+   * @return the next task ID; the first ID handed out is 1.
+   */
+  public synchronized int assignTaskId() {
+    return taskCount.incrementAndGet();
+  }
+
+  /**
+   * Add a task to the end of the queue.
+   *
+   * @param task the task to enqueue.
+   */
+  public synchronized void addTask(final TaskEntity task) {
+    taskQueue.add(task);
+  }
+
+  /**
+   * Check whether there are tasks waiting to be submitted.
+   *
+   * @return true if the queue is not empty.
+   */
+  public synchronized boolean hasPendingTasks() {
+    return !taskQueue.isEmpty();
+  }
+
+  /**
+   * Get the number of pending tasks in the queue.
+   *
+   * @return the current size of the queue.
+   */
+  public synchronized int getNumPendingTasks() {
+    return taskQueue.size();
+  }
+
+  /**
+   * Update the record of task to mark it as finished.
+   * An ID that does not belong to a running task is ignored.
+   *
+   * @param taskId ID of the task that has completed.
+   */
+  public synchronized void setFinished(final int taskId) {
+    final TaskEntity task = getTask(taskId, runningTasks);
+    if (task == null) {
+      // Never record a null entry: the listing and status methods
+      // dereference every element of finishedTasks.
+      return;
+    }
+    runningTasks.remove(task);
+    finishedTasks.add(task);
+  }
+
+  /**
+   * Iterate over the collection to find a TaskEntity with ID.
+   *
+   * @return the first task with the given ID, or null if there is none.
+   */
+  private TaskEntity getTask(final int taskId, final Collection<TaskEntity> tasks) {
+    for (final TaskEntity task : tasks) {
+      if (taskId == task.getId()) {
+        return task;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Append the header followed by the space-separated IDs of the tasks.
+   */
+  private static void appendIds(final StringBuilder sb, final String header,
+                                final Collection<TaskEntity> tasks) {
+    sb.append(header);
+    for (final TaskEntity task : tasks) {
+      sb.append(" ").append(task.getId());
+    }
+  }
+
+  /**
+   * Build the NOT_FOUND response for an unknown task ID.
+   */
+  private static SchedulerResponse notFound(final int taskId) {
+    return SchedulerResponse.NOT_FOUND("Task with ID " + taskId + " is not found");
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java
new file mode 100644
index 0000000..890d872
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.task.CompletedTask;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.inject.Inject;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver for TaskScheduler. It receives the commands by HttpRequest and
+ * executes them in a FIFO (First In First Out) order.
+ */
+@Unit
+public final class SchedulerDriver {
+
+  public static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
+  private static final Logger LOG = Logger.getLogger(SchedulerDriver.class.getName());
+
+  /**
+   * Possible states of the job driver. Can be one of:
+   * <dl>
+   * <dt><code>INIT</code></dt><dd>Initial state. Ready to request an evaluator.</dd>
+   * <dt><code>WAIT_EVALUATORS</code></dt><dd>Waiting for an evaluator allocated with no active evaluators.</dd>
+   * <dt><code>READY</code></dt><dd>Wait for the commands. Reactivated when a new Task arrives.</dd>
+   * <dt><code>RUNNING</code></dt><dd>Run commands in the queue. Go back to READY state when the queue is empty.</dd>
+   * </dl>
+   */
+  private enum State {
+    INIT, WAIT_EVALUATORS, READY, RUNNING
+  }
+
+  /**
+   * If true, it reuses evaluators when Tasks done.
+   */
+  private final boolean retainable;
+
+  @GuardedBy("SchedulerDriver.this")
+  private State state = State.INIT;
+
+  @GuardedBy("SchedulerDriver.this")
+  private final Scheduler scheduler;
+
+  /**
+   * Upper bound on active plus requested evaluators; changed by {@link #setMaxEvaluators}.
+   */
+  @GuardedBy("SchedulerDriver.this")
+  private int nMaxEval = 3;
+
+  /**
+   * Evaluators that have been allocated and not yet closed by this driver.
+   */
+  @GuardedBy("SchedulerDriver.this")
+  private int nActiveEval = 0;
+
+  /**
+   * Evaluators that have been requested but not yet allocated.
+   */
+  @GuardedBy("SchedulerDriver.this")
+  private int nRequestedEval = 0;
+
+  private final EvaluatorRequestor requestor;
+
+  @Inject
+  public SchedulerDriver(final EvaluatorRequestor requestor,
+                         @Parameter(SchedulerREEF.Retain.class) final boolean retainable,
+                         final Scheduler scheduler) {
+    this.requestor = requestor;
+    this.scheduler = scheduler;
+    this.retainable = retainable;
+  }
+
+  /**
+   * The driver is ready to run.
+   */
+  final class StartHandler implements EventHandler<StartTime> {
+    @Override
+    public void onNext(final StartTime startTime) {
+      LOG.log(Level.INFO, "Driver started at {0}", startTime);
+      // The state field is guarded by the driver's monitor, so take it here too.
+      synchronized (SchedulerDriver.this) {
+        assert (state == State.INIT);
+        state = State.WAIT_EVALUATORS;
+
+        requestEvaluator(1); // Allocate an initial evaluator to avoid idle state.
+      }
+    }
+  }
+
+  /**
+   * Evaluator is allocated. This occurs every time to run commands in Non-retainable version,
+   * while occurs only once in the Retainable version
+   */
+  final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
+    @Override
+    public void onNext(final AllocatedEvaluator evaluator) {
+      LOG.log(Level.INFO, "Evaluator is ready");
+      synchronized (SchedulerDriver.this) {
+        // One outstanding request has turned into an active evaluator.
+        nActiveEval++;
+        nRequestedEval--;
+      }
+
+      evaluator.submitContext(ContextConfiguration.CONF
+        .set(ContextConfiguration.IDENTIFIER, "SchedulerContext")
+        .build());
+    }
+  }
+
+  /**
+   * Now it is ready to schedule tasks. But if the queue is empty,
+   * wait until commands coming up.
+   *
+   * If there is no pending task, having more than 1 evaluators must be redundant.
+   * It may happen, for example, when tasks are canceled during allocation.
+   * In these cases, the new evaluator may be abandoned.
+   */
+  final class ActiveContextHandler implements EventHandler<ActiveContext> {
+    @Override
+    public void onNext(final ActiveContext context) {
+      synchronized (SchedulerDriver.this) {
+        LOG.log(Level.INFO, "Context available : {0}", context.getId());
+
+        if (scheduler.hasPendingTasks()) {
+          state = State.RUNNING;
+          scheduler.submitTask(context);
+        } else if (nActiveEval > 1) {
+          nActiveEval--;
+          context.close();
+        } else {
+          state = State.READY;
+          waitForCommands(context);
+        }
+      }
+    }
+  }
+
+  /**
+   * Handles the completion of a Task. The task is recorded as finished, and
+   * then the evaluator is either reused (retainable version) or closed and,
+   * if necessary, replaced by a new one (non-retainable version).
+   */
+  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
+    @Override
+    public void onNext(final CompletedTask task) {
+      final int taskId = Integer.parseInt(task.getId());
+
+      synchronized (SchedulerDriver.this) {
+        scheduler.setFinished(taskId);
+
+        LOG.log(Level.INFO, "Task completed. Reuse the evaluator : {0}", String.valueOf(retainable));
+        final ActiveContext context = task.getActiveContext();
+
+        if (retainable) {
+          retainEvaluator(context);
+        } else {
+          reallocateEvaluator(context);
+        }
+      }
+    }
+  }
+
+  /**
+   * Get the list of tasks in the scheduler.
+   */
+  public synchronized SchedulerResponse getList() {
+    return scheduler.getList();
+  }
+
+  /**
+   * Clear all the Tasks from the waiting queue.
+   */
+  public synchronized SchedulerResponse clearList() {
+    return scheduler.clear();
+  }
+
+  /**
+   * Get the status of a task.
+   *
+   * @param args the values of the ID parameter; exactly one integer is expected.
+   * @return BAD_REQUEST if the argument is missing, repeated, or not an
+   * integer; otherwise the response of the scheduler.
+   */
+  public SchedulerResponse getTaskStatus(final List<String> args) {
+    if (!isSingleArgument(args)) {
+      return SchedulerResponse.BAD_REQUEST("Usage : only one ID at a time");
+    }
+
+    final int taskId;
+    try {
+      taskId = Integer.parseInt(args.get(0));
+    } catch (final NumberFormatException e) {
+      return SchedulerResponse.BAD_REQUEST("Usage : the ID must be an integer");
+    }
+
+    synchronized (SchedulerDriver.this) {
+      return scheduler.getTaskStatus(taskId);
+    }
+  }
+
+  /**
+   * Cancel a Task waiting on the queue. A task cannot be canceled
+   * once it is running.
+   *
+   * @param args the values of the ID parameter; exactly one integer is expected.
+   * @return BAD_REQUEST if the argument is missing, repeated, or not an
+   * integer; otherwise the response of the scheduler.
+   */
+  public SchedulerResponse cancelTask(final List<String> args) {
+    if (!isSingleArgument(args)) {
+      return SchedulerResponse.BAD_REQUEST("Usage : only one ID at a time");
+    }
+
+    final int taskId;
+    try {
+      taskId = Integer.parseInt(args.get(0));
+    } catch (final NumberFormatException e) {
+      return SchedulerResponse.BAD_REQUEST("Usage : the ID must be an integer");
+    }
+
+    synchronized (SchedulerDriver.this) {
+      return scheduler.cancelTask(taskId);
+    }
+  }
+
+  /**
+   * Submit a command to schedule.
+   *
+   * @param args the values of the command parameter; exactly one is expected.
+   * @return BAD_REQUEST if the argument is missing or repeated; otherwise OK
+   * with the ID assigned to the new task.
+   */
+  public SchedulerResponse submitCommands(final List<String> args) {
+    if (!isSingleArgument(args)) {
+      return SchedulerResponse.BAD_REQUEST("Usage : only one command at a time");
+    }
+
+    final String command = args.get(0);
+    final Integer id;
+
+    synchronized (SchedulerDriver.this) {
+      id = scheduler.assignTaskId();
+      scheduler.addTask(new TaskEntity(id, command));
+
+      if (state == State.READY) {
+        SchedulerDriver.this.notify(); // Wake up at {waitForCommands}
+      } else if (state == State.RUNNING && nMaxEval > nActiveEval + nRequestedEval) {
+        requestEvaluator(1);
+      }
+    }
+    return SchedulerResponse.OK("Task ID : " + id);
+  }
+
+  /**
+   * Update the maximum number of evaluators to hold.
+   * Request more evaluators in case there are pending tasks
+   * in the queue and the number of evaluators is less than the limit.
+   *
+   * @param args the values of the number parameter; exactly one integer is expected.
+   * @return BAD_REQUEST if the argument is missing, repeated, or not an
+   * integer; FORBIDDEN if the value is below the number of evaluators in
+   * use; OK otherwise.
+   */
+  public SchedulerResponse setMaxEvaluators(final List<String> args) {
+    if (!isSingleArgument(args)) {
+      return SchedulerResponse.BAD_REQUEST("Usage : Only one value can be used");
+    }
+
+    final int nTarget;
+    try {
+      nTarget = Integer.parseInt(args.get(0));
+    } catch (final NumberFormatException e) {
+      return SchedulerResponse.BAD_REQUEST("Usage : the value must be an integer");
+    }
+
+    synchronized (SchedulerDriver.this) {
+      if (nTarget < nActiveEval + nRequestedEval) {
+        return SchedulerResponse.FORBIDDEN(nActiveEval + nRequestedEval +
+          " evaluators are used now. Should be larger than that.");
+      }
+      nMaxEval = nTarget;
+
+      if (scheduler.hasPendingTasks()) {
+        final int nToRequest =
+          Math.min(scheduler.getNumPendingTasks(), nMaxEval - nActiveEval) - nRequestedEval;
+        // The outstanding requests may already cover the pending tasks, in which
+        // case nToRequest is zero or negative and requestEvaluator would throw.
+        if (nToRequest > 0) {
+          requestEvaluator(nToRequest);
+        }
+      }
+      return SchedulerResponse.OK("You can use evaluators up to " + nMaxEval + " evaluators.");
+    }
+  }
+
+  /**
+   * Check that exactly one value was given for a request parameter.
+   * The list is null when the parameter is absent from the request.
+   */
+  private static boolean isSingleArgument(final List<String> args) {
+    return args != null && args.size() == 1;
+  }
+
+  /**
+   * Request evaluators and count them as requested.
+   *
+   * @param numToRequest the number of evaluators to request.
+   * @throws IllegalArgumentException if numToRequest is not positive.
+   */
+  private void requestEvaluator(final int numToRequest) {
+    if (numToRequest <= 0) {
+      throw new IllegalArgumentException("The number of evaluator request should be a positive integer");
+    }
+
+    synchronized (SchedulerDriver.this) {
+      nRequestedEval += numToRequest;
+      requestor.submit(EvaluatorRequest.newBuilder()
+        .setMemory(32)
+        .setNumber(numToRequest)
+        .build());
+    }
+  }
+
+  /**
+   * Pick up a command from the queue and run it. Wait until
+   * any command coming up if no command exists.
+   */
+  private void waitForCommands(final ActiveContext context) {
+    synchronized (SchedulerDriver.this) {
+      while (!scheduler.hasPendingTasks()) {
+        // Wait until any command enters in the queue
+        try {
+          SchedulerDriver.this.wait();
+        } catch (InterruptedException e) {
+          // The interruption is only logged; the loop goes back to waiting.
+          LOG.log(Level.WARNING, "InterruptedException occurred in SchedulerDriver", e);
+        }
+      }
+      // When wakes up, run the first command from the queue.
+      state = State.RUNNING;
+      scheduler.submitTask(context);
+    }
+  }
+
+  /**
+   * Retain the complete evaluators submitting another task
+   * until there is no need to reuse them.
+   */
+  private synchronized void retainEvaluator(final ActiveContext context) {
+    if (scheduler.hasPendingTasks()) {
+      scheduler.submitTask(context);
+    } else if (nActiveEval > 1) {
+      nActiveEval--;
+      context.close();
+    } else {
+      state = State.READY;
+      waitForCommands(context);
+    }
+  }
+
+  /**
+   * Always close the complete evaluators and
+   * allocate a new evaluator if necessary.
+   */
+  private synchronized void reallocateEvaluator(final ActiveContext context) {
+    nActiveEval--;
+    context.close();
+
+    if (scheduler.hasPendingTasks()) {
+      requestEvaluator(1);
+    } else if (nActiveEval <= 0) {
+      // No evaluator is left: keep one on the way so the driver does not go idle.
+      state = State.WAIT_EVALUATORS;
+      requestEvaluator(1);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerHttpHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerHttpHandler.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerHttpHandler.java
new file mode 100644
index 0000000..c2f6090
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerHttpHandler.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.webserver.HttpHandler;
+import org.apache.reef.webserver.ParsedHttpRequest;
+
+import javax.inject.Inject;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Receives HttpRequests and dispatches them to the {@link SchedulerDriver}.
+ */
+final class SchedulerHttpHandler implements HttpHandler {
+  final InjectionFuture<SchedulerDriver> schedulerDriver;
+
+  private String uriSpecification = "reef-example-scheduler";
+
+  @Inject
+  public SchedulerHttpHandler(final InjectionFuture<SchedulerDriver> schedulerDriver) {
+    this.schedulerDriver = schedulerDriver;
+  }
+
+  @Override
+  public String getUriSpecification() {
+    return uriSpecification;
+  }
+
+  @Override
+  public void setUriSpecification(final String s) {
+    uriSpecification = s;
+  }
+
+  /**
+   * HttpRequest handler. You must specify UriSpecification and REST API version.
+   * The request url is http://{address}:{port}/reef-example-scheduler/v1
+   *
+   * APIs
+   *   /list                to get the status list for all tasks
+   *   /status?id={id}      to query the status of such a task, given id
+   *   /submit?cmd={cmd}    to submit a Task, which returns its id
+   *   /cancel?id={id}      to cancel the task's execution
+   *   /max-eval?num={num}  to set the maximum number of evaluators
+   *   /clear               to clear the waiting queue
+   *
+   * The target is matched case-insensitively. Any other target is answered
+   * with a NOT_FOUND response. An OK result is written to the response body;
+   * any other result is sent as an error with its status and message.
+   */
+  @Override
+  public void onHttpRequest(final ParsedHttpRequest request, final HttpServletResponse response)
+    throws IOException, ServletException {
+    final String targetEntity = request.getTargetEntity();
+    // Locale.ROOT keeps the matching independent of the default locale of the JVM;
+    // a missing target falls through to the default branch.
+    final String target = targetEntity == null ? "" : targetEntity.toLowerCase(java.util.Locale.ROOT);
+    final Map<String, List<String>> queryMap = request.getQueryMap();
+
+    final SchedulerResponse result;
+    switch (target) {
+      case "list":
+        result = schedulerDriver.get().getList();
+        break;
+      case "clear":
+        result = schedulerDriver.get().clearList();
+        break;
+      case "status":
+        result = schedulerDriver.get().getTaskStatus(queryMap.get("id"));
+        break;
+      case "submit":
+        result = schedulerDriver.get().submitCommands(queryMap.get("cmd"));
+        break;
+      case "cancel":
+        result = schedulerDriver.get().cancelTask(queryMap.get("id"));
+        break;
+      case "max-eval":
+        result = schedulerDriver.get().setMaxEvaluators(queryMap.get("num"));
+        break;
+      default:
+        result = SchedulerResponse.NOT_FOUND("Unsupported operation");
+    }
+
+    // Send response to the http client
+    final int status = result.getStatus();
+    final String message = result.getMessage();
+
+    if (result.isOK()) {
+      response.getOutputStream().println(message);
+    } else {
+      response.sendError(status, message);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java
new file mode 100644
index 0000000..2911905
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.REEF;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.CommandLine;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.webserver.HttpHandlerConfiguration;
+
+import java.io.IOException;
+
+/**
+ * Client side of the REEF TaskScheduler example: assembles the Driver
+ * configuration and submits it to REEF.
+ */
+public final class SchedulerREEF {
+
+  /**
+   * Command line parameter: true to reuse evaluators,
+   * or false to allocate/close for each iteration.
+   * Short name is {@code retain}; the default value is true.
+   */
+  @NamedParameter(doc = "Whether or not to reuse evaluators",
+    short_name = "retain", default_value = "true")
+  public static final class Retain implements Name<Boolean> {
+  }
+
+  /**
+   * @return The http configuration that registers {@link SchedulerHttpHandler}
+   * with reef-webserver.
+   */
+  private static final Configuration getHttpConf() {
+    return HttpHandlerConfiguration.CONF
+      .set(HttpHandlerConfiguration.HTTP_HANDLERS, SchedulerHttpHandler.class)
+      .build();
+  }
+
+  /**
+   * @return The Driver configuration, which binds the event handlers
+   * of {@link SchedulerDriver}.
+   */
+  private static final Configuration getDriverConf() {
+    return DriverConfiguration.CONF
+      .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(SchedulerDriver.class))
+      .set(DriverConfiguration.DRIVER_IDENTIFIER, "TaskScheduler")
+      .set(DriverConfiguration.ON_DRIVER_STARTED, SchedulerDriver.StartHandler.class)
+      .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, SchedulerDriver.EvaluatorAllocatedHandler.class)
+      .set(DriverConfiguration.ON_CONTEXT_ACTIVE, SchedulerDriver.ActiveContextHandler.class)
+      .set(DriverConfiguration.ON_TASK_COMPLETED, SchedulerDriver.CompletedTaskHandler.class)
+      .build();
+  }
+
+  /**
+   * Run the Task scheduler. If '-retain true' option is passed via command line,
+   * the scheduler reuses evaluators to submit new Tasks.
+   *
+   * @param runtimeConf The runtime configuration (e.g. Local, YARN, etc)
+   * @param args Command line arguments.
+   * @throws InjectionException
+   * @throws java.io.IOException
+   * @throws ParseException
+   */
+  public static void runTaskScheduler(final Configuration runtimeConf, final String[] args)
+    throws InjectionException, IOException, ParseException {
+    final Tang tangInstance = Tang.Factory.getTang();
+
+    final Configuration retainConf = CommandLine.parseToConfiguration(args, Retain.class);
+
+    // The Driver runs with its own, the http, and the command line configuration combined.
+    final Configuration mergedDriverConf = Configurations.merge(getDriverConf(), getHttpConf(), retainConf);
+
+    final REEF reefClient = tangInstance.newInjector(runtimeConf).getInstance(REEF.class);
+    reefClient.submit(mergedDriverConf);
+  }
+
+  /**
+   * Main program. Runs the Task scheduler on the local runtime
+   * configured with 3 threads.
+   *
+   * @param args Command line arguments.
+   * @throws InjectionException
+   * @throws java.io.IOException
+   * @throws ParseException
+   */
+  public static final void main(final String[] args) throws InjectionException, IOException, ParseException {
+    final Configuration localRuntimeConf = LocalRuntimeConfiguration.CONF
+      .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, 3)
+      .build();
+    runTaskScheduler(localRuntimeConf, args);
+  }
+}