You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:47:03 UTC
[30/51] [partial] incubator-reef git commit: [REEF-93] Move java
sources to lang/java
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/JobDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/JobDriver.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/JobDriver.java
new file mode 100644
index 0000000..653b240
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/JobDriver.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.pool;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.task.CompletedTask;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Allocate N evaluators, submit M tasks to them, and measure the time.
+ * Each task does nothing but sleep for D seconds.
+ */
+@Unit
+public final class JobDriver {
+
+ /**
+ * Standard Java logger.
+ */
+ private static final Logger LOG = Logger.getLogger(JobDriver.class.getName());
+
+ /**
+ * Job driver uses EvaluatorRequestor to request Evaluators that will run the Tasks.
+ */
+ private final EvaluatorRequestor evaluatorRequestor;
+
+ /**
+ * If true, submit context and task in one request.
+ */
+ private final boolean isPiggyback;
+
+ /**
+ * Number of Evaluators to request.
+ */
+ private final int numEvaluators;
+
+ /**
+ * Number of Tasks to run.
+ */
+ private final int numTasks;
+ /**
+ * Number of seconds to sleep in each Task.
+ * (has to be a String to pass it into Task config).
+ */
+ private final String delayStr;
+ /**
+ * Number of Evaluators started.
+ */
+ private int numEvaluatorsStarted = 0;
+ /**
+ * Number of Tasks launched.
+ */
+ private int numTasksStarted = 0;
+
+ /**
+ * Job driver constructor.
+ * All parameters are injected from TANG automatically.
+ *
+ * @param evaluatorRequestor is used to request Evaluators.
+ */
+ @Inject
+ JobDriver(final EvaluatorRequestor evaluatorRequestor,
+ final @Parameter(Launch.Piggyback.class) Boolean isPiggyback,
+ final @Parameter(Launch.NumEvaluators.class) Integer numEvaluators,
+ final @Parameter(Launch.NumTasks.class) Integer numTasks,
+ final @Parameter(Launch.Delay.class) Integer delay) {
+ this.evaluatorRequestor = evaluatorRequestor;
+ this.isPiggyback = isPiggyback;
+ this.numEvaluators = numEvaluators;
+ this.numTasks = numTasks;
+ this.delayStr = "" + delay;
+ }
+
+ /**
+ * Build a new Task configuration for a given task ID.
+ *
+ * @param taskId Unique string ID of the task
+ * @return Immutable task configuration object, ready to be submitted to REEF.
+ * @throws RuntimeException that wraps BindException if unable to build the configuration.
+ */
+ private Configuration getTaskConfiguration(final String taskId) {
+ try {
+ return TaskConfiguration.CONF
+ .set(TaskConfiguration.IDENTIFIER, taskId)
+ .set(TaskConfiguration.TASK, SleepTask.class)
+ .build();
+ } catch (final BindException ex) {
+ LOG.log(Level.SEVERE, "Failed to create Task Configuration: " + taskId, ex);
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /**
+ * Job Driver is ready and the clock is set up: request the evaluators.
+ */
+ final class StartHandler implements EventHandler<StartTime> {
+ @Override
+ public void onNext(final StartTime startTime) {
+ LOG.log(Level.INFO, "TIME: Start Driver with {0} Evaluators", numEvaluators);
+ evaluatorRequestor.submit(
+ EvaluatorRequest.newBuilder()
+ .setMemory(128)
+ .setNumberOfCores(1)
+ .setNumber(numEvaluators).build()
+ );
+ }
+ }
+
+ /**
+ * Job Driver is shutting down: write to the log.
+ */
+ final class StopHandler implements EventHandler<StopTime> {
+ @Override
+ public void onNext(final StopTime stopTime) {
+ LOG.log(Level.INFO, "TIME: Stop Driver");
+ }
+ }
+
+ /**
+ * Receive notification that an Evaluator has been allocated: submit a Context
+ * (with a Task, in piggyback mode) to it, or close it if all Tasks have been started.
+ */
+ final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
+ @Override
+ public void onNext(final AllocatedEvaluator eval) {
+
+ LOG.log(Level.INFO, "TIME: Allocated Evaluator {0}", eval.getId());
+
+ final boolean runTask;
+ final int nEval;
+ final int nTask;
+
+ synchronized (JobDriver.this) {
+ runTask = numTasksStarted < numTasks;
+ if (runTask) {
+ ++numEvaluatorsStarted;
+ if (isPiggyback) {
+ ++numTasksStarted;
+ }
+ }
+ nEval = numEvaluatorsStarted;
+ nTask = numTasksStarted;
+ }
+
+ if (runTask) {
+
+ final String contextId = String.format("Context_%06d", nEval);
+ LOG.log(Level.INFO, "TIME: Submit Context {0} to Evaluator {1}",
+ new Object[]{contextId, eval.getId()});
+
+ try {
+
+ final JavaConfigurationBuilder contextConfigBuilder =
+ Tang.Factory.getTang().newConfigurationBuilder();
+
+ contextConfigBuilder.addConfiguration(ContextConfiguration.CONF
+ .set(ContextConfiguration.IDENTIFIER, contextId)
+ .build());
+
+ contextConfigBuilder.bindNamedParameter(Launch.Delay.class, delayStr);
+
+ if (isPiggyback) {
+
+ final String taskId = String.format("StartTask_%08d", nTask);
+ final Configuration taskConfig = getTaskConfiguration(taskId);
+
+ LOG.log(Level.INFO, "TIME: Submit Task {0} to Evaluator {1}",
+ new Object[]{taskId, eval.getId()});
+
+ eval.submitContextAndTask(contextConfigBuilder.build(), taskConfig);
+
+ } else {
+ eval.submitContext(contextConfigBuilder.build());
+ }
+
+ } catch (final BindException ex) {
+ LOG.log(Level.SEVERE, "Failed to submit Context to Evaluator: " + eval.getId(), ex);
+ throw new RuntimeException(ex);
+ }
+ } else {
+ LOG.log(Level.INFO, "TIME: Close Evaluator {0}", eval.getId());
+ eval.close();
+ }
+ }
+ }
+
+ /**
+ * Receive notification that the Context is active.
+ */
+ final class ActiveContextHandler implements EventHandler<ActiveContext> {
+ @Override
+ public void onNext(final ActiveContext context) {
+
+ LOG.log(Level.INFO, "TIME: Active Context {0}", context.getId());
+
+ if (isPiggyback) return; // Task already submitted
+
+ final boolean runTask;
+ final int nTask;
+
+ synchronized (JobDriver.this) {
+ runTask = numTasksStarted < numTasks;
+ if (runTask) {
+ ++numTasksStarted;
+ }
+ nTask = numTasksStarted;
+ }
+
+ if (runTask) {
+ final String taskId = String.format("StartTask_%08d", nTask);
+ LOG.log(Level.INFO, "TIME: Submit Task {0} to Evaluator {1}",
+ new Object[]{taskId, context.getEvaluatorId()});
+ context.submitTask(getTaskConfiguration(taskId));
+ } else {
+ context.close();
+ }
+ }
+ }
+
+ /**
+ * Receive notification that the Task is running.
+ */
+ final class RunningTaskHandler implements EventHandler<RunningTask> {
+ @Override
+ public void onNext(final RunningTask task) {
+ LOG.log(Level.INFO, "TIME: Running Task {0}", task.getId());
+ }
+ }
+
+ /**
+ * Receive notification that the Task has completed successfully.
+ */
+ final class CompletedTaskHandler implements EventHandler<CompletedTask> {
+ @Override
+ public void onNext(final CompletedTask task) {
+
+ final ActiveContext context = task.getActiveContext();
+ LOG.log(Level.INFO, "TIME: Completed Task {0} on Evaluator {1}",
+ new Object[]{task.getId(), context.getEvaluatorId()});
+
+ final boolean runTask;
+ final int nTask;
+ synchronized (JobDriver.this) {
+ runTask = numTasksStarted < numTasks;
+ if (runTask) {
+ ++numTasksStarted;
+ }
+ nTask = numTasksStarted;
+ }
+
+ if (runTask) {
+ final String taskId = String.format("Task_%08d", nTask);
+ LOG.log(Level.INFO, "TIME: Submit Task {0} to Evaluator {1}",
+ new Object[]{taskId, context.getEvaluatorId()});
+ context.submitTask(getTaskConfiguration(taskId));
+ } else {
+ LOG.log(Level.INFO, "TIME: Close Evaluator {0}", context.getEvaluatorId());
+ context.close();
+ }
+ }
+ }
+
+ /**
+ * Receive notification that the Evaluator has been shut down.
+ */
+ final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> {
+ @Override
+ public void onNext(final CompletedEvaluator eval) {
+ LOG.log(Level.INFO, "TIME: Completed Evaluator {0}", eval.getId());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/Launch.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/Launch.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/Launch.java
new file mode 100644
index 0000000..af8f455
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/Launch.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.pool;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.DriverLauncher;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.AvroConfigurationSerializer;
+import org.apache.reef.tang.formats.CommandLine;
+import org.apache.reef.util.EnvironmentUtils;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Pool of Evaluators example - main class.
+ */
+public final class Launch {
+
+ /**
+ * Number of REEF worker threads in local mode.
+ */
+ private static final int NUM_LOCAL_THREADS = 4;
+ /**
+ * Standard Java logger
+ */
+ private static final Logger LOG = Logger.getLogger(Launch.class.getName());
+
+ /**
+ * This class should not be instantiated.
+ */
+ private Launch() {
+ throw new RuntimeException("Do not instantiate this class!");
+ }
+
+ /**
+ * Parse the command line arguments.
+ *
+ * @param args command line arguments, as passed to main()
+ * @return Configuration object.
+ * @throws BindException configuration error.
+ * @throws IOException error reading the configuration.
+ */
+ private static Configuration parseCommandLine(final String[] args)
+ throws BindException, IOException {
+ final JavaConfigurationBuilder confBuilder = Tang.Factory.getTang().newConfigurationBuilder();
+ final CommandLine cl = new CommandLine(confBuilder);
+ cl.registerShortNameOfClass(Local.class);
+ cl.registerShortNameOfClass(Piggyback.class);
+ cl.registerShortNameOfClass(NumEvaluators.class);
+ cl.registerShortNameOfClass(NumTasks.class);
+ cl.registerShortNameOfClass(Delay.class);
+ cl.registerShortNameOfClass(JobId.class);
+ cl.processCommandLine(args);
+ return confBuilder.build();
+ }
+
+ private static Configuration cloneCommandLineConfiguration(final Configuration commandLineConf)
+ throws InjectionException, BindException {
+ final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf);
+ final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
+ cb.bindNamedParameter(Piggyback.class, String.valueOf(injector.getNamedInstance(Piggyback.class)));
+ cb.bindNamedParameter(NumEvaluators.class, String.valueOf(injector.getNamedInstance(NumEvaluators.class)));
+ cb.bindNamedParameter(NumTasks.class, String.valueOf(injector.getNamedInstance(NumTasks.class)));
+ cb.bindNamedParameter(Delay.class, String.valueOf(injector.getNamedInstance(Delay.class)));
+ return cb.build();
+ }
+
+ /**
+ * Parse command line arguments and create TANG configuration ready to be submitted to REEF.
+ *
+ * @param commandLineConf Parsed command line arguments, as passed into main().
+ * @return (immutable) TANG Configuration object.
+ * @throws BindException if configuration commandLineInjector fails.
+ * @throws InjectionException if configuration commandLineInjector fails.
+ */
+ private static Configuration getClientConfiguration(
+ final Configuration commandLineConf, final boolean isLocal)
+ throws BindException, InjectionException {
+ final Configuration runtimeConfiguration;
+ if (isLocal) {
+ LOG.log(Level.FINE, "Running on the local runtime");
+ runtimeConfiguration = LocalRuntimeConfiguration.CONF
+ .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, NUM_LOCAL_THREADS)
+ .build();
+ } else {
+ LOG.log(Level.FINE, "Running on YARN");
+ runtimeConfiguration = YarnClientConfiguration.CONF.build();
+ }
+ return Tang.Factory.getTang().newConfigurationBuilder(
+ runtimeConfiguration, cloneCommandLineConfiguration(commandLineConf))
+ .build();
+ }
+
+ /**
+ * Main method that launches the REEF job.
+ *
+ * @param args command line parameters.
+ */
+ public static void main(final String[] args) {
+
+ try {
+
+ final Configuration commandLineConf = parseCommandLine(args);
+ final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf);
+
+ final boolean isLocal = injector.getNamedInstance(Local.class);
+ final int numEvaluators = injector.getNamedInstance(NumEvaluators.class);
+ final int numTasks = injector.getNamedInstance(NumTasks.class);
+ final int delay = injector.getNamedInstance(Delay.class);
+ final int jobNum = injector.getNamedInstance(JobId.class);
+
+ final String jobId = String.format("pool.e_%d.a_%d.d_%d.%d",
+ numEvaluators, numTasks, delay, jobNum < 0 ? System.currentTimeMillis() : jobNum);
+
+ // Timeout: delay + 6 extra seconds per Task per Evaluator + 2 minutes to allocate each Evaluator:
+ final int timeout = numTasks * (delay + 6) * 1000 / numEvaluators + numEvaluators * 120000;
+
+ final Configuration runtimeConfig = getClientConfiguration(commandLineConf, isLocal);
+ LOG.log(Level.INFO, "TIME: Start Client {0} with timeout {1} sec. Configuration:\n--\n{2}--",
+ new Object[]{jobId, timeout / 1000, new AvroConfigurationSerializer().toString(runtimeConfig)});
+
+ final Configuration driverConfig = DriverConfiguration.CONF
+ .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(JobDriver.class))
+ .set(DriverConfiguration.DRIVER_IDENTIFIER, jobId)
+ .set(DriverConfiguration.ON_DRIVER_STARTED, JobDriver.StartHandler.class)
+ .set(DriverConfiguration.ON_DRIVER_STOP, JobDriver.StopHandler.class)
+ .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, JobDriver.AllocatedEvaluatorHandler.class)
+ .set(DriverConfiguration.ON_CONTEXT_ACTIVE, JobDriver.ActiveContextHandler.class)
+ .set(DriverConfiguration.ON_TASK_RUNNING, JobDriver.RunningTaskHandler.class)
+ .set(DriverConfiguration.ON_TASK_COMPLETED, JobDriver.CompletedTaskHandler.class)
+ .set(DriverConfiguration.ON_EVALUATOR_COMPLETED, JobDriver.CompletedEvaluatorHandler.class)
+ .build();
+
+ final Configuration submittedConfiguration = Tang.Factory.getTang()
+ .newConfigurationBuilder(driverConfig, commandLineConf).build();
+ DriverLauncher.getLauncher(runtimeConfig)
+ .run(submittedConfiguration, timeout);
+
+ LOG.log(Level.INFO, "TIME: Stop Client {0}", jobId);
+
+ } catch (final BindException | InjectionException | IOException ex) {
+ LOG.log(Level.SEVERE, "Job configuration error", ex);
+ }
+ }
+
+ /**
+ * Command line parameter: number of Evaluators to request.
+ */
+ @NamedParameter(doc = "Number of evaluators to request", short_name = "evaluators")
+ public static final class NumEvaluators implements Name<Integer> {
+ }
+
+ /**
+ * Command line parameter: number of Tasks to run.
+ */
+ @NamedParameter(doc = "Number of tasks to run", short_name = "tasks")
+ public static final class NumTasks implements Name<Integer> {
+ }
+
+ /**
+ * Command line parameter: number of seconds to sleep in each task.
+ */
+ @NamedParameter(doc = "Number of seconds to sleep in each task", short_name = "delay")
+ public static final class Delay implements Name<Integer> {
+ }
+
+ /**
+ * Command line parameter = true to submit task and context in one request.
+ */
+ @NamedParameter(doc = "Submit task and context together",
+ short_name = "piggyback", default_value = "true")
+ public static final class Piggyback implements Name<Boolean> {
+ }
+
+ /**
+ * Command line parameter = true to run locally, or false to run on YARN.
+ */
+ @NamedParameter(doc = "Whether or not to run on the local runtime",
+ short_name = "local", default_value = "true")
+ public static final class Local implements Name<Boolean> {
+ }
+
+ /**
+ * Command line parameter = Numeric ID for the job.
+ */
+ @NamedParameter(doc = "Numeric ID for the job", short_name = "id", default_value = "-1")
+ public static final class JobId implements Name<Integer> {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/SleepTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/SleepTask.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/SleepTask.java
new file mode 100644
index 0000000..1bd2059
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/SleepTask.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.pool;
+
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.task.Task;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Sleep for delay seconds and quit.
+ */
+public class SleepTask implements Task {
+
+ /**
+ * Standard java logger.
+ */
+ private static final Logger LOG = Logger.getLogger(SleepTask.class.getName());
+
+ /**
+ * Number of milliseconds to sleep.
+ */
+ private final int delay;
+
+ /**
+ * Task constructor. Parameters are injected automatically by TANG.
+ *
+ * @param delay number of seconds to sleep.
+ */
+ @Inject
+ private SleepTask(final @Parameter(Launch.Delay.class) Integer delay) {
+ this.delay = delay * 1000;
+ }
+
+ /**
+ * Sleep until the configured delay has fully elapsed, then return.
+ * An interruption is logged and the sleep resumes for the time that is still left,
+ * measured from the moment the task started.
+ *
+ * @param memento ignored.
+ * @return null.
+ */
+ @Override
+ public byte[] call(final byte[] memento) {
+ LOG.log(Level.FINE, "Task started: sleep for: {0} msec.", this.delay);
+ final long ts = System.currentTimeMillis();
+ // Recompute the remaining time from the fixed start timestamp on every pass.
+ // Subtracting the total elapsed time from an already reduced period would
+ // count earlier waits twice and end the task early after repeated interrupts.
+ for (long period = this.delay; period > 0;
+ period = this.delay - (System.currentTimeMillis() - ts)) {
+ try {
+ Thread.sleep(period);
+ } catch (final InterruptedException ex) {
+ LOG.log(Level.FINEST, "Interrupted: {0}", ex);
+ }
+ }
+ LOG.log(Level.FINE, "Task finished after {0} msec.", System.currentTimeMillis() - ts);
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/package-info.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/package-info.java
new file mode 100644
index 0000000..e67c862
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/pool/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Allocate N evaluators, submit M tasks to them, and measure the time.
+ * Each task does nothing but sleep for D seconds.
+ */
+package org.apache.reef.examples.pool;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobClient.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobClient.java
new file mode 100644
index 0000000..61d549d
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobClient.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.retained_eval;
+
+import org.apache.reef.client.*;
+import org.apache.reef.examples.library.Command;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+
+import javax.inject.Inject;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Retained Evaluator Shell Client.
+ */
+@Unit
+public class JobClient {
+
+ /**
+ * Standard java logger.
+ */
+ private static final Logger LOG = Logger.getLogger(JobClient.class.getName());
+
+ /**
+ * Codec to translate messages to and from the job driver
+ */
+ private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
+
+ /**
+ * Reference to the REEF framework.
+ * This variable is injected automatically in the constructor.
+ */
+ private final REEF reef;
+
+ /**
+ * Shell command to submit to the job driver.
+ */
+ private final String command;
+
+ /**
+ * Job Driver configuration.
+ */
+ private final Configuration driverConfiguration;
+
+ /**
+ * If true, take commands from stdin; otherwise, use -cmd parameter in batch mode.
+ */
+ private final boolean isInteractive;
+
+ /**
+ * Total number of experiments to run.
+ */
+ private final int maxRuns;
+
+ /**
+ * Command prompt reader for the interactive mode (stdin).
+ */
+ private final BufferedReader prompt;
+
+ /**
+ * A reference to the running job that allows client to send messages back to the job driver
+ */
+ private RunningJob runningJob;
+
+ /**
+ * Start timestamp of the current task.
+ */
+ private long startTime = 0;
+
+ /**
+ * Total time spent performing tasks in Evaluators.
+ */
+ private long totalTime = 0;
+
+ /**
+ * Number of experiments ran so far.
+ */
+ private int numRuns = 0;
+
+ /**
+ * Set to false when job driver is done.
+ */
+ private boolean isBusy = true;
+
+ /**
+ * Last result returned from the job driver.
+ */
+ private String lastResult;
+
+ /**
+ * Retained Evaluator client.
+ * Parameters are injected automatically by TANG.
+ *
+ * @param command Shell command to run on each Evaluator.
+ * @param reef Reference to the REEF framework.
+ */
+ @Inject
+ JobClient(final REEF reef,
+ @Parameter(Command.class) final String command,
+ @Parameter(Launch.NumRuns.class) final Integer numRuns,
+ @Parameter(Launch.NumEval.class) final Integer numEvaluators) throws BindException {
+
+ this.reef = reef;
+ this.command = command;
+ this.maxRuns = numRuns;
+
+ // If command is not set, switch to interactive mode. (Yes, we compare pointers here)
+ this.isInteractive = this.command ==
+ Command.class.getAnnotation(NamedParameter.class).default_value();
+
+ this.prompt = this.isInteractive ?
+ new BufferedReader(new InputStreamReader(System.in)) : null;
+
+ final JavaConfigurationBuilder configBuilder = Tang.Factory.getTang().newConfigurationBuilder();
+ configBuilder.addConfiguration(
+ DriverConfiguration.CONF
+ .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(JobDriver.class))
+ .set(DriverConfiguration.DRIVER_IDENTIFIER, "eval-" + System.currentTimeMillis())
+ .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, JobDriver.AllocatedEvaluatorHandler.class)
+ .set(DriverConfiguration.ON_EVALUATOR_FAILED, JobDriver.FailedEvaluatorHandler.class)
+ .set(DriverConfiguration.ON_CONTEXT_ACTIVE, JobDriver.ActiveContextHandler.class)
+ .set(DriverConfiguration.ON_CONTEXT_CLOSED, JobDriver.ClosedContextHandler.class)
+ .set(DriverConfiguration.ON_CONTEXT_FAILED, JobDriver.FailedContextHandler.class)
+ .set(DriverConfiguration.ON_TASK_COMPLETED, JobDriver.CompletedTaskHandler.class)
+ .set(DriverConfiguration.ON_CLIENT_MESSAGE, JobDriver.ClientMessageHandler.class)
+ .set(DriverConfiguration.ON_DRIVER_STARTED, JobDriver.StartHandler.class)
+ .set(DriverConfiguration.ON_DRIVER_STOP, JobDriver.StopHandler.class)
+ .build()
+ );
+ configBuilder.bindNamedParameter(Launch.NumEval.class, "" + numEvaluators);
+ this.driverConfiguration = configBuilder.build();
+ }
+
+ /**
+ * @return a Configuration binding the ClientConfiguration.* event handlers to this Client.
+ */
+ public static Configuration getClientConfiguration() {
+ return ClientConfiguration.CONF
+ .set(ClientConfiguration.ON_JOB_RUNNING, JobClient.RunningJobHandler.class)
+ .set(ClientConfiguration.ON_JOB_MESSAGE, JobClient.JobMessageHandler.class)
+ .set(ClientConfiguration.ON_JOB_COMPLETED, JobClient.CompletedJobHandler.class)
+ .set(ClientConfiguration.ON_JOB_FAILED, JobClient.FailedJobHandler.class)
+ .set(ClientConfiguration.ON_RUNTIME_ERROR, JobClient.RuntimeErrorHandler.class)
+ .build();
+ }
+
+ /**
+ * Launch the job driver.
+ *
+ * @throws BindException configuration error.
+ */
+ public void submit() {
+ this.reef.submit(this.driverConfiguration);
+ }
+
+ /**
+ * Send command to the job driver. Record timestamp when the command was sent.
+ * If this.command is set, use it; otherwise, ask user for the command.
+ */
+ private synchronized void submitTask() {
+ if (this.isInteractive) {
+ String cmd;
+ try {
+ do {
+ System.out.print("\nRE> ");
+ cmd = this.prompt.readLine();
+ } while (cmd != null && cmd.trim().isEmpty());
+ } catch (final IOException ex) {
+ LOG.log(Level.FINE, "Error reading from stdin: {0}", ex);
+ cmd = null;
+ }
+ if (cmd == null || cmd.equals("exit")) {
+ this.runningJob.close();
+ stopAndNotify();
+ } else {
+ this.submitTask(cmd);
+ }
+ } else {
+ // non-interactive batch mode:
+ this.submitTask(this.command);
+ }
+ }
+
+ /**
+ * Send command to the job driver. Record timestamp when the command was sent.
+ *
+ * @param cmd shell command to execute in all Evaluators.
+ */
+ private synchronized void submitTask(final String cmd) {
+ LOG.log(Level.FINE, "Submit task {0} \"{1}\" to {2}",
+ new Object[]{this.numRuns + 1, cmd, this.runningJob});
+ this.startTime = System.currentTimeMillis();
+ this.runningJob.send(CODEC.encode(cmd));
+ }
+
+ /**
+ * Notify the process in waitForCompletion() method that the main process has finished.
+ */
+ private synchronized void stopAndNotify() {
+ this.runningJob = null;
+ this.isBusy = false;
+ this.notify();
+ }
+
+ /**
+ * Wait for the job driver to complete. This method is called from Launch.main()
+ * The isBusy flag is tested while holding the same monitor that stopAndNotify()
+ * acquires, so a notification sent between the test and the wait cannot be lost.
+ *
+ * @return the last result received from the job driver, or null if there was none.
+ */
+ public String waitForCompletion() {
+ synchronized (this) {
+ while (this.isBusy) {
+ LOG.log(Level.FINE, "Waiting for the Job Driver to complete.");
+ try {
+ // wait() releases the monitor, so the event handlers can still make progress.
+ this.wait();
+ } catch (final InterruptedException ex) {
+ LOG.log(Level.WARNING, "Waiting for result interrupted.", ex);
+ }
+ }
+ return this.lastResult;
+ }
+ }
+
+ public void close() {
+ this.reef.close();
+ }
+
+ /**
+ * Receive notification from the job driver that the job is running.
+ */
+ final class RunningJobHandler implements EventHandler<RunningJob> {
+ @Override
+ public void onNext(final RunningJob job) {
+ LOG.log(Level.FINE, "Running job: {0}", job.getId());
+ synchronized (JobClient.this) {
+ JobClient.this.runningJob = job;
+ JobClient.this.submitTask();
+ }
+ }
+ }
+
+ /**
+ * Receive message from the job driver.
+ * There is only one message, which comes at the end of the driver execution
+ * and contains shell command output on each node.
+ */
+ final class JobMessageHandler implements EventHandler<JobMessage> {
+ @Override
+ public void onNext(final JobMessage message) {
+ synchronized (JobClient.this) {
+
+ lastResult = CODEC.decode(message.get());
+ final long jobTime = System.currentTimeMillis() - startTime;
+ totalTime += jobTime;
+ ++numRuns;
+
+ LOG.log(Level.FINE, "TIME: Task {0} completed in {1} msec.:\n{2}",
+ new Object[]{"" + numRuns, "" + jobTime, lastResult});
+
+ System.out.println(lastResult);
+
+ if (runningJob != null) {
+ if (isInteractive || numRuns < maxRuns) {
+ submitTask();
+ } else {
+ LOG.log(Level.INFO,
+ "All {0} tasks complete; Average task time: {1}. Closing the job driver.",
+ new Object[]{maxRuns, totalTime / (double) maxRuns});
+ runningJob.close();
+ stopAndNotify();
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Receive notification from the job driver that the job had failed.
+ */
+ final class FailedJobHandler implements EventHandler<FailedJob> {
+ @Override
+ public void onNext(final FailedJob job) {
+ LOG.log(Level.SEVERE, "Failed job: " + job.getId(), job.getReason().orElse(null));
+ stopAndNotify();
+ }
+ }
+
+ /**
+ * Receive notification from the job driver that the job had completed successfully.
+ */
+ final class CompletedJobHandler implements EventHandler<CompletedJob> {
+ @Override
+ public void onNext(final CompletedJob job) {
+ LOG.log(Level.FINE, "Completed job: {0}", job.getId());
+ stopAndNotify();
+ }
+ }
+
+ /**
+ * Receive notification that there was an exception thrown from the job driver.
+ */
+ final class RuntimeErrorHandler implements EventHandler<FailedRuntime> {
+ @Override
+ public void onNext(final FailedRuntime error) {
+ LOG.log(Level.SEVERE, "Error in job driver: " + error, error.getReason().orElse(null));
+ stopAndNotify();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobDriver.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobDriver.java
new file mode 100644
index 0000000..b2b2055
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/JobDriver.java
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.retained_eval;
+
+import org.apache.reef.driver.client.JobMessageObserver;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.task.CompletedTask;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.examples.library.Command;
+import org.apache.reef.examples.library.ShellTask;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Retained Evaluator example job driver. Execute shell command on all evaluators,
+ * capture stdout, and return concatenated results back to the client.
+ */
+@Unit
+public final class JobDriver {
+ /**
+ * Standard Java logger.
+ */
+ private static final Logger LOG = Logger.getLogger(JobDriver.class.getName());
+
+ /**
+ * Duration of one clock interval.
+ */
+ private static final int CHECK_UP_INTERVAL = 1000; // 1 sec.
+
+ /**
+ * String codec is used to encode the results
+ * before passing them back to the client.
+ */
+ private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
+ /**
+ * Job observer on the client.
+ * We use it to send results from the driver back to the client.
+ */
+ private final JobMessageObserver jobMessageObserver;
+ /**
+ * Job driver uses EvaluatorRequestor
+ * to request Evaluators that will run the Tasks.
+ */
+ private final EvaluatorRequestor evaluatorRequestor;
+ /**
+ * Number of Evaluators to request (default is 1).
+ */
+ private final int numEvaluators;
+ /**
+ * Shell execution results from each Evaluator.
+ */
+ private final List<String> results = new ArrayList<>();
+ /**
+ * Map from context ID to running evaluator context.
+ */
+ private final Map<String, ActiveContext> contexts = new HashMap<>();
+ /**
+ * Job driver state.
+ */
+ private State state = State.INIT;
+ /**
+ * First command to execute. Sometimes client can send us the first command
+ * before Evaluators are available; we need to store this command here.
+ */
+ private String cmd;
+ /**
+ * Number of evaluators/tasks to complete.
+ */
+ private int expectCount = 0;
+
+ /**
+ * Job driver constructor.
+ * All parameters are injected from TANG automatically.
+ *
+ * @param jobMessageObserver is used to send messages back to the client.
+ * @param evaluatorRequestor is used to request Evaluators.
+ * @param numEvaluators number of Evaluators to request,
+ * taken from the {@link Launch.NumEval} named parameter.
+ */
+ @Inject
+ JobDriver(final JobMessageObserver jobMessageObserver,
+ final EvaluatorRequestor evaluatorRequestor,
+ final @Parameter(Launch.NumEval.class) Integer numEvaluators) {
+ this.jobMessageObserver = jobMessageObserver;
+ this.evaluatorRequestor = evaluatorRequestor;
+ this.numEvaluators = numEvaluators;
+ }
+
+ /**
+ * Concatenate the collected task results, empty the result list,
+ * and send the encoded string to the client.
+ */
+ private void returnResults() {
+ final StringBuilder combined = new StringBuilder();
+ for (int i = 0; i < this.results.size(); ++i) {
+ combined.append(this.results.get(i));
+ }
+ // The list is reused for the next command, so reset it before reporting.
+ this.results.clear();
+ LOG.log(Level.INFO, "Return results to the client:\n{0}", combined);
+ final byte[] encoded = CODEC.encode(combined.toString());
+ this.jobMessageObserver.sendMessageToClient(encoded);
+ }
+
+ /**
+ * Submit command to all available evaluators.
+ *
+ * @param command shell command to execute.
+ */
+ private void submit(final String command) {
+ LOG.log(Level.INFO, "Submit command {0} to {1} evaluators. state: {2}",
+ new Object[]{command, this.contexts.size(), this.state});
+ assert (this.state == State.READY);
+ this.expectCount = this.contexts.size();
+ this.state = State.WAIT_TASKS;
+ this.cmd = null;
+ for (final ActiveContext context : this.contexts.values()) {
+ this.submit(context, command);
+ }
+ }
+
+ /**
+ * Submit a Task that executes the command in a single context.
+ * This method is called from <code>submit(command)</code>, once per active context.
+ *
+ * @param context context that will run the task.
+ * @param command shell command to execute.
+ * @throws RuntimeException wrapping the BindException if the task configuration
+ * cannot be built; the context is closed before the exception is thrown.
+ */
+ private void submit(final ActiveContext context, final String command) {
+ try {
+ LOG.log(Level.INFO, "Send command {0} to context: {1}", new Object[]{command, context});
+ final String taskId = context.getId() + "_task";
+ final JavaConfigurationBuilder taskConfBuilder =
+ Tang.Factory.getTang().newConfigurationBuilder();
+ taskConfBuilder.addConfiguration(TaskConfiguration.CONF
+ .set(TaskConfiguration.IDENTIFIER, taskId)
+ .set(TaskConfiguration.TASK, ShellTask.class)
+ .build());
+ taskConfBuilder.bindNamedParameter(Command.class, command);
+ context.submitTask(taskConfBuilder.build());
+ } catch (final BindException ex) {
+ LOG.log(Level.SEVERE, "Bad Task configuration for context: " + context.getId(), ex);
+ context.close();
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /**
+ * Request the evaluators.
+ */
+ private synchronized void requestEvaluators() {
+ assert (this.state == State.INIT);
+ LOG.log(Level.INFO, "Schedule on {0} Evaluators.", this.numEvaluators);
+ this.evaluatorRequestor.submit(
+ EvaluatorRequest.newBuilder()
+ .setMemory(128)
+ .setNumberOfCores(1)
+ .setNumber(this.numEvaluators).build()
+ );
+ this.state = State.WAIT_EVALUATORS;
+ this.expectCount = this.numEvaluators;
+ }
+
+ /**
+ * Possible states of the job driver. Can be one of:
+ * <dl>
+ * <dt><code>INIT</code></dt><dd>initial state, ready to request the evaluators.</dd>
+ * <dt><code>WAIT_EVALUATORS</code></dt><dd>Wait for requested evaluators to initialize.</dd>
+ * <dt><code>READY</code></dt><dd>Ready to submit a new task.</dd>
+ * <dt><code>WAIT_TASKS</code></dt><dd>Wait for tasks to complete.</dd>
+ * </dl>
+ */
+ private enum State {
+ INIT, WAIT_EVALUATORS, READY, WAIT_TASKS
+ }
+
+ /**
+ * Receive notification that an Evaluator had been allocated,
+ * and submitTask a new Task in that Evaluator.
+ */
+ final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
+ @Override
+ public void onNext(final AllocatedEvaluator eval) {
+ synchronized (JobDriver.this) {
+ LOG.log(Level.INFO, "Allocated Evaluator: {0} expect {1} running {2}",
+ new Object[]{eval.getId(), JobDriver.this.expectCount, JobDriver.this.contexts.size()});
+ assert (JobDriver.this.state == State.WAIT_EVALUATORS);
+ try {
+ eval.submitContext(ContextConfiguration.CONF.set(
+ ContextConfiguration.IDENTIFIER, eval.getId() + "_context").build());
+ } catch (final BindException ex) {
+ LOG.log(Level.SEVERE, "Failed to submit a context to evaluator: " + eval.getId(), ex);
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+ }
+
+ /**
+ * Receive notification that the entire Evaluator had failed.
+ * Forget the contexts that failed together with it, then rethrow the failure
+ * as a RuntimeException.
+ */
+ final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
+ @Override
+ public void onNext(final FailedEvaluator eval) {
+ synchronized (JobDriver.this) {
+ // The message needs a {0} placeholder: without one, java.util.logging
+ // prints only the message text and silently drops the parameter.
+ LOG.log(Level.SEVERE, "FailedEvaluator: {0}", eval);
+ for (final FailedContext failedContext : eval.getFailedContextList()) {
+ JobDriver.this.contexts.remove(failedContext.getId());
+ }
+ throw new RuntimeException("Failed Evaluator: ", eval.getEvaluatorException());
+ }
+ }
+ }
+
+ /**
+ * Receive notification that a new Context is available.
+ * Remember the context; once the last expected one has arrived, switch to READY
+ * and submit the stored command, if there is one.
+ */
+ final class ActiveContextHandler implements EventHandler<ActiveContext> {
+ @Override
+ public void onNext(final ActiveContext context) {
+ synchronized (JobDriver.this) {
+ LOG.log(Level.INFO, "Context available: {0} expect {1} state {2}",
+ new Object[]{context.getId(), JobDriver.this.expectCount, JobDriver.this.state});
+ assert (JobDriver.this.state == State.WAIT_EVALUATORS);
+ JobDriver.this.contexts.put(context.getId(), context);
+ JobDriver.this.expectCount -= 1;
+ if (JobDriver.this.expectCount > 0) {
+ // Still waiting for more contexts to come up.
+ return;
+ }
+ JobDriver.this.state = State.READY;
+ if (JobDriver.this.cmd != null) {
+ JobDriver.this.submit(JobDriver.this.cmd);
+ } else {
+ LOG.log(Level.INFO, "All evaluators ready; waiting for command. State: {0}",
+ JobDriver.this.state);
+ }
+ }
+ }
+ }
+
+ /**
+ * Receive notification that the Context had completed.
+ * Remove context from the list of active context.
+ */
+ final class ClosedContextHandler implements EventHandler<ClosedContext> {
+ @Override
+ public void onNext(final ClosedContext context) {
+ LOG.log(Level.INFO, "Completed Context: {0}", new Object[]{context.getId()});
+ // The contexts map is only touched while holding the driver lock.
+ synchronized (JobDriver.this) {
+ JobDriver.this.contexts.remove(context.getId());
+ }
+ }
+ }
+
+ /**
+ * Receive notification that the Context had failed.
+ * Remove the context from the map of active contexts, then rethrow the failure
+ * as a RuntimeException.
+ */
+ final class FailedContextHandler implements EventHandler<FailedContext> {
+ @Override
+ public void onNext(final FailedContext context) {
+ // The message needs a {0} placeholder: without one, java.util.logging
+ // prints only the message text and silently drops the parameter.
+ LOG.log(Level.SEVERE, "FailedContext: {0}", context);
+ synchronized (JobDriver.this) {
+ JobDriver.this.contexts.remove(context.getId());
+ }
+ throw new RuntimeException("Failed context: ", context.asError());
+ }
+ }
+
+ /**
+ * Receive notification that the Task has completed successfully.
+ */
+ final class CompletedTaskHandler implements EventHandler<CompletedTask> {
+ @Override
+ public void onNext(final CompletedTask task) {
+ LOG.log(Level.INFO, "Completed task: {0}", task.getId());
+ // Decode the task output before taking the driver lock.
+ final String result = CODEC.decode(task.get());
+ synchronized (JobDriver.this) {
+ JobDriver.this.results.add(task.getId() + " :: " + result);
+ LOG.log(Level.INFO, "Task {0} result {1}: {2} state: {3}", new Object[]{
+ task.getId(), JobDriver.this.results.size(), result, JobDriver.this.state});
+ JobDriver.this.expectCount -= 1;
+ if (JobDriver.this.expectCount > 0) {
+ // Other tasks of the current command are still running.
+ return;
+ }
+ // Last task of the batch: report to the client and accept the next command.
+ JobDriver.this.returnResults();
+ JobDriver.this.state = State.READY;
+ final String pending = JobDriver.this.cmd;
+ if (pending != null) {
+ JobDriver.this.submit(pending);
+ }
+ }
+ }
+ }
+
+ /**
+ * Receive notification from the client.
+ */
+ final class ClientMessageHandler implements EventHandler<byte[]> {
+ @Override
+ public void onNext(final byte[] message) {
+ synchronized (JobDriver.this) {
+ final String command = CODEC.decode(message);
+ LOG.log(Level.INFO, "Client message: {0} state: {1}",
+ new Object[]{command, JobDriver.this.state});
+ assert (JobDriver.this.cmd == null);
+ if (JobDriver.this.state != State.READY) {
+ // not ready yet - save the command for better times.
+ assert (JobDriver.this.state == State.WAIT_EVALUATORS);
+ JobDriver.this.cmd = command;
+ } else {
+ JobDriver.this.submit(command);
+ }
+ }
+ }
+ }
+
+ /**
+ * Job Driver is ready and the clock is set up: request the evaluators.
+ */
+ final class StartHandler implements EventHandler<StartTime> {
+ @Override
+ public void onNext(final StartTime startTime) {
+ LOG.log(Level.INFO, "{0} StartTime: {1}",
+ new Object[]{JobDriver.this.state, startTime});
+ assert (JobDriver.this.state == State.INIT);
+ JobDriver.this.requestEvaluators();
+ }
+ }
+
+ /**
+ * Shutting down the job driver: close all contexts that are still registered.
+ */
+ final class StopHandler implements EventHandler<StopTime> {
+ @Override
+ public void onNext(final StopTime time) {
+ // Copy the contexts while holding the driver lock: the other handlers add and
+ // remove map entries under that lock, and iterating the live HashMap view while
+ // one of them runs could fail with ConcurrentModificationException.
+ final List<ActiveContext> toClose;
+ synchronized (JobDriver.this) {
+ LOG.log(Level.INFO, "{0} StopTime: {1}", new Object[]{JobDriver.this.state, time});
+ toClose = new ArrayList<>(JobDriver.this.contexts.values());
+ }
+ // Close outside the lock so that handlers needing the lock are not held up meanwhile.
+ for (final ActiveContext context : toClose) {
+ context.close();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/Launch.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/Launch.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/Launch.java
new file mode 100644
index 0000000..cf230bc
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/Launch.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.retained_eval;
+
+import org.apache.reef.client.ClientConfiguration;
+import org.apache.reef.examples.library.Command;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.AvroConfigurationSerializer;
+import org.apache.reef.tang.formats.CommandLine;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Retained Evaluators example - main class.
+ */
+public final class Launch {
+
+ /**
+ * Number of REEF worker threads in local mode.
+ */
+ private static final int NUM_LOCAL_THREADS = 4;
+ /**
+ * Standard Java logger.
+ */
+ private static final Logger LOG = Logger.getLogger(Launch.class.getName());
+
+ /**
+ * This class should not be instantiated.
+ */
+ private Launch() {
+ throw new RuntimeException("Do not instantiate this class!");
+ }
+
+ /**
+ * Parse the command line arguments.
+ *
+ * @param args command line arguments, as passed to main()
+ * @return Configuration object.
+ * @throws BindException configuration error.
+ * @throws IOException error reading the configuration.
+ */
+ private static Configuration parseCommandLine(final String[] args)
+ throws BindException, IOException {
+ final JavaConfigurationBuilder builder = Tang.Factory.getTang().newConfigurationBuilder();
+ final CommandLine commandLine = new CommandLine(builder);
+ // Register the short name of every named parameter accepted on the command line.
+ commandLine.registerShortNameOfClass(Local.class);
+ commandLine.registerShortNameOfClass(Command.class);
+ commandLine.registerShortNameOfClass(NumRuns.class);
+ commandLine.registerShortNameOfClass(NumEval.class);
+ commandLine.processCommandLine(args);
+ return builder.build();
+ }
+
+ /**
+ * Copy the Command, NumRuns and NumEval values of the parsed command line
+ * into a new configuration. The Local flag is not copied.
+ *
+ * @param commandLineConf configuration to read the parameter values from.
+ * @return a new configuration that binds only those three named parameters.
+ * @throws InjectionException if a parameter value cannot be obtained from commandLineConf.
+ * @throws BindException if a parameter cannot be bound in the new configuration.
+ */
+ private static Configuration cloneCommandLineConfiguration(final Configuration commandLineConf)
+ throws InjectionException, BindException {
+ final Injector source = Tang.Factory.getTang().newInjector(commandLineConf);
+ final JavaConfigurationBuilder target = Tang.Factory.getTang().newConfigurationBuilder();
+
+ target.bindNamedParameter(Command.class, source.getNamedInstance(Command.class));
+
+ // Numeric parameters are bound through their string representation.
+ final String numRuns = String.valueOf(source.getNamedInstance(NumRuns.class));
+ target.bindNamedParameter(NumRuns.class, numRuns);
+
+ final String numEval = String.valueOf(source.getNamedInstance(NumEval.class));
+ target.bindNamedParameter(NumEval.class, numEval);
+
+ return target.build();
+ }
+
+ /**
+ * Parse command line arguments and create TANG configuration ready to be submitted to REEF.
+ *
+ * @param args Command line arguments, as passed into main().
+ * @return (immutable) TANG Configuration object.
+ * @throws BindException if binding a configuration parameter fails.
+ * @throws InjectionException if a parameter cannot be read from the command line configuration.
+ * @throws IOException error reading the configuration.
+ */
+ private static Configuration getClientConfiguration(final String[] args)
+ throws BindException, InjectionException, IOException {
+
+ final Configuration commandLineConf = parseCommandLine(args);
+
+ // Client-side event handlers.
+ final Configuration clientConfiguration = ClientConfiguration.CONF
+ .set(ClientConfiguration.ON_JOB_RUNNING, JobClient.RunningJobHandler.class)
+ .set(ClientConfiguration.ON_JOB_MESSAGE, JobClient.JobMessageHandler.class)
+ .set(ClientConfiguration.ON_JOB_COMPLETED, JobClient.CompletedJobHandler.class)
+ .set(ClientConfiguration.ON_JOB_FAILED, JobClient.FailedJobHandler.class)
+ .set(ClientConfiguration.ON_RUNTIME_ERROR, JobClient.RuntimeErrorHandler.class)
+ .build();
+
+ // TODO: Remove the injector, have stuff injected.
+ final Injector commandLineInjector = Tang.Factory.getTang().newInjector(commandLineConf);
+ final boolean isLocal = commandLineInjector.getNamedInstance(Local.class);
+
+ // Pick the runtime: YARN unless the Local flag is set.
+ final Configuration runtimeConfiguration;
+ if (!isLocal) {
+ LOG.log(Level.INFO, "Running on YARN");
+ runtimeConfiguration = YarnClientConfiguration.CONF.build();
+ } else {
+ LOG.log(Level.INFO, "Running on the local runtime");
+ runtimeConfiguration = LocalRuntimeConfiguration.CONF
+ .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, NUM_LOCAL_THREADS)
+ .build();
+ }
+
+ // Merge runtime, client handlers and the job parameters copied from the command line.
+ final Configuration jobParameters = cloneCommandLineConfiguration(commandLineConf);
+ return Tang.Factory.getTang()
+ .newConfigurationBuilder(runtimeConfiguration, clientConfiguration, jobParameters)
+ .build();
+ }
+
+ /**
+ * Create the job client from the given configuration, submit the job,
+ * and wait until it completes.
+ *
+ * @param config configuration used to instantiate the JobClient.
+ * @return a string that contains last results from all evaluators.
+ * @throws InjectionException if the JobClient cannot be instantiated from config.
+ */
+ public static String run(final Configuration config) throws InjectionException {
+ final JobClient jobClient =
+ Tang.Factory.getTang().newInjector(config).getInstance(JobClient.class);
+ jobClient.submit();
+ return jobClient.waitForCompletion();
+ }
+
+ /**
+ * Main method that starts the Retained Evaluators job.
+ *
+ * @param args command line parameters.
+ */
+ public static void main(final String[] args) {
+ try {
+ final Configuration config = getClientConfiguration(args);
+ final String serialized = new AvroConfigurationSerializer().toString(config);
+ LOG.log(Level.FINEST, "Configuration:\n--\n{0}--", serialized);
+ run(config);
+ LOG.info("Done!");
+ } catch (final BindException | InjectionException | IOException ex) {
+ // Configuration problems are reported in the log; nothing is rethrown.
+ LOG.log(Level.SEVERE, "Job configuration error", ex);
+ }
+ }
+
+ /**
+ * Command line parameter: number of experiments to run.
+ */
+ @NamedParameter(doc = "Number of times to run the command",
+ short_name = "num_runs", default_value = "1")
+ public static final class NumRuns implements Name<Integer> {
+ }
+
+ /**
+ * Command line parameter: number of evaluators to allocate.
+ */
+ @NamedParameter(doc = "Number of evaluators to request",
+ short_name = "num_eval", default_value = "1")
+ public static final class NumEval implements Name<Integer> {
+ }
+
+ /**
+ * Command line parameter: true to run locally, or false to run on YARN.
+ */
+ @NamedParameter(doc = "Whether or not to run on the local runtime",
+ short_name = "local", default_value = "true")
+ public static final class Local implements Name<Boolean> {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/package-info.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/package-info.java
new file mode 100644
index 0000000..842b659
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/retained_eval/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * The Retained Evaluators example.
+ */
+package org.apache.reef.examples.retained_eval;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java
new file mode 100644
index 0000000..960f297
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.examples.library.Command;
+import org.apache.reef.examples.library.ShellTask;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Tang;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The body of Task scheduler. It owns a task queue
+ * and tracks the record of scheduled tasks.
+ */
+@ThreadSafe
+final class Scheduler {
+ /**
+ * Tasks are waiting to be scheduled in the queue.
+ */
+ private final Queue<TaskEntity> taskQueue;
+
+ /**
+ * Lists of {@link TaskEntity} for different states - Running / Finished / Canceled.
+ */
+ private final List<TaskEntity> runningTasks = new ArrayList<>();
+ private final List<TaskEntity> finishedTasks = new ArrayList<>();
+ private final List<TaskEntity> canceledTasks = new ArrayList<>();
+
+ /**
+ * Counts how many tasks have been scheduled.
+ */
+ private final AtomicInteger taskCount = new AtomicInteger(0);
+
+ @Inject
+ public Scheduler() {
+ taskQueue = new LinkedBlockingQueue<>();
+ }
+
+ /**
+ * Take the next task off the queue and submit it to the ActiveContext.
+ *
+ * @param context context that will run the task.
+ * @throws IllegalStateException if there is no pending task in the queue.
+ */
+ public synchronized void submitTask(final ActiveContext context) {
+ final TaskEntity task = taskQueue.poll();
+ if (task == null) {
+ // poll() returns null on an empty queue; fail with a clear message
+ // instead of a NullPointerException on the next line.
+ throw new IllegalStateException(
+ "No pending task to submit to context " + context.getId());
+ }
+ final Integer taskId = task.getId();
+ final String command = task.getCommand();
+
+ final Configuration taskConf = TaskConfiguration.CONF
+ .set(TaskConfiguration.TASK, ShellTask.class)
+ .set(TaskConfiguration.IDENTIFIER, taskId.toString())
+ .build();
+ final Configuration commandConf = Tang.Factory.getTang().newConfigurationBuilder()
+ .bindNamedParameter(Command.class, command)
+ .build();
+
+ final Configuration merged = Configurations.merge(taskConf, commandConf);
+ context.submitTask(merged);
+ // Only a task that was actually submitted is recorded as running.
+ runningTasks.add(task);
+ }
+
+ /**
+ * Update the record of task to mark it as canceled.
+ */
+ public synchronized SchedulerResponse cancelTask(final int taskId) {
+ // Only tasks that are still waiting in the queue can be canceled.
+ if (getTask(taskId, runningTasks) != null) {
+ return SchedulerResponse.FORBIDDEN("The task " + taskId + " is running");
+ }
+ if (getTask(taskId, finishedTasks) != null) {
+ return SchedulerResponse.FORBIDDEN("The task " + taskId + " has been finished");
+ }
+
+ final TaskEntity pending = getTask(taskId, taskQueue);
+ if (pending != null) {
+ taskQueue.remove(pending);
+ canceledTasks.add(pending);
+ return SchedulerResponse.OK("Canceled " + taskId);
+ }
+ return SchedulerResponse.NOT_FOUND("Task with ID " + taskId + " is not found");
+ }
+
+ /**
+ * Clear the pending list: every waiting task is recorded as canceled.
+ */
+ public synchronized SchedulerResponse clear() {
+ final int removed = taskQueue.size();
+ canceledTasks.addAll(taskQueue);
+ taskQueue.clear();
+ return SchedulerResponse.OK(removed + " tasks removed.");
+ }
+
+ /**
+ * Get the list of Tasks, which are grouped by the states.
+ */
+ public synchronized SchedulerResponse getList() {
+ final StringBuilder sb = new StringBuilder();
+ appendIds(sb, "Running :", runningTasks);
+ appendIds(sb, "\nWaiting :", taskQueue);
+ appendIds(sb, "\nFinished :", finishedTasks);
+ appendIds(sb, "\nCanceled :", canceledTasks);
+ return SchedulerResponse.OK(sb.toString());
+ }
+
+ /**
+ * Append the heading followed by the space-separated IDs of the given tasks.
+ */
+ private static void appendIds(final StringBuilder sb, final String heading,
+ final Collection<TaskEntity> tasks) {
+ sb.append(heading);
+ for (final TaskEntity task : tasks) {
+ sb.append(" ").append(task.getId());
+ }
+ }
+
+ /**
+ * Get the status of a Task.
+ */
+ public synchronized SchedulerResponse getTaskStatus(final int taskId) {
+ // Look the ID up in each collection in turn: running, waiting, finished, canceled.
+ final TaskEntity running = getTask(taskId, runningTasks);
+ if (running != null) {
+ return SchedulerResponse.OK("Running : " + running.toString());
+ }
+
+ final TaskEntity waiting = getTask(taskId, taskQueue);
+ if (waiting != null) {
+ return SchedulerResponse.OK("Waiting : " + waiting.toString());
+ }
+
+ final TaskEntity finished = getTask(taskId, finishedTasks);
+ if (finished != null) {
+ return SchedulerResponse.OK("Finished : " + finished.toString());
+ }
+
+ final TaskEntity canceled = getTask(taskId, canceledTasks);
+ if (canceled != null) {
+ return SchedulerResponse.OK("Canceled: " + canceled.toString());
+ }
+
+ return SchedulerResponse.NOT_FOUND("Task with ID " + taskId + " is not found");
+ }
+
+ /**
+ * Assigns a TaskId to submit.
+ */
+ public synchronized int assignTaskId() {
+ return taskCount.incrementAndGet();
+ }
+
+ /**
+ * Add a task to the queue.
+ */
+ public synchronized void addTask(TaskEntity task) {
+ taskQueue.add(task);
+ }
+
+ /**
+ * Check whether there are tasks waiting to be submitted.
+ */
+ public synchronized boolean hasPendingTasks() {
+ return !taskQueue.isEmpty();
+ }
+
+ /**
+ * Get the number of pending tasks in the queue.
+ */
+ public synchronized int getNumPendingTasks() {
+ return taskQueue.size();
+ }
+
+ /**
+ * Update the record of task to mark it as finished.
+ * An ID that does not belong to a running task is ignored.
+ *
+ * @param taskId ID of the task to move from the running list to the finished list.
+ */
+ public synchronized void setFinished(final int taskId) {
+ final TaskEntity task = getTask(taskId, runningTasks);
+ if (task == null) {
+ // Storing null in finishedTasks would make getList() and getTaskStatus()
+ // throw NullPointerException when they later dereference the entry.
+ return;
+ }
+ runningTasks.remove(task);
+ finishedTasks.add(task);
+ }
+
+ /**
+ * Find the first TaskEntity with the given ID in the collection.
+ *
+ * @return the matching task, or null if the collection has no task with that ID.
+ */
+ private TaskEntity getTask(final int taskId, final Collection<TaskEntity> tasks) {
+ for (final TaskEntity candidate : tasks) {
+ if (taskId == candidate.getId()) {
+ return candidate;
+ }
+ }
+ return null;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java
new file mode 100644
index 0000000..890d872
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.task.CompletedTask;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.inject.Inject;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver for TaskScheduler. It receives the commands by HttpRequest and
+ * execute them in a FIFO(First In First Out) order.
+ */
+@Unit
+public final class SchedulerDriver {
+
+ public static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
+ private static final Logger LOG = Logger.getLogger(SchedulerDriver.class.getName());
+
+ /**
+ * Possible states of the job driver. Can be one of:
+ * <dl>
+ * <du><code>INIT</code></du><dd>Initial state. Ready to request an evaluator.</dd>
+ * <du><code>WAIT_EVALUATORS</code></du><dd>Waiting for an evaluator allocated with no active evaluators.</dd>
+ * <du><code>READY</code></du><dd>Wait for the commands. Reactivated when a new Task arrives.</dd>
+ * <du><code>RUNNING</code></du><dd>Run commands in the queue. Go back to READY state when the queue is empty.</dd>
+ * </dl>
+ */
+ private enum State {
+ INIT, WAIT_EVALUATORS, READY, RUNNING
+ }
+
+ /**
+ * If true, it reuses evaluators when Tasks done.
+ */
+ private boolean retainable;
+
+ @GuardedBy("SchedulerDriver.this")
+ private State state = State.INIT;
+
+ @GuardedBy("SchedulerDriver.this")
+ private Scheduler scheduler;
+
+ @GuardedBy("SchedulerDriver.this")
+ private int nMaxEval = 3, nActiveEval = 0, nRequestedEval = 0;
+
+ private final EvaluatorRequestor requestor;
+
+ @Inject
+ public SchedulerDriver(final EvaluatorRequestor requestor,
+ @Parameter(SchedulerREEF.Retain.class) boolean retainable,
+ final Scheduler scheduler) {
+ this.requestor = requestor;
+ this.scheduler = scheduler;
+ this.retainable = retainable;
+ }
+
+ /**
+ * The driver is ready to run.
+ */
+ final class StartHandler implements EventHandler<StartTime> {
+ @Override
+ public void onNext(final StartTime startTime) {
+ LOG.log(Level.INFO, "Driver started at {0}", startTime);
+ assert (state == State.INIT);
+ state = State.WAIT_EVALUATORS;
+
+ requestEvaluator(1); // Allocate an initial evaluator to avoid idle state.
+ }
+ }
+
+ /**
+ * Evaluator is allocated. This occurs every time to run commands in Non-retainable version,
+ * while occurs only once in the Retainable version
+ */
+ final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
+ @Override
+ public void onNext(final AllocatedEvaluator evaluator) {
+ LOG.log(Level.INFO, "Evaluator is ready");
+ synchronized (SchedulerDriver.this) {
+ nActiveEval++;
+ nRequestedEval--;
+ }
+
+ evaluator.submitContext(ContextConfiguration.CONF
+ .set(ContextConfiguration.IDENTIFIER, "SchedulerContext")
+ .build());
+ }
+ }
+
+ /**
+ * Now it is ready to schedule tasks. But if the queue is empty,
+ * wait until commands coming up.
+ *
+ * If there is no pending task, having more than 1 evaluators must be redundant.
+ * It may happen, for example, when tasks are canceled during allocation.
+ * In these cases, the new evaluator may be abandoned.
+ */
+ final class ActiveContextHandler implements EventHandler<ActiveContext> {
+ @Override
+ public void onNext(ActiveContext context) {
+ synchronized (SchedulerDriver.this) {
+ LOG.log(Level.INFO, "Context available : {0}", context.getId());
+
+ if (scheduler.hasPendingTasks()) {
+ state = State.RUNNING;
+ scheduler.submitTask(context);
+ } else if (nActiveEval > 1) {
+ nActiveEval--;
+ context.close();
+ } else {
+ state = State.READY;
+ waitForCommands(context);
+ }
+ }
+ }
+ }
+
+ /**
+ * Non-retainable version of CompletedTaskHandler.
+ * When Task completes, it closes the active context to deallocate the evaluator
+ * and if there is outstanding commands, allocate another evaluator.
+ */
+ final class CompletedTaskHandler implements EventHandler<CompletedTask> {
+ @Override
+ public void onNext(final CompletedTask task) {
+ final int taskId = Integer.valueOf(task.getId());
+
+ synchronized (SchedulerDriver.this) {
+ scheduler.setFinished(taskId);
+
+ LOG.log(Level.INFO, "Task completed. Reuse the evaluator : {0}", String.valueOf(retainable));
+ final ActiveContext context = task.getActiveContext();
+
+ if (retainable) {
+ retainEvaluator(context);
+ } else {
+ reallocateEvaluator(context);
+ }
+ }
+ }
+ }
+
+ /**
+ * Get the list of tasks in the scheduler.
+ */
+ public synchronized SchedulerResponse getList() {
+ return scheduler.getList();
+ }
+
+ /**
+ * Clear all the Tasks from the waiting queue.
+ */
+ public synchronized SchedulerResponse clearList() {
+ return scheduler.clear();
+ }
+
+ /**
+ * Get the status of a task.
+ */
+ public SchedulerResponse getTaskStatus(List<String> args) {
+ if (args.size() != 1) {
+ return SchedulerResponse.BAD_REQUEST("Usage : only one ID at a time");
+ }
+
+ final Integer taskId = Integer.valueOf(args.get(0));
+
+ synchronized (SchedulerDriver.this) {
+ return scheduler.getTaskStatus(taskId);
+ }
+ }
+
+ /**
+ * Cancel a Task waiting on the queue. A task cannot be canceled
+ * once it is running.
+ */
+ public SchedulerResponse cancelTask(final List<String> args) {
+ if (args.size() != 1) {
+ return SchedulerResponse.BAD_REQUEST("Usage : only one ID at a time");
+ }
+
+ final Integer taskId = Integer.valueOf(args.get(0));
+
+ synchronized (SchedulerDriver.this) {
+ return scheduler.cancelTask(taskId);
+ }
+ }
+
+ /**
+ * Submit a command to schedule.
+ */
+ public SchedulerResponse submitCommands(final List<String> args) {
+ if (args.size() != 1) {
+ return SchedulerResponse.BAD_REQUEST("Usage : only one command at a time");
+ }
+
+ final String command = args.get(0);
+ final Integer id;
+
+ synchronized (SchedulerDriver.this) {
+ id = scheduler.assignTaskId();
+ scheduler.addTask(new TaskEntity(id, command));
+
+ if (state == State.READY) {
+ SchedulerDriver.this.notify(); // Wake up at {waitForCommands}
+ } else if (state == State.RUNNING && nMaxEval > nActiveEval + nRequestedEval) {
+ requestEvaluator(1);
+ }
+ }
+ return SchedulerResponse.OK("Task ID : " + id);
+ }
+
+ /**
+ * Update the maximum number of evaluators to hold.
+ * Request more evaluators in case there are pending tasks
+ * in the queue and the number of evaluators is less than the limit.
+ */
+ public SchedulerResponse setMaxEvaluators(final List<String> args) {
+ if (args.size() != 1) {
+ return SchedulerResponse.BAD_REQUEST("Usage : Only one value can be used");
+ }
+
+ final int nTarget = Integer.valueOf(args.get(0));
+
+ synchronized (SchedulerDriver.this) {
+ if (nTarget < nActiveEval + nRequestedEval) {
+ return SchedulerResponse.FORBIDDEN(nActiveEval + nRequestedEval +
+ " evaluators are used now. Should be larger than that.");
+ }
+ nMaxEval = nTarget;
+
+ if (scheduler.hasPendingTasks()) {
+ final int nToRequest =
+ Math.min(scheduler.getNumPendingTasks(), nMaxEval - nActiveEval) - nRequestedEval;
+ requestEvaluator(nToRequest);
+ }
+ return SchedulerResponse.OK("You can use evaluators up to " + nMaxEval + " evaluators.");
+ }
+ }
+
+ /**
+ * Request evaluators. Passing a non positive number is illegal,
+ * so it does not make a trial for that situation.
+ */
+ private void requestEvaluator(final int numToRequest) {
+ if (numToRequest <= 0) {
+ throw new IllegalArgumentException("The number of evaluator request should be a positive integer");
+ }
+
+ synchronized (SchedulerDriver.this) {
+ nRequestedEval += numToRequest;
+ requestor.submit(EvaluatorRequest.newBuilder()
+ .setMemory(32)
+ .setNumber(numToRequest)
+ .build());
+ }
+ }
+
+ /**
+ * Pick up a command from the queue and run it. Wait until
+ * any command coming up if no command exists.
+ */
+ private void waitForCommands(final ActiveContext context) {
+ synchronized (SchedulerDriver.this) {
+ while (!scheduler.hasPendingTasks()) {
+ // Wait until any command enters in the queue
+ try {
+ SchedulerDriver.this.wait();
+ } catch (InterruptedException e) {
+ LOG.log(Level.WARNING, "InterruptedException occurred in SchedulerDriver", e);
+ }
+ }
+ // When wakes up, run the first command from the queue.
+ state = State.RUNNING;
+ scheduler.submitTask(context);
+ }
+ }
+
+ /**
+ * Retain the complete evaluators submitting another task
+ * until there is no need to reuse them.
+ */
+ private synchronized void retainEvaluator(final ActiveContext context) {
+ if (scheduler.hasPendingTasks()) {
+ scheduler.submitTask(context);
+ } else if (nActiveEval > 1) {
+ nActiveEval--;
+ context.close();
+ } else {
+ state = State.READY;
+ waitForCommands(context);
+ }
+ }
+
+ /**
+ * Always close the complete evaluators and
+ * allocate a new evaluator if necessary.
+ */
+ private synchronized void reallocateEvaluator(final ActiveContext context) {
+ nActiveEval--;
+ context.close();
+
+ if (scheduler.hasPendingTasks()) {
+ requestEvaluator(1);
+ } else if (nActiveEval <= 0) {
+ state = State.WAIT_EVALUATORS;
+ requestEvaluator(1);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerHttpHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerHttpHandler.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerHttpHandler.java
new file mode 100644
index 0000000..c2f6090
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerHttpHandler.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.webserver.HttpHandler;
+import org.apache.reef.webserver.ParsedHttpRequest;
+
+import javax.inject.Inject;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Receive HttpRequest so that it can handle the command list
+ */
+final class SchedulerHttpHandler implements HttpHandler {
+ final InjectionFuture<SchedulerDriver> schedulerDriver;
+
+ private String uriSpecification = "reef-example-scheduler";
+
+ @Inject
+ public SchedulerHttpHandler(final InjectionFuture<SchedulerDriver> schedulerDriver) {
+ this.schedulerDriver = schedulerDriver;
+ }
+
+ @Override
+ public String getUriSpecification() {
+ return uriSpecification;
+ }
+
+ @Override
+ public void setUriSpecification(String s) {
+ uriSpecification = s;
+ }
+
+ /**
+ * HttpRequest handler. You must specify UriSpecification and REST API version.
+ * The request url is http://{address}:{port}/reef-example-scheduler/v1
+ *
+ * APIs
+ * /list to get the status list for all tasks
+ * /status?id={id} to query the status of such a task, given id
+ * /submit?cmd={cmd} to submit a Task, which returns its id
+ * /cancel?id={id} to cancel the task's execution
+ * /num-eval?num={num} to set the maximum number of evaluators
+ * /clear to clear the waiting queue
+ */
+ @Override
+ public void onHttpRequest(ParsedHttpRequest request, HttpServletResponse response)
+ throws IOException, ServletException {
+ final String target = request.getTargetEntity().toLowerCase();
+ final Map<String, List<String>> queryMap = request.getQueryMap();
+
+ final SchedulerResponse result;
+ switch (target) {
+ case "list":
+ result = schedulerDriver.get().getList();
+ break;
+ case "clear":
+ result = schedulerDriver.get().clearList();
+ break;
+ case "status":
+ result = schedulerDriver.get().getTaskStatus(queryMap.get("id"));
+ break;
+ case "submit":
+ result = schedulerDriver.get().submitCommands(queryMap.get("cmd"));
+ break;
+ case "cancel":
+ result = schedulerDriver.get().cancelTask(queryMap.get("id"));
+ break;
+ case "max-eval":
+ result = schedulerDriver.get().setMaxEvaluators(queryMap.get("num"));
+ break;
+ default:
+ result = SchedulerResponse.NOT_FOUND("Unsupported operation");
+ }
+
+ // Send response to the http client
+ final int status = result.getStatus();
+ final String message= result.getMessage();
+
+ if (result.isOK()) {
+ response.getOutputStream().println(message);
+ } else {
+ response.sendError(status, message);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java
new file mode 100644
index 0000000..2911905
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.REEF;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.CommandLine;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.webserver.HttpHandlerConfiguration;
+
+import java.io.IOException;
+
+/**
+ * REEF TaskScheduler.
+ */
+public final class SchedulerREEF {
+
+ /**
+ * Command line parameter = true to reuse evaluators,
+ * or false to allocate/close for each iteration
+ */
+ @NamedParameter(doc = "Whether or not to reuse evaluators",
+ short_name = "retain", default_value = "true")
+ public static final class Retain implements Name<Boolean> {
+ }
+
+ /**
+ * @return The http configuration to use reef-webserver
+ */
+ private final static Configuration getHttpConf() {
+ final Configuration httpHandlerConf = HttpHandlerConfiguration.CONF
+ .set(HttpHandlerConfiguration.HTTP_HANDLERS, SchedulerHttpHandler.class)
+ .build();
+ return httpHandlerConf;
+ }
+
+ /**
+ * @return The Driver configuration.
+ */
+ private final static Configuration getDriverConf() {
+ final Configuration driverConf = DriverConfiguration.CONF
+ .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(SchedulerDriver.class))
+ .set(DriverConfiguration.DRIVER_IDENTIFIER, "TaskScheduler")
+ .set(DriverConfiguration.ON_DRIVER_STARTED, SchedulerDriver.StartHandler.class)
+ .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, SchedulerDriver.EvaluatorAllocatedHandler.class)
+ .set(DriverConfiguration.ON_CONTEXT_ACTIVE, SchedulerDriver.ActiveContextHandler.class)
+ .set(DriverConfiguration.ON_TASK_COMPLETED, SchedulerDriver.CompletedTaskHandler.class)
+ .build();
+
+ return driverConf;
+ }
+
+ /**
+ * Run the Task scheduler. If '-retain true' option is passed via command line,
+ * the scheduler reuses evaluators to submit new Tasks.
+ * @param runtimeConf The runtime configuration (e.g. Local, YARN, etc)
+ * @param args Command line arguments.
+ * @throws InjectionException
+ * @throws java.io.IOException
+ */
+ public static void runTaskScheduler(final Configuration runtimeConf, final String[] args)
+ throws InjectionException, IOException, ParseException {
+ final Tang tang = Tang.Factory.getTang();
+
+ final Configuration commandLineConf = CommandLine.parseToConfiguration(args, Retain.class);
+
+ // Merge the configurations to run Driver
+ final Configuration driverConf = Configurations.merge(getDriverConf(), getHttpConf(), commandLineConf);
+
+ final REEF reef = tang.newInjector(runtimeConf).getInstance(REEF.class);
+ reef.submit(driverConf);
+ }
+
+ /**
+ * Main program
+ * @param args
+ * @throws InjectionException
+ */
+ public final static void main(String[] args) throws InjectionException, IOException, ParseException {
+ final Configuration runtimeConfiguration = LocalRuntimeConfiguration.CONF
+ .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, 3)
+ .build();
+ runTaskScheduler(runtimeConfiguration, args);
+ }
+}