You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2014/10/23 02:03:48 UTC

[17/51] [abbrv] [partial] Initial merge of Wake, Tang and REEF into one repository and project

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-examples/src/main/java/com/microsoft/reef/examples/suspend/SuspendClientControl.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/com/microsoft/reef/examples/suspend/SuspendClientControl.java b/reef-examples/src/main/java/com/microsoft/reef/examples/suspend/SuspendClientControl.java
deleted file mode 100644
index 4c71c8b..0000000
--- a/reef-examples/src/main/java/com/microsoft/reef/examples/suspend/SuspendClientControl.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.examples.suspend;
-
-import com.microsoft.reef.client.RunningJob;
-import com.microsoft.tang.annotations.Name;
-import com.microsoft.tang.annotations.NamedParameter;
-import com.microsoft.tang.annotations.Parameter;
-import com.microsoft.wake.EStage;
-import com.microsoft.wake.EventHandler;
-import com.microsoft.wake.impl.ThreadPoolStage;
-import com.microsoft.wake.remote.impl.ObjectSerializableCodec;
-import com.microsoft.wake.remote.impl.TransportEvent;
-import com.microsoft.wake.remote.transport.Transport;
-import com.microsoft.wake.remote.transport.netty.NettyMessagingTransport;
-
-import javax.inject.Inject;
-import java.io.IOException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * (Wake) listener that receives suspend/resume commands from the Control process
- * on a localhost port and forwards the raw message to the running job, if one is set.
- */
-public class SuspendClientControl implements AutoCloseable {
-
-  @NamedParameter(doc = "Port for suspend/resume control commands",
-      short_name = "port", default_value = "7008")
-  public static final class Port implements Name<Integer> {
-  }
-
-  private static final Logger LOG = Logger.getLogger(SuspendClientControl.class.getName());
-  private static final ObjectSerializableCodec<byte[]> CODEC = new ObjectSerializableCodec<>();
-
-  /**
-   * Job that control messages are forwarded to; null until setRunningJob() is called.
-   * Volatile because setRunningJob() and the message handler synchronize on
-   * different objects, so the monitors alone do not make the write visible to the reader.
-   */
-  private volatile RunningJob runningJob;
-
-  private final transient Transport transport;
-
-  /**
-   * Start listening for control messages.
-   *
-   * @param port local port to listen on for suspend/resume commands.
-   * @throws IOException declared for failures while setting up the transport.
-   */
-  @Inject
-  public SuspendClientControl(
-      final @Parameter(SuspendClientControl.Port.class) int port) throws IOException {
-
-    LOG.log(Level.INFO, "Listen to control port {0}", port);
-
-    // Single-threaded stage; any error reported by the stage is rethrown as unchecked.
-    final EStage<TransportEvent> stage = new ThreadPoolStage<>(
-        "suspend-control-server", new ControlMessageHandler(), 1, new EventHandler<Throwable>() {
-      @Override
-      public void onNext(final Throwable throwable) {
-        throw new RuntimeException(throwable);
-      }
-    });
-
-    this.transport = new NettyMessagingTransport("localhost", port, stage, stage, 1, 10000);
-  }
-
-  /**
-   * Forward remote message to the job driver.
-   * Messages that arrive before a job has been set are logged and dropped.
-   */
-  private class ControlMessageHandler implements EventHandler<TransportEvent> {
-    @Override
-    public synchronized void onNext(final TransportEvent msg) {
-      // Read the field once so the null check and the send act on the same job.
-      final RunningJob job = runningJob;
-      LOG.log(Level.INFO, "Control message: {0} destination: {1}",
-              new Object[] { CODEC.decode(msg.getData()), job });
-      if (job != null) {
-        job.send(msg.getData());
-      }
-    }
-  }
-
-  /**
-   * Set the job that subsequent control messages are forwarded to.
-   *
-   * @param job the running job; null disables forwarding.
-   */
-  public synchronized void setRunningJob(final RunningJob job) {
-    this.runningJob = job;
-  }
-
-  /**
-   * Close the underlying transport.
-   */
-  @Override
-  public void close() throws Exception {
-    this.transport.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-examples/src/main/java/com/microsoft/reef/examples/suspend/SuspendDriver.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/com/microsoft/reef/examples/suspend/SuspendDriver.java b/reef-examples/src/main/java/com/microsoft/reef/examples/suspend/SuspendDriver.java
deleted file mode 100644
index c0f85f4..0000000
--- a/reef-examples/src/main/java/com/microsoft/reef/examples/suspend/SuspendDriver.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.examples.suspend;
-
-import com.microsoft.reef.driver.client.JobMessageObserver;
-import com.microsoft.reef.driver.context.ActiveContext;
-import com.microsoft.reef.driver.context.ContextConfiguration;
-import com.microsoft.reef.driver.evaluator.AllocatedEvaluator;
-import com.microsoft.reef.driver.evaluator.EvaluatorDescriptor;
-import com.microsoft.reef.driver.evaluator.EvaluatorRequest;
-import com.microsoft.reef.driver.evaluator.EvaluatorRequestor;
-import com.microsoft.reef.driver.task.*;
-import com.microsoft.reef.io.checkpoint.fs.FSCheckPointServiceConfiguration;
-import com.microsoft.tang.Configuration;
-import com.microsoft.tang.JavaConfigurationBuilder;
-import com.microsoft.tang.Tang;
-import com.microsoft.tang.annotations.Parameter;
-import com.microsoft.tang.annotations.Unit;
-import com.microsoft.tang.exceptions.BindException;
-import com.microsoft.wake.EventHandler;
-import com.microsoft.wake.remote.impl.ObjectSerializableCodec;
-import com.microsoft.wake.time.event.StartTime;
-import com.microsoft.wake.time.event.StopTime;
-
-import javax.inject.Inject;
-import javax.xml.bind.DatatypeConverter;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Suspend/resume example job driver. Submits a SuspendTestTask to every allocated
- * evaluator, suspends or resumes tasks in response to client commands, and reports
- * task events back to the client as string messages.
- */
-@Unit
-public class SuspendDriver {
-
-  /**
-   * Standard Java logger.
-   */
-  private static final Logger LOG = Logger.getLogger(SuspendDriver.class.getName());
-
-  /** Number of evaluators to request in StartHandler. */
-  private static final int NUM_EVALUATORS = 2;
-
-  /**
-   * String codec is used to encode the results driver sends to the client,
-   * and to decode the commands received from the client.
-   */
-  private static final ObjectSerializableCodec<String> CODEC_STR = new ObjectSerializableCodec<>();
-
-  /**
-   * Integer codec is used to decode the results driver gets from the tasks.
-   */
-  private static final ObjectSerializableCodec<Integer> CODEC_INT = new ObjectSerializableCodec<>();
-
-  /**
-   * Job observer on the client.
-   * We use it to send results from the driver back to the client.
-   */
-  private final JobMessageObserver jobMessageObserver;
-
-  /**
-   * Job driver uses EvaluatorRequestor to request Evaluators that will run the Tasks.
-   */
-  private final EvaluatorRequestor evaluatorRequestor;
-
-  /**
-   * TANG Configuration merged into every submitted Context: the NumCycles and Delay
-   * parameters plus the checkpoint service configuration. Built once in the constructor.
-   */
-  private final Configuration contextConfig;
-
-  /**
-   * Map from task ID (a string) to the RunningTask instance (that can be suspended).
-   * Wrapped in a synchronized map.
-   */
-  private final Map<String, RunningTask> runningTasks =
-      Collections.synchronizedMap(new HashMap<String, RunningTask>());
-
-  /**
-   * Map from task ID (a string) to the SuspendedTask instance (that can be resumed).
-   * Plain HashMap: every access in this class is made inside synchronized (suspendedTasks).
-   */
-  private final Map<String, SuspendedTask> suspendedTasks = new HashMap<>();
-
-  /**
-   * Job driver constructor.
-   * All parameters are injected from TANG automatically.
-   *
-   * @param jobMessageObserver is used to send messages back to the client.
-   * @param evaluatorRequestor is used to request Evaluators.
-   * @param isLocal passed to the checkpoint service configuration as its IS_LOCAL value.
-   * @param numCycles number of cycles to run in the task.
-   * @param delay delay in seconds between cycles in the task.
-   * @throws RuntimeException wrapping a BindException if the configuration cannot be built.
-   */
-  @Inject
-  SuspendDriver(
-      final JobMessageObserver jobMessageObserver,
-      final EvaluatorRequestor evaluatorRequestor,
-      final @Parameter(Launch.Local.class) boolean isLocal,
-      final @Parameter(Launch.NumCycles.class) int numCycles,
-      final @Parameter(Launch.Delay.class) int delay) {
-
-    this.jobMessageObserver = jobMessageObserver;
-    this.evaluatorRequestor = evaluatorRequestor;
-
-    try {
-
-      final Configuration checkpointServiceConfig = FSCheckPointServiceConfiguration.CONF
-          .set(FSCheckPointServiceConfiguration.IS_LOCAL, Boolean.toString(isLocal))
-          .set(FSCheckPointServiceConfiguration.PATH, "/tmp")
-          .set(FSCheckPointServiceConfiguration.PREFIX, "reef-checkpoint-")
-          .set(FSCheckPointServiceConfiguration.REPLICATION_FACTOR, "3")
-          .build();
-
-      final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder()
-          .bindNamedParameter(Launch.NumCycles.class, Integer.toString(numCycles))
-          .bindNamedParameter(Launch.Delay.class, Integer.toString(delay));
-
-      cb.addConfiguration(checkpointServiceConfig);
-      this.contextConfig = cb.build();
-
-    } catch (final BindException ex) {
-      throw new RuntimeException(ex);
-    }
-  }
-
-  /**
-   * Receive notification that the Task is running: record it in runningTasks
-   * and tell the client that the task has started.
-   */
-  final class RunningTaskHandler implements EventHandler<RunningTask> {
-    @Override
-    public final void onNext(final RunningTask task) {
-      LOG.log(Level.INFO, "Running task: {0}", task.getId());
-      runningTasks.put(task.getId(), task);
-      jobMessageObserver.sendMessageToClient(CODEC_STR.encode("start task: " + task.getId()));
-    }
-  }
-
-  /**
-   * Receive notification that the Task has completed successfully: notify the client,
-   * forget the task and close its context. When no running or suspended tasks remain
-   * this handler only logs that fact; it does not itself trigger a shutdown.
-   */
-  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
-    @Override
-    public final void onNext(final CompletedTask task) {
-
-      final EvaluatorDescriptor e = task.getActiveContext().getEvaluatorDescriptor();
-      final String msg = "Task completed " + task.getId() + " on node " + e;
-      LOG.info(msg);
-
-      jobMessageObserver.sendMessageToClient(CODEC_STR.encode(msg));
-      runningTasks.remove(task.getId());
-      task.getActiveContext().close();
-
-      final boolean noTasks;
-
-      synchronized (suspendedTasks) {
-        LOG.log(Level.INFO, "Tasks running: {0} suspended: {1}", new Object[] {
-            runningTasks.size(), suspendedTasks.size() });
-        noTasks = runningTasks.isEmpty() && suspendedTasks.isEmpty();
-      }
-
-      if (noTasks) {
-        LOG.info("All tasks completed; shutting down.");
-      }
-    }
-  }
-
-  /**
-   * Receive notification that the Task has been suspended: move it from runningTasks
-   * to suspendedTasks (under the suspendedTasks lock) and notify the client.
-   */
-  final class SuspendedTaskHandler implements EventHandler<SuspendedTask> {
-    @Override
-    public final void onNext(final SuspendedTask task) {
-
-      final String msg = "Task suspended: " + task.getId();
-      LOG.info(msg);
-
-      synchronized (suspendedTasks) {
-        suspendedTasks.put(task.getId(), task);
-        runningTasks.remove(task.getId());
-      }
-
-      jobMessageObserver.sendMessageToClient(CODEC_STR.encode(msg));
-    }
-  }
-
-  /**
-   * Receive message from the Task: decode its integer payload and relay it to the client.
-   */
-  final class TaskMessageHandler implements EventHandler<TaskMessage> {
-    @Override
-    public void onNext(final TaskMessage message) {
-      final int result = CODEC_INT.decode(message.get());
-      final String msg = "Task message " + message.getId() + ": " + result;
-      LOG.info(msg);
-      jobMessageObserver.sendMessageToClient(CODEC_STR.encode(msg));
-    }
-  }
-
-  /**
-   * Receive notification that an Evaluator had been allocated,
-   * and submit a new Context to that Evaluator. The context is identified as
-   * "[evaluator id]_context" and is merged with the shared contextConfig.
-   */
-  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
-    @Override
-    public void onNext(final AllocatedEvaluator eval) {
-      try {
-
-        LOG.log(Level.INFO, "Allocated Evaluator: {0}", eval.getId());
-
-        final Configuration thisContextConfiguration = ContextConfiguration.CONF.set(
-            ContextConfiguration.IDENTIFIER, eval.getId() + "_context").build();
-
-        eval.submitContext(Tang.Factory.getTang()
-            .newConfigurationBuilder(thisContextConfiguration, contextConfig).build());
-
-      } catch (final BindException ex) {
-        throw new RuntimeException(ex);
-      }
-    }
-  }
-
-  /**
-   * Receive notification that a new Context is available.
-   * Submit a new SuspendTestTask, identified as "[context id]_task", to that Context.
-   */
-  final class ActiveContextHandler implements EventHandler<ActiveContext> {
-    @Override
-    public synchronized void onNext(final ActiveContext context) {
-      LOG.log(Level.INFO, "Active Context: {0}", context.getId());
-      try {
-        context.submitTask(TaskConfiguration.CONF
-            .set(TaskConfiguration.IDENTIFIER, context.getId() + "_task")
-            .set(TaskConfiguration.TASK, SuspendTestTask.class)
-            .set(TaskConfiguration.ON_SUSPEND, SuspendTestTask.SuspendHandler.class)
-            .build());
-      } catch (final BindException ex) {
-        LOG.log(Level.SEVERE, "Bad Task configuration for context: " + context.getId(), ex);
-        throw new RuntimeException(ex);
-      }
-    }
-  }
-
-  /**
-   * Handle notifications from the client.
-   * The message is a string of the form "[command] [task id]", split on the first run of
-   * whitespace; the command is matched case-insensitively against "suspend" and "resume".
-   * Throws IllegalArgumentException for a malformed message, an unknown command, or a task
-   * ID that is not currently running (suspend) or suspended (resume).
-   * On resume the task is removed from suspendedTasks before it is resubmitted with the
-   * Base64-encoded memento of the suspended task.
-   */
-  final class ClientMessageHandler implements EventHandler<byte[]> {
-    @Override
-    public void onNext(final byte[] message) {
-
-      final String commandStr = CODEC_STR.decode(message);
-      LOG.log(Level.INFO, "Client message: {0}", commandStr);
-
-      final String[] split = commandStr.split("\\s+", 2);
-      if (split.length != 2) {
-        throw new IllegalArgumentException("Bad command: " + commandStr);
-      } else {
-
-        final String command = split[0].toLowerCase().intern();
-        final String taskId = split[1];
-
-        switch (command) {
-
-          case "suspend": {
-            final RunningTask task = runningTasks.get(taskId);
-            if (task != null) {
-              task.suspend();
-            } else {
-              throw new IllegalArgumentException("Suspend: Task not found: " + taskId);
-            }
-            break;
-          }
-
-          case "resume": {
-            final SuspendedTask suspendedTask;
-            synchronized (suspendedTasks) {
-              suspendedTask = suspendedTasks.remove(taskId);
-            }
-            if (suspendedTask != null) {
-              try {
-                suspendedTask.getActiveContext().submitTask(TaskConfiguration.CONF
-                    .set(TaskConfiguration.IDENTIFIER, taskId)
-                    .set(TaskConfiguration.MEMENTO,
-                        DatatypeConverter.printBase64Binary(suspendedTask.get()))
-                    .build());
-              } catch (final BindException e) {
-                throw new RuntimeException(e);
-              }
-            } else {
-              throw new IllegalArgumentException("Resume: Task not found: " + taskId);
-            }
-            break;
-          }
-
-          default:
-            throw new IllegalArgumentException("Bad command: " + command);
-        }
-      }
-    }
-  }
-
-  /**
-   * Job Driver is ready and the clock is set up: request NUM_EVALUATORS evaluators,
-   * each with one core.
-   */
-  final class StartHandler implements EventHandler<StartTime> {
-    @Override
-    public void onNext(final StartTime time) {
-      LOG.log(Level.INFO, "StartTime: {0}", time);
-      evaluatorRequestor.submit(EvaluatorRequest.newBuilder()
-          .setMemory(128).setNumberOfCores(1).setNumber(NUM_EVALUATORS).build());
-    }
-  }
-
-  /**
-   * Shutting down the job driver: log the event and tell the client that StopTime
-   * was received. This handler does not itself close any evaluators.
-   */
-  final class StopHandler implements EventHandler<StopTime> {
-    @Override
-    public void onNext(final StopTime time) {
-      LOG.log(Level.INFO, "StopTime: {0}", time);
-      jobMessageObserver.sendMessageToClient(CODEC_STR.encode("got StopTime"));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-examples/src/main/java/com/microsoft/reef/examples/suspend/SuspendTestTask.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/com/microsoft/reef/examples/suspend/SuspendTestTask.java b/reef-examples/src/main/java/com/microsoft/reef/examples/suspend/SuspendTestTask.java
deleted file mode 100644
index 82ef80b..0000000
--- a/reef-examples/src/main/java/com/microsoft/reef/examples/suspend/SuspendTestTask.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.examples.suspend;
-
-import com.microsoft.reef.task.Task;
-import com.microsoft.reef.task.TaskMessage;
-import com.microsoft.reef.task.TaskMessageSource;
-import com.microsoft.reef.task.events.SuspendEvent;
-import com.microsoft.reef.io.checkpoint.CheckpointID;
-import com.microsoft.reef.io.checkpoint.CheckpointService;
-import com.microsoft.reef.io.checkpoint.CheckpointService.CheckpointReadChannel;
-import com.microsoft.reef.io.checkpoint.CheckpointService.CheckpointWriteChannel;
-import com.microsoft.reef.io.checkpoint.fs.FSCheckpointID;
-import com.microsoft.reef.util.Optional;
-import com.microsoft.tang.annotations.Parameter;
-import com.microsoft.tang.annotations.Unit;
-import com.microsoft.wake.EventHandler;
-import com.microsoft.wake.remote.impl.ObjectSerializableCodec;
-
-import javax.inject.Inject;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Simple do-nothing task that can send messages to the Driver and can be suspended/resumed.
- * It counts cycles, waiting on its own monitor between them; when suspended it writes the
- * counter to a checkpoint and returns the checkpoint ID as its memento.
- */
-@Unit
-public class SuspendTestTask implements Task, TaskMessageSource {
-
-  /**
-   * Standard java logger.
-   */
-  private static final Logger LOG = Logger.getLogger(SuspendTestTask.class.getName());
-  private final CheckpointService checkpointService;
-  /**
-   * number of cycles to run in the task.
-   */
-  private final int numCycles;
-  /**
-   * delay in milliseconds between cycles in the task.
-   */
-  private final int delay;
-  /**
-   * Codec to serialize/deserialize counter values for the updates.
-   */
-  private final ObjectSerializableCodec<Integer> codecInt = new ObjectSerializableCodec<>();
-  /**
-   * Codec to serialize/deserialize checkpoint IDs for suspend/resume.
-   */
-  private final ObjectWritableCodec<CheckpointID> codecCheckpoint =
-          new ObjectWritableCodec<CheckpointID>(FSCheckpointID.class);
-  /**
-   * Current value of the counter. Accessed only while holding this object's monitor.
-   */
-  private int counter = 0;
-  /**
-   * True if the suspend message has been received, false otherwise.
-   * Accessed only while holding this object's monitor.
-   */
-  private boolean suspended = false;
-
-  /**
-   * Task constructor: invoked by TANG.
-   *
-   * @param checkpointService service used to save and restore the counter.
-   * @param numCycles number of cycles to run in the task.
-   * @param delay     delay in seconds between cycles in the task
-   *                  (stored internally in milliseconds).
-   */
-  @Inject
-  public SuspendTestTask(
-      final CheckpointService checkpointService,
-      @Parameter(Launch.NumCycles.class) final int numCycles,
-      @Parameter(Launch.Delay.class) final int delay) {
-    this.checkpointService = checkpointService;
-    this.numCycles = numCycles;
-    this.delay = delay * 1000;
-  }
-
-  /**
-   * Main method of the task: run cycle from 0 to numCycles,
-   * and wait for delay milliseconds on each cycle, until done or suspended.
-   *
-   * @param memento serialized checkpoint ID of a previously suspended run.
-   *                Null or empty array for initial run.
-   * @return serialized value of the counter if the loop ran to completion,
-   * or the serialized checkpoint ID if the task was suspended.
-   */
-  @Override
-  public synchronized byte[] call(final byte[] memento) throws IOException, InterruptedException {
-
-    LOG.log(Level.INFO, "Start: {0} counter: {1}/{2}",
-            new Object[]{this, this.counter, this.numCycles});
-
-    if (memento != null && memento.length > 0) {
-      this.restore(memento);
-    }
-
-    this.suspended = false;
-    // Note: the loop increment runs before the condition is re-checked, so the cycle
-    // during which a suspend arrives is still counted.
-    for (; this.counter < this.numCycles && !this.suspended; ++this.counter) {
-      try {
-        LOG.log(Level.INFO, "Run: {0} counter: {1}/{2} sleep: {3}",
-                new Object[]{this, this.counter, this.numCycles, this.delay});
-        // wait() releases the monitor, which lets SuspendHandler and getMessage() run.
-        this.wait(this.delay);
-      } catch (final InterruptedException ex) {
-        LOG.log(Level.INFO, "{0} interrupted. counter: {1}: {2}",
-                new Object[]{this, this.counter, ex});
-      }
-    }
-
-    return this.suspended ? this.save() : this.codecInt.encode(this.counter);
-  }
-
-  /**
-   * Update driver on current state of the task.
-   *
-   * @return task message carrying the serialized version of the counter.
-   */
-  @Override
-  public synchronized Optional<TaskMessage> getMessage() {
-    LOG.log(Level.INFO, "Message from Task {0} to the Driver: counter: {1}",
-            new Object[]{this, this.counter});
-    return Optional.of(TaskMessage.from(SuspendTestTask.class.getName(), this.codecInt.encode(this.counter)));
-  }
-
-  /**
-   * Save current state of the task in the checkpoint.
-   *
-   * @return checkpoint ID (serialized)
-   */
-  private synchronized byte[] save() throws IOException, InterruptedException {
-    try (final CheckpointWriteChannel channel = this.checkpointService.create()) {
-      channel.write(ByteBuffer.wrap(this.codecInt.encode(this.counter)));
-      return this.codecCheckpoint.encode(this.checkpointService.commit(channel));
-    }
-  }
-
-  /**
-   * Restore the task state from the given checkpoint, then delete the checkpoint.
-   *
-   * @param memento serialized checkpoint ID
-   */
-  private synchronized void restore(final byte[] memento) throws IOException, InterruptedException {
-    final CheckpointID checkpointId = this.codecCheckpoint.decode(memento);
-    try (final CheckpointReadChannel channel = this.checkpointService.open(checkpointId)) {
-      // The current counter is encoded only to obtain a buffer of the right size;
-      // its contents are overwritten by the read.
-      final ByteBuffer buffer = ByteBuffer.wrap(this.codecInt.encode(this.counter));
-      channel.read(buffer);
-      this.counter = this.codecInt.decode(buffer.array());
-    }
-    this.checkpointService.delete(checkpointId);
-  }
-
-  /**
-   * Handler for the suspend event: marks the task as suspended and wakes up call()
-   * so it can checkpoint and return.
-   */
-  public class SuspendHandler implements EventHandler<SuspendEvent> {
-
-    @Override
-    public void onNext(SuspendEvent suspendEvent) {
-      final byte[] message = suspendEvent.get().get();
-      // notify() may only be called by the owner of the task's monitor; without this
-      // block it throws IllegalMonitorStateException. call() gives up the monitor
-      // while it is inside wait(), so this does not block for the whole run.
-      synchronized (SuspendTestTask.this) {
-        LOG.log(Level.INFO, "Suspend: {0} with: {1} bytes; counter: {2}",
-                new Object[]{this, message.length, SuspendTestTask.this.counter});
-        SuspendTestTask.this.suspended = true;
-        SuspendTestTask.this.notify();
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-examples/src/main/java/com/microsoft/reef/examples/suspend/package-info.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/com/microsoft/reef/examples/suspend/package-info.java b/reef-examples/src/main/java/com/microsoft/reef/examples/suspend/package-info.java
deleted file mode 100644
index f020b2e..0000000
--- a/reef-examples/src/main/java/com/microsoft/reef/examples/suspend/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * suspend/resume demo.
- */
-package com.microsoft.reef.examples.suspend;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-examples/src/main/java/com/microsoft/reef/examples/utils/wake/BlockingEventHandler.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/com/microsoft/reef/examples/utils/wake/BlockingEventHandler.java b/reef-examples/src/main/java/com/microsoft/reef/examples/utils/wake/BlockingEventHandler.java
deleted file mode 100644
index 469a5f0..0000000
--- a/reef-examples/src/main/java/com/microsoft/reef/examples/utils/wake/BlockingEventHandler.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.examples.utils.wake;
-
-import com.microsoft.wake.EventHandler;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * An EventHandler that spools incoming events until a fixed number of them has
- * arrived, then passes the whole batch to a downstream handler as an Iterable
- * and starts collecting a fresh batch.
- *
- * @param <T> type of the events being collected.
- */
-public final class BlockingEventHandler<T> implements EventHandler<T> {
-
-  private final EventHandler<Iterable<T>> destination;
-  private final int expectedSize;
-  private List<T> events = new ArrayList<>();
-
-  /**
-   * @param expectedSize number of events that make up one batch.
-   * @param destination  handler that receives each completed batch.
-   */
-  public BlockingEventHandler(final int expectedSize, final EventHandler<Iterable<T>> destination) {
-    this.destination = destination;
-    this.expectedSize = expectedSize;
-  }
-
-  /**
-   * Add the event to the current batch and deliver the batch once it is full.
-   *
-   * @throws IllegalStateException if the current batch is already full on entry.
-   */
-  @Override
-  public final void onNext(final T event) {
-    if (this.events.size() >= this.expectedSize) {
-      throw new IllegalStateException("Received more Events than expected");
-    }
-
-    this.events.add(event);
-    if (this.events.size() < this.expectedSize) {
-      return;
-    }
-
-    // Batch is full: deliver it, and only afterwards start a new one.
-    this.destination.onNext(this.events);
-    this.events = new ArrayList<>();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-examples/src/main/java/com/microsoft/reef/examples/utils/wake/LoggingEventHandler.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/com/microsoft/reef/examples/utils/wake/LoggingEventHandler.java b/reef-examples/src/main/java/com/microsoft/reef/examples/utils/wake/LoggingEventHandler.java
deleted file mode 100644
index 30647cf..0000000
--- a/reef-examples/src/main/java/com/microsoft/reef/examples/utils/wake/LoggingEventHandler.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.examples.utils.wake;
-
-import com.microsoft.wake.EventHandler;
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * An EventHandler that logs its events at INFO level before handing them to a
- * downstream EventHandler.
- *
- * @param <T> type of the events being logged and forwarded.
- */
-public class LoggingEventHandler<T> implements EventHandler<T> {
-
-  /** Logger shared by all instances; looked up once rather than on every event. */
-  private static final Logger LOG = Logger.getLogger(LoggingEventHandler.class.getName());
-
-  private final EventHandler<T> downstreamEventHandler;
-  private final String prefix;
-  private final String suffix;
-
-  /**
-   * @param prefix                 to be logged before the event
-   * @param downstreamEventHandler the event handler to hand the event to
-   * @param suffix                 to be logged after the event
-   */
-  public LoggingEventHandler(final String prefix, final EventHandler<T> downstreamEventHandler, final String suffix) {
-    this.downstreamEventHandler = downstreamEventHandler;
-    this.prefix = prefix;
-    this.suffix = suffix;
-  }
-
-  /**
-   * Create a handler that logs events with an empty prefix and suffix.
-   *
-   * @param downstreamEventHandler the event handler to hand the event to
-   */
-  public LoggingEventHandler(final EventHandler<T> downstreamEventHandler) {
-    this("", downstreamEventHandler, "");
-  }
-
-  /**
-   * Log prefix + event + suffix, then forward the event downstream.
-   *
-   * @param value the event; its toString() is called, so it must not be null.
-   */
-  @Override
-  public void onNext(final T value) {
-    LOG.log(Level.INFO, prefix + value.toString() + suffix);
-    this.downstreamEventHandler.onNext(value);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-examples/src/main/java/org/apache/reef/examples/data/loading/DataLoadingREEF.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/data/loading/DataLoadingREEF.java b/reef-examples/src/main/java/org/apache/reef/examples/data/loading/DataLoadingREEF.java
new file mode 100644
index 0000000..c1026ea
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/data/loading/DataLoadingREEF.java
@@ -0,0 +1,125 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.examples.data.loading;
+
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.DriverLauncher;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.io.data.loading.api.DataLoadingRequestBuilder;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.CommandLine;
+import org.apache.reef.util.EnvironmentUtils;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Client for the data loading demo app.
+ * <p>
+ * Reads its settings from the command line, selects the local or the YARN
+ * runtime accordingly, and launches a driver whose event handlers are defined
+ * in {@link LineCounter}.
+ */
+@ClientSide
+public class DataLoadingREEF {
+
+  private static final Logger LOG = Logger.getLogger(DataLoadingREEF.class.getName());
+
+  private static final int NUM_LOCAL_THREADS = 16;
+  private static final int NUM_SPLITS = 6;
+  private static final int NUM_COMPUTE_EVALUATORS = 2;
+
+  /**
+   * Milliseconds per minute. Declared as long so that the timeout conversion
+   * is carried out in 64-bit arithmetic.
+   */
+  private static final long MILLIS_PER_MINUTE = 60L * 1000L;
+
+  /**
+   * Parses the command line, builds the runtime and data loading
+   * configurations and runs the job until it completes or times out.
+   *
+   * @param args command line parameters (short names: local, timeout, input).
+   * @throws InjectionException       configuration error.
+   * @throws BindException            configuration error.
+   * @throws IOException              declared by the command line processing.
+   * @throws IllegalArgumentException if the timeout does not fit into an int
+   *                                  once converted to milliseconds.
+   */
+  public static void main(final String[] args)
+      throws InjectionException, BindException, IOException {
+
+    final Tang tang = Tang.Factory.getTang();
+
+    final JavaConfigurationBuilder cb = tang.newConfigurationBuilder();
+
+    new CommandLine(cb)
+        .registerShortNameOfClass(Local.class)
+        .registerShortNameOfClass(TimeOut.class)
+        .registerShortNameOfClass(DataLoadingREEF.InputDir.class)
+        .processCommandLine(args);
+
+    final Injector injector = tang.newInjector(cb.build());
+
+    final boolean isLocal = injector.getNamedInstance(Local.class);
+    final int jobTimeout = toTimeoutMillis(injector.getNamedInstance(TimeOut.class));
+    final String inputDir = injector.getNamedInstance(DataLoadingREEF.InputDir.class);
+
+    final Configuration runtimeConfiguration;
+    if (isLocal) {
+      LOG.log(Level.INFO, "Running Data Loading demo on the local runtime");
+      runtimeConfiguration = LocalRuntimeConfiguration.CONF
+          .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, NUM_LOCAL_THREADS)
+          .build();
+    } else {
+      LOG.log(Level.INFO, "Running Data Loading demo on YARN");
+      runtimeConfiguration = YarnClientConfiguration.CONF.build();
+    }
+
+    final EvaluatorRequest computeRequest = EvaluatorRequest.newBuilder()
+        .setNumber(NUM_COMPUTE_EVALUATORS)
+        .setMemory(512)
+        .setNumberOfCores(1)
+        .build();
+
+    final Configuration dataLoadConfiguration = new DataLoadingRequestBuilder()
+        .setMemoryMB(1024)
+        .setInputFormatClass(TextInputFormat.class)
+        .setInputPath(inputDir)
+        .setNumberOfDesiredSplits(NUM_SPLITS)
+        .setComputeRequest(computeRequest)
+        .setDriverConfigurationModule(DriverConfiguration.CONF
+            .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(LineCounter.class))
+            .set(DriverConfiguration.ON_CONTEXT_ACTIVE, LineCounter.ContextActiveHandler.class)
+            .set(DriverConfiguration.ON_TASK_COMPLETED, LineCounter.TaskCompletedHandler.class)
+            .set(DriverConfiguration.DRIVER_IDENTIFIER, "DataLoadingREEF"))
+        .build();
+
+    final LauncherStatus state =
+        DriverLauncher.getLauncher(runtimeConfiguration).run(dataLoadConfiguration, jobTimeout);
+
+    LOG.log(Level.INFO, "REEF job completed: {0}", state);
+  }
+
+  /**
+   * Converts a timeout given in minutes to milliseconds.
+   * <p>
+   * The multiplication is done in long arithmetic; a plain int product wraps
+   * around silently for large values and would yield a bogus timeout.
+   *
+   * @param minutes the timeout in minutes.
+   * @return the same timeout in milliseconds.
+   * @throws IllegalArgumentException if the result does not fit into an int.
+   */
+  private static int toTimeoutMillis(final int minutes) {
+    final long millis = minutes * MILLIS_PER_MINUTE;
+    if (millis > Integer.MAX_VALUE || millis < Integer.MIN_VALUE) {
+      throw new IllegalArgumentException(
+          "Timeout of " + minutes + " minutes is out of range when converted to milliseconds");
+    }
+    return (int) millis;
+  }
+
+  /**
+   * Command line parameter = true to run locally, or false to run on YARN.
+   */
+  @NamedParameter(doc = "Whether or not to run on the local runtime",
+      short_name = "local", default_value = "true")
+  public static final class Local implements Name<Boolean> {
+  }
+
+  /**
+   * Command line parameter: job timeout in minutes.
+   */
+  @NamedParameter(doc = "Number of minutes before timeout",
+      short_name = "timeout", default_value = "2")
+  public static final class TimeOut implements Name<Integer> {
+  }
+
+  /**
+   * Command line parameter: the input path handed to the data loading request.
+   * It has no default value.
+   */
+  @NamedParameter(doc = "Path of the input data to load", short_name = "input")
+  public static final class InputDir implements Name<String> {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-examples/src/main/java/org/apache/reef/examples/data/loading/LineCounter.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/data/loading/LineCounter.java b/reef-examples/src/main/java/org/apache/reef/examples/data/loading/LineCounter.java
new file mode 100644
index 0000000..d2303d5
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/data/loading/LineCounter.java
@@ -0,0 +1,123 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.examples.data.loading;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.task.CompletedTask;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.io.data.loading.api.DataLoadingService;
+import org.apache.reef.poison.PoisonedConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver side for the line counting demo that uses the data loading service.
+ * <p>
+ * {@link ContextActiveHandler} stacks a "LineCountCtxt-*" context on every
+ * data-loaded context and submits a {@link LineCountingTask} to each of those;
+ * {@link TaskCompletedHandler} sums up the counts the tasks return.
+ */
+@DriverSide
+@Unit
+public class LineCounter {
+
+  private static final Logger LOG = Logger.getLogger(LineCounter.class.getName());
+
+  /** Source of the numeric suffixes of both context and task identifiers. */
+  private final AtomicInteger ctrlCtxIds = new AtomicInteger();
+
+  /** Running total of the line counts reported by completed tasks. */
+  private final AtomicInteger lineCnt = new AtomicInteger();
+
+  /**
+   * Starts at the number of partitions and is decremented once per completed
+   * task; the total is logged when it reaches zero.
+   */
+  private final AtomicInteger completedDataTasks = new AtomicInteger();
+
+  private final DataLoadingService dataLoadingService;
+
+  /**
+   * @param dataLoadingService used to tell data-loaded contexts apart and to
+   *                           learn the number of partitions.
+   */
+  @Inject
+  public LineCounter(final DataLoadingService dataLoadingService) {
+    this.dataLoadingService = dataLoadingService;
+    this.completedDataTasks.set(dataLoadingService.getNumberOfPartitions());
+  }
+
+  /**
+   * Handles ActiveContext: depending on the kind of context, submits a
+   * line count context, submits a line count task, or closes the context.
+   */
+  public class ContextActiveHandler implements EventHandler<ActiveContext> {
+
+    @Override
+    public void onNext(final ActiveContext activeContext) {
+
+      final String contextId = activeContext.getId();
+      LOG.log(Level.FINER, "Context active: {0}", contextId);
+
+      if (dataLoadingService.isDataLoadedContext(activeContext)) {
+
+        final String lcContextId = "LineCountCtxt-" + ctrlCtxIds.getAndIncrement();
+        LOG.log(Level.FINEST, "Submit LineCount context {0} to: {1}",
+            new Object[]{lcContextId, contextId});
+
+        // NOTE(review): the new context is merged with a PoisonedConfiguration;
+        // presumably this injects crashes to exercise failure handling -- confirm.
+        final Configuration poisonedConfiguration = PoisonedConfiguration.CONTEXT_CONF
+            .set(PoisonedConfiguration.CRASH_PROBABILITY, "0.4")
+            .set(PoisonedConfiguration.CRASH_TIMEOUT, "1")
+            .build();
+
+        activeContext.submitContext(Tang.Factory.getTang()
+            .newConfigurationBuilder(poisonedConfiguration,
+                ContextConfiguration.CONF.set(ContextConfiguration.IDENTIFIER, lcContextId).build())
+            .build());
+
+      } else if (contextId.startsWith("LineCountCtxt")) {
+
+        final String taskId = "LineCountTask-" + ctrlCtxIds.getAndIncrement();
+        LOG.log(Level.FINEST, "Submit LineCount task {0} to: {1}", new Object[]{taskId, contextId});
+
+        try {
+          activeContext.submitTask(TaskConfiguration.CONF
+              .set(TaskConfiguration.IDENTIFIER, taskId)
+              .set(TaskConfiguration.TASK, LineCountingTask.class)
+              .build());
+        } catch (final BindException ex) {
+          LOG.log(Level.SEVERE, "Configuration error in " + contextId, ex);
+          throw new RuntimeException("Configuration error in " + contextId, ex);
+        }
+      } else {
+        // Neither a data-loaded context nor one of our own: nothing to run on it.
+        LOG.log(Level.FINEST, "Line count Compute Task {0} -- Closing", contextId);
+        activeContext.close();
+      }
+    }
+  }
+
+  /**
+   * Handles CompletedTask: adds the returned line count to the total, logs the
+   * total once all partitions have reported, and closes the task's context.
+   */
+  public class TaskCompletedHandler implements EventHandler<CompletedTask> {
+    @Override
+    public void onNext(final CompletedTask completedTask) {
+
+      final String taskId = completedTask.getId();
+      LOG.log(Level.FINEST, "Completed Task: {0}", taskId);
+
+      final byte[] retBytes = completedTask.get();
+      final String retStr = retBytes == null ? "No RetVal" : new String(retBytes);
+      LOG.log(Level.FINE, "Line count from {0} : {1}", new Object[]{taskId, retStr});
+
+      // Only parse an actual return value: feeding the "No RetVal" placeholder
+      // to parseInt would throw and leave the context below unreleased.
+      if (retBytes == null) {
+        LOG.log(Level.WARNING, "Task {0} returned no line count", taskId);
+      } else {
+        try {
+          lineCnt.addAndGet(Integer.parseInt(retStr));
+        } catch (final NumberFormatException ex) {
+          LOG.log(Level.WARNING,
+              "Task " + taskId + " returned a non-numeric line count: " + retStr, ex);
+        }
+      }
+
+      if (completedDataTasks.decrementAndGet() <= 0) {
+        LOG.log(Level.INFO, "Total line count: {0}", lineCnt.get());
+      }
+
+      final ActiveContext context = completedTask.getActiveContext();
+      LOG.log(Level.FINEST, "Releasing Context: {0}", context.getId());
+      context.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-examples/src/main/java/org/apache/reef/examples/data/loading/LineCountingTask.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/data/loading/LineCountingTask.java b/reef-examples/src/main/java/org/apache/reef/examples/data/loading/LineCountingTask.java
new file mode 100644
index 0000000..1510d13
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/data/loading/LineCountingTask.java
@@ -0,0 +1,56 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.examples.data.loading;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.reef.annotations.audience.TaskSide;
+import org.apache.reef.io.data.loading.api.DataSet;
+import org.apache.reef.io.network.util.Pair;
+import org.apache.reef.task.Task;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Task that walks the injected data set once and reports how many records
+ * it contains. The input is assumed to be read with TextInputFormat, so that
+ * one record corresponds to one line.
+ */
+@TaskSide
+public class LineCountingTask implements Task {
+
+  private static final Logger LOG = Logger.getLogger(LineCountingTask.class.getName());
+
+  private final DataSet<LongWritable, Text> dataSet;
+
+  /**
+   * @param dataSet the records this task counts.
+   */
+  @Inject
+  public LineCountingTask(final DataSet<LongWritable, Text> dataSet) {
+    this.dataSet = dataSet;
+  }
+
+  /**
+   * Counts the records of the data set.
+   *
+   * @param memento not used.
+   * @return the decimal record count, encoded as bytes.
+   */
+  @Override
+  public byte[] call(final byte[] memento) throws Exception {
+    LOG.log(Level.FINER, "LineCounting task started");
+    int lineCount = 0;
+    // The records themselves are irrelevant here; only their number matters.
+    for (final Pair<LongWritable, Text> ignored : this.dataSet) {
+      lineCount++;
+    }
+    LOG.log(Level.FINER, "LineCounting task finished: read {0} lines", lineCount);
+    final String countText = String.valueOf(lineCount);
+    return countText.getBytes();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloDriver.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloDriver.java b/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloDriver.java
new file mode 100644
index 0000000..72961ea
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloDriver.java
@@ -0,0 +1,81 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.examples.hello;
+
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver of the Hello REEF application: requests one Evaluator on start-up
+ * and submits {@link HelloTask} to it once it has been allocated.
+ */
+@Unit
+public final class HelloDriver {
+
+  private static final Logger LOG = Logger.getLogger(HelloDriver.class.getName());
+
+  private final EvaluatorRequestor requestor;
+
+  /**
+   * Creates the driver; meant to be instantiated by TANG via injection.
+   *
+   * @param requestor used to submit requests for new Evaluators.
+   */
+  @Inject
+  public HelloDriver(final EvaluatorRequestor requestor) {
+    this.requestor = requestor;
+    LOG.log(Level.FINE, "Instantiated 'HelloDriver'");
+  }
+
+  /**
+   * Reacts to the StartTime event by requesting a single Evaluator.
+   */
+  public final class StartHandler implements EventHandler<StartTime> {
+    @Override
+    public void onNext(final StartTime startTime) {
+      final EvaluatorRequest singleEvaluator = EvaluatorRequest.newBuilder()
+          .setNumber(1)
+          .setMemory(64)
+          .setNumberOfCores(1)
+          .build();
+      requestor.submit(singleEvaluator);
+      LOG.log(Level.INFO, "Requested Evaluator.");
+    }
+  }
+
+  /**
+   * Reacts to an AllocatedEvaluator by submitting the HelloTask to it.
+   */
+  public final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
+    @Override
+    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
+      LOG.log(Level.INFO, "Submitting HelloREEF task to AllocatedEvaluator: {0}", allocatedEvaluator);
+      final Configuration helloTaskConf = TaskConfiguration.CONF
+          .set(TaskConfiguration.IDENTIFIER, "HelloREEFTask")
+          .set(TaskConfiguration.TASK, HelloTask.class)
+          .build();
+      allocatedEvaluator.submitTask(helloTaskConf);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEF.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEF.java b/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEF.java
new file mode 100644
index 0000000..6aab539
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEF.java
@@ -0,0 +1,74 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.examples.hello;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.DriverLauncher;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.util.EnvironmentUtils;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Client of the Hello REEF example; runs the job on the local runtime.
+ */
+public final class HelloREEF {
+
+  private static final Logger LOG = Logger.getLogger(HelloREEF.class.getName());
+
+  /**
+   * How long to wait for the job to complete, in milliseconds.
+   */
+  private static final int JOB_TIMEOUT = 10000; // 10 sec.
+
+  /**
+   * @return the configuration of the HelloREEF driver.
+   */
+  public static Configuration getDriverConfiguration() {
+    final String driverClassLocation = EnvironmentUtils.getClassLocation(HelloDriver.class);
+    return DriverConfiguration.CONF
+        .set(DriverConfiguration.GLOBAL_LIBRARIES, driverClassLocation)
+        .set(DriverConfiguration.DRIVER_IDENTIFIER, "HelloREEF")
+        .set(DriverConfiguration.ON_DRIVER_STARTED, HelloDriver.StartHandler.class)
+        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, HelloDriver.EvaluatorAllocatedHandler.class)
+        .build();
+  }
+
+  /**
+   * Runs the HelloREEF driver on the given runtime.
+   *
+   * @param runtimeConf configuration of the runtime to launch the driver on.
+   * @param timeOut     timeout passed on to the launcher.
+   * @return the status reported by the launcher.
+   * @throws BindException      configuration error.
+   * @throws InjectionException configuration error.
+   */
+  public static LauncherStatus runHelloReef(final Configuration runtimeConf, final int timeOut)
+      throws BindException, InjectionException {
+    final Configuration helloDriverConf = getDriverConfiguration();
+    final DriverLauncher launcher = DriverLauncher.getLauncher(runtimeConf);
+    return launcher.run(helloDriverConf, timeOut);
+  }
+
+  /**
+   * Entry point: configures the local runtime with two threads and hands
+   * over to {@link #runHelloReef(Configuration, int)}.
+   *
+   * @param args command line parameters.
+   * @throws BindException      configuration error.
+   * @throws InjectionException configuration error.
+   */
+  public static void main(final String[] args) throws BindException, InjectionException {
+    final Configuration localRuntimeConf = LocalRuntimeConfiguration.CONF
+        .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, 2)
+        .build();
+    final LauncherStatus finalStatus = runHelloReef(localRuntimeConf, JOB_TIMEOUT);
+    LOG.log(Level.INFO, "REEF job completed: {0}", finalStatus);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFNoClient.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFNoClient.java b/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFNoClient.java
new file mode 100644
index 0000000..c212c1f
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFNoClient.java
@@ -0,0 +1,61 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.examples.hello;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.REEF;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.util.EnvironmentUtils;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Entry point that submits the hello REEF job without keeping a persistent
+ * client connection.
+ */
+public final class HelloREEFNoClient {
+
+  private static final Logger LOG = Logger.getLogger(HelloREEFNoClient.class.getName());
+
+  /**
+   * Obtains a REEF instance for the given runtime and submits the HelloREEF
+   * driver to it.
+   *
+   * @param runtimeConf configuration of the runtime to submit to.
+   * @throws InjectionException configuration error.
+   */
+  public static void runHelloReefWithoutClient(
+      final Configuration runtimeConf) throws InjectionException {
+
+    final REEF reef = Tang.Factory.getTang().newInjector(runtimeConf).getInstance(REEF.class);
+    reef.submit(buildDriverConfiguration());
+  }
+
+  /**
+   * @return the configuration of the HelloREEF driver.
+   */
+  private static Configuration buildDriverConfiguration() {
+    return DriverConfiguration.CONF
+        .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(HelloDriver.class))
+        .set(DriverConfiguration.DRIVER_IDENTIFIER, "HelloREEF")
+        .set(DriverConfiguration.ON_DRIVER_STARTED, HelloDriver.StartHandler.class)
+        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, HelloDriver.EvaluatorAllocatedHandler.class)
+        .build();
+  }
+
+  /**
+   * Submits the job to the local runtime (two threads) and logs once the
+   * submission call has returned.
+   *
+   * @param args command line parameters.
+   * @throws BindException      configuration error.
+   * @throws InjectionException configuration error.
+   */
+  public static void main(final String[] args) throws BindException, InjectionException {
+
+    final Configuration localRuntimeConf = LocalRuntimeConfiguration.CONF
+        .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, 2)
+        .build();
+
+    runHelloReefWithoutClient(localRuntimeConf);
+    LOG.log(Level.INFO, "Job Submitted");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloReefYarn.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloReefYarn.java b/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloReefYarn.java
new file mode 100644
index 0000000..0f19ae8
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloReefYarn.java
@@ -0,0 +1,67 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.examples.hello;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.DriverLauncher;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Client of the Hello REEF example that runs the job on YARN.
+ */
+public final class HelloReefYarn {
+
+  private static final Logger LOG = Logger.getLogger(HelloReefYarn.class.getName());
+
+  /**
+   * How long to wait for the job to complete, in milliseconds.
+   */
+  private static final int JOB_TIMEOUT = 30000; // 30 sec.
+
+  /**
+   * @return the configuration of the HelloREEF driver.
+   */
+  private static Configuration getDriverConfiguration() {
+    // Location this class was loaded from; it is shipped as a global library.
+    final String codeLocation =
+        HelloReefYarn.class.getProtectionDomain().getCodeSource().getLocation().getFile();
+    return DriverConfiguration.CONF
+        .set(DriverConfiguration.GLOBAL_LIBRARIES, codeLocation)
+        .set(DriverConfiguration.DRIVER_IDENTIFIER, "HelloREEF")
+        .set(DriverConfiguration.ON_DRIVER_STARTED, HelloDriver.StartHandler.class)
+        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, HelloDriver.EvaluatorAllocatedHandler.class)
+        .build();
+  }
+
+  /**
+   * Starts the Hello REEF job on YARN and logs the status the launcher returns.
+   *
+   * @param args command line parameters.
+   * @throws InjectionException configuration error.
+   */
+  public static void main(final String[] args) throws InjectionException {
+
+    final Configuration yarnRuntimeConf = YarnClientConfiguration.CONF.build();
+    final DriverLauncher launcher = DriverLauncher.getLauncher(yarnRuntimeConf);
+    final LauncherStatus finalStatus = launcher.run(getDriverConfiguration(), JOB_TIMEOUT);
+    LOG.log(Level.INFO, "REEF job completed: {0}", finalStatus);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloTask.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloTask.java b/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloTask.java
new file mode 100644
index 0000000..00351e2
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloTask.java
@@ -0,0 +1,36 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.examples.hello;
+
+import org.apache.reef.task.Task;
+
+import javax.inject.Inject;
+
+/**
+ * The task of the Hello REEF example: prints a greeting to standard output.
+ */
+public final class HelloTask implements Task {
+
+  /** The line written to standard output by {@link #call(byte[])}. */
+  private static final String GREETING = "Hello, REEF!";
+
+  /**
+   * Package-private constructor, annotated for injection.
+   */
+  @Inject
+  HelloTask() {
+  }
+
+  /**
+   * Prints the greeting.
+   *
+   * @param memento not used.
+   * @return always null; the task has no result.
+   */
+  @Override
+  public final byte[] call(final byte[] memento) {
+    System.out.println(GREETING);
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-examples/src/main/java/org/apache/reef/examples/hello/package-info.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/hello/package-info.java b/reef-examples/src/main/java/org/apache/reef/examples/hello/package-info.java
new file mode 100644
index 0000000..5966a82
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/hello/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * The Hello REEF example.
+ */
+package org.apache.reef.examples.hello;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/Command.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/Command.java b/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/Command.java
new file mode 100644
index 0000000..ee975fa
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/Command.java
@@ -0,0 +1,27 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.reef.examples.hellohttp;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Named parameter holding the shell command to run, e.g. "echo Hello REEF".
+ * <p>
+ * It is declared with the short name {@code cmd}; when no value is given,
+ * it defaults to the marker string {@code *INTERACTIVE*}.
+ */
+@NamedParameter(doc = "The shell command", short_name = "cmd", default_value = "*INTERACTIVE*")
+final class Command implements Name<String> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HelloREEFHttp.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HelloREEFHttp.java b/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HelloREEFHttp.java
new file mode 100644
index 0000000..11346ec
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HelloREEFHttp.java
@@ -0,0 +1,110 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.reef.examples.hellohttp;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.DriverLauncher;
+import org.apache.reef.client.DriverServiceConfiguration;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.webserver.HttpHandlerConfiguration;
+import org.apache.reef.webserver.HttpServerReefEventHandler;
+import org.apache.reef.webserver.ReefEventStateManager;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Example to run HelloREEF with a webserver.
+ */
+public final class HelloREEFHttp {
+  /**
+   * Number of milliseconds to wait for the job to complete.
+   */
+  public static final int JOB_TIMEOUT = 60 * 1000; // 60 sec.
+  private static final Logger LOG = Logger.getLogger(HelloREEFHttp.class.getName());
+
+  /**
+   * Builds the driver-side configuration that enables the HTTP server.
+   * It combines the HTTP handler bindings with the driver service event handlers
+   * taken from ReefEventStateManager.
+   *
+   * @return the configuration to be merged into the DriverConfiguration.
+   */
+  public static Configuration getHTTPConfiguration() {
+    // HTTP request handlers served by the web server.
+    final Configuration handlerBindings = HttpHandlerConfiguration.CONF
+        .set(HttpHandlerConfiguration.HTTP_HANDLERS, HttpServerReefEventHandler.class)
+        .set(HttpHandlerConfiguration.HTTP_HANDLERS, HttpServerShellCmdtHandler.class)
+        .build();
+
+    // Driver service event handlers registered alongside the application's own handlers.
+    final Configuration serviceBindings = DriverServiceConfiguration.CONF
+        .set(DriverServiceConfiguration.ON_EVALUATOR_ALLOCATED, ReefEventStateManager.AllocatedEvaluatorStateHandler.class)
+        .set(DriverServiceConfiguration.ON_CONTEXT_ACTIVE, ReefEventStateManager.ActiveContextStateHandler.class)
+        .set(DriverServiceConfiguration.ON_TASK_RUNNING, ReefEventStateManager.TaskRunningStateHandler.class)
+        .set(DriverServiceConfiguration.ON_DRIVER_STARTED, ReefEventStateManager.StartStateHandler.class)
+        .set(DriverServiceConfiguration.ON_DRIVER_STOP, ReefEventStateManager.StopStateHandler.class)
+        .build();
+
+    return Configurations.merge(handlerBindings, serviceBindings);
+  }
+
+  /**
+   * @return the driver configuration: identifier "HelloHTTP" plus the HttpShellJobDriver event handlers.
+   */
+  public static Configuration getDriverConfiguration() {
+    return DriverConfiguration.CONF
+        .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(HttpShellJobDriver.class))
+        .set(DriverConfiguration.DRIVER_IDENTIFIER, "HelloHTTP")
+        .set(DriverConfiguration.ON_DRIVER_STARTED, HttpShellJobDriver.StartHandler.class)
+        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, HttpShellJobDriver.AllocatedEvaluatorHandler.class)
+        .set(DriverConfiguration.ON_EVALUATOR_FAILED, HttpShellJobDriver.FailedEvaluatorHandler.class)
+        .set(DriverConfiguration.ON_CONTEXT_ACTIVE, HttpShellJobDriver.ActiveContextHandler.class)
+        .set(DriverConfiguration.ON_CONTEXT_CLOSED, HttpShellJobDriver.ClosedContextHandler.class)
+        .set(DriverConfiguration.ON_CONTEXT_FAILED, HttpShellJobDriver.FailedContextHandler.class)
+        .set(DriverConfiguration.ON_TASK_COMPLETED, HttpShellJobDriver.CompletedTaskHandler.class)
+        .set(DriverConfiguration.ON_CLIENT_MESSAGE, HttpShellJobDriver.ClientMessageHandler.class)
+        .set(DriverConfiguration.ON_CLIENT_CLOSED, HttpShellJobDriver.HttpClientCloseHandler.class)
+        .set(DriverConfiguration.ON_DRIVER_STOP, HttpShellJobDriver.StopHandler.class)
+        .build();
+  }
+
+  /**
+   * Runs the Hello REEF HTTP job on the given runtime.
+   * The driver configuration is merged with the HTTP server configuration before launch.
+   *
+   * @param runtimeConf configuration of the runtime used to obtain the launcher.
+   * @param timeOut     timeout handed to the launcher; callers in this package pass JOB_TIMEOUT (milliseconds).
+   * @return the status returned by the launcher's run() call.
+   * @throws BindException      declared for configuration errors.
+   * @throws InjectionException declared for configuration errors.
+   */
+  public static LauncherStatus runHelloReef(final Configuration runtimeConf, final int timeOut)
+      throws BindException, InjectionException {
+    final Configuration mergedDriverConf =
+        Configurations.merge(HelloREEFHttp.getDriverConfiguration(), getHTTPConfiguration());
+    return DriverLauncher
+        .getLauncher(runtimeConf)
+        .run(mergedDriverConf, timeOut);
+  }
+
+  /**
+   * Runs the example on the runtime built from LocalRuntimeConfiguration and logs the final job status.
+   *
+   * @param args command line parameters (unused).
+   * @throws InjectionException configuration error.
+   */
+  public static void main(final String[] args) throws InjectionException {
+    final Configuration runtimeConfiguration = LocalRuntimeConfiguration.CONF
+        .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, 3)
+        .build();
+    final LauncherStatus status = runHelloReef(runtimeConfiguration, HelloREEFHttp.JOB_TIMEOUT);
+    // Report the outcome, as HelloREEFHttpYarn.main() does; previously the status was computed and dropped.
+    LOG.log(Level.INFO, "REEF job completed: {0}", status);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HelloREEFHttpYarn.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HelloREEFHttpYarn.java b/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HelloREEFHttpYarn.java
new file mode 100644
index 0000000..27b9fa4
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HelloREEFHttpYarn.java
@@ -0,0 +1,49 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.examples.hellohttp;
+
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Launches the HelloREEFHttp example with the YARN client configuration.
+ */
+public class HelloREEFHttpYarn {
+
+  private static final Logger LOG = Logger.getLogger(HelloREEFHttpYarn.class.getName());
+
+  /**
+   * Entry point: delegates to HelloREEFHttp.runHelloReef() and logs the resulting status.
+   *
+   * @param args command line parameters (unused).
+   * @throws BindException      configuration error.
+   * @throws InjectionException configuration error.
+   * @throws IOException        declared in the signature.
+   */
+  public static void main(final String[] args) throws BindException, InjectionException, IOException {
+    final Configuration yarnRuntimeConf = YarnClientConfiguration.CONF.build();
+    final LauncherStatus jobStatus = HelloREEFHttp.runHelloReef(yarnRuntimeConf, HelloREEFHttp.JOB_TIMEOUT);
+    LOG.log(Level.INFO, "REEF job completed: {0}", jobStatus);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HttpServerShellCmdtHandler.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HttpServerShellCmdtHandler.java b/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HttpServerShellCmdtHandler.java
new file mode 100644
index 0000000..3937942
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HttpServerShellCmdtHandler.java
@@ -0,0 +1,166 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.reef.examples.hellohttp;
+
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.util.CommandUtils;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.webserver.HttpHandler;
+import org.apache.reef.webserver.ParsedHttpRequest;
+
+import javax.inject.Inject;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Http Event handler for Shell Command
+ */
+@Unit
+class HttpServerShellCmdtHandler implements HttpHandler {
+  /**
+   * Standard Java logger.
+   */
+  private static final Logger LOG = Logger.getLogger(HttpServerShellCmdtHandler.class.getName());
+
+  private static final int WAIT_TIMEOUT = 10 * 1000;
+
+  private static final int WAIT_TIME = 50;
+
+  /**
+   * ClientMessageHandler
+   */
+  private final InjectionFuture<HttpShellJobDriver.ClientMessageHandler> messageHandler;
+
+  /**
+   * uri specification
+   */
+  private String uriSpecification = "Command";
+
+  /**
+   * output for command
+   */
+  private String cmdOutput = null;
+
+  /**
+   * Creates the handler. The driver's ClientMessageHandler arrives as a future; get() is called per request.
+   */
+  @Inject
+  public HttpServerShellCmdtHandler(final InjectionFuture<HttpShellJobDriver.ClientMessageHandler> messageHandler) {
+    this.messageHandler = messageHandler;
+  }
+
+  /**
+   * Returns the URI specification for this handler.
+   *
+   * @return the current URI specification; "Command" unless changed through setUriSpecification().
+   */
+  @Override
+  public String getUriSpecification() {
+    return uriSpecification;
+  }
+
+  /**
+   * Replaces the URI specification returned by getUriSpecification().
+   *
+   * @param s the new URI specification; stored as given, without validation.
+   */
+  public void setUriSpecification(final String s) {
+    uriSpecification = s;
+  }
+
+  /**
+   * Handles an HTTP request whose query string is a shell command.
+   * <p>
+   * For the target entity "Evaluators" the command is encoded and passed to the driver's
+   * ClientMessageHandler; this method then waits up to WAIT_TIMEOUT ms for onHttpCallback() to deliver
+   * the output and writes it to the response. For the target entity "Driver" the command is run through
+   * CommandUtils.runCommand() and its output is written to the response. Other targets are ignored.
+   *
+   * @param parsedHttpRequest the parsed request; supplies the target entity and the query string.
+   * @param response          receives the command output as UTF-8 bytes, or a 504 error if no output arrived.
+   * @throws IOException      if writing to the response fails.
+   * @throws ServletException declared in the signature; not thrown by the code in this method.
+   */
+  @Override
+  public final synchronized void onHttpRequest(final ParsedHttpRequest parsedHttpRequest, final HttpServletResponse response) throws IOException, ServletException {
+    LOG.log(Level.INFO, "HttpServeShellCmdtHandler in webserver onHttpRequest is called: {0}", parsedHttpRequest.getRequestUri());
+    final String queryStr = parsedHttpRequest.getQueryString();
+
+    if (parsedHttpRequest.getTargetEntity().equalsIgnoreCase("Evaluators")) {
+      final byte[] b = HttpShellJobDriver.CODEC.encode(queryStr);
+      LOG.log(Level.INFO, "HttpServeShellCmdtHandler call HelloDriver onCommand(): {0}", queryStr);
+      messageHandler.get().onNext(b);
+
+      notify();
+
+      // Poll for the callback's output until it arrives, the deadline passes, or this thread is interrupted.
+      final long endTime = System.currentTimeMillis() + WAIT_TIMEOUT;
+      while (cmdOutput == null && System.currentTimeMillis() < endTime) {
+        try {
+          wait(WAIT_TIME);
+        } catch (final InterruptedException e) {
+          LOG.log(Level.WARNING, "HttpServeShellCmdtHandler onHttpRequest InterruptedException: {0}", e);
+          // Restore the interrupt flag for the caller and stop waiting; another wait() would throw at once.
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+
+      if (cmdOutput == null) {
+        // Nothing was delivered: report it instead of dereferencing null as the old code did.
+        response.sendError(HttpServletResponse.SC_GATEWAY_TIMEOUT,
+            "No command output was received from the evaluators");
+        return;
+      }
+      response.getOutputStream().write(cmdOutput.getBytes(Charset.forName("UTF-8")));
+      cmdOutput = null;
+    } else if (parsedHttpRequest.getTargetEntity().equalsIgnoreCase("Driver")) {
+      // NOTE(review): the query string is executed as-is; acceptable for a demo only - confirm the server is not exposed.
+      final String driverOutput = CommandUtils.runCommand(queryStr);
+      response.getOutputStream().write(driverOutput.getBytes(Charset.forName("UTF-8")));
+    }
+  }
+
+  /**
+   * Called after the shell command completes; stores its output for the HTTP request thread.
+   * <p>
+   * If a previous output has not been consumed yet, waits up to WAIT_TIMEOUT ms for onHttpRequest()
+   * to clear it; after that the previous output is overwritten.
+   *
+   * @param message the command output, encoded with HttpShellJobDriver.CODEC.
+   */
+  public final synchronized void onHttpCallback(byte[] message) {
+    final long endTime = System.currentTimeMillis() + WAIT_TIMEOUT;
+    while (cmdOutput != null && System.currentTimeMillis() < endTime) {
+      try {
+        wait(WAIT_TIME);
+      } catch (final InterruptedException e) {
+        LOG.log(Level.WARNING, "HttpServeShellCmdtHandler onHttpCallback InterruptedException: {0}", e);
+        // Restore the interrupt flag and stop waiting; another wait() would throw immediately.
+        Thread.currentThread().interrupt();
+        break;
+      }
+    }
+
+    // Decode once and reuse the value for both the log record and the stored output.
+    final String output = HttpShellJobDriver.CODEC.decode(message);
+    LOG.log(Level.INFO, "HttpServeShellCmdtHandler OnCallback: {0}", output);
+    cmdOutput = output;
+
+    // Wake a thread that may be waiting in onHttpRequest().
+    notify();
+  }
+
+  /**
+   * Event handler that forwards an encoded command result to onHttpCallback() of the enclosing handler.
+   */
+  final class ClientCallBackHandler implements EventHandler<byte[]> {
+    @Override
+    public void onNext(final byte[] message) {
+      HttpServerShellCmdtHandler.this.onHttpCallback(message);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HttpShellJobDriver.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HttpShellJobDriver.java b/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HttpShellJobDriver.java
new file mode 100644
index 0000000..977b8b1
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/hellohttp/HttpShellJobDriver.java
@@ -0,0 +1,360 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.reef.examples.hellohttp;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.task.CompletedTask;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The Driver code for the Hello REEF Http Distributed Shell Application
+ */
+@Unit
+public final class HttpShellJobDriver {
+
+  /**
+   * String codec is used to encode the results
+   * before passing them back to the client.
+   */
+  public static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
+  private static final Logger LOG = Logger.getLogger(HttpShellJobDriver.class.getName());
+  /**
+   * Evaluator Requester
+   */
+  private final EvaluatorRequestor evaluatorRequestor;
+  /**
+   * Number of Evaluators to request (fixed at 2).
+   */
+  private final int numEvaluators = 2;
+  /**
+   * Shell execution results from each Evaluator.
+   */
+  private final List<String> results = new ArrayList<>();
+  /**
+   * Map from context ID to running evaluator context.
+   */
+  private final Map<String, ActiveContext> contexts = new HashMap<>();
+  /**
+   * Job driver state.
+   */
+  private State state = State.INIT;
+  /**
+   * First command to execute. Sometimes client can send us the first command
+   * before Evaluators are available; we need to store this command here.
+   */
+  private String cmd;
+  /**
+   * Number of evaluators/tasks to complete.
+   */
+  private int expectCount = 0;
+  /**
+   * Callback handler for http return message
+   */
+  private HttpServerShellCmdtHandler.ClientCallBackHandler httpCallbackHandler;
+
+  /**
+   * Job Driver Constructor
+   *
+   * @param requestor             used by requestEvaluators() to submit the evaluator request.
+   * @param clientCallBackHandler receives the encoded, concatenated task results from returnResults().
+   */
+  @Inject
+  public HttpShellJobDriver(final EvaluatorRequestor requestor, final HttpServerShellCmdtHandler.ClientCallBackHandler clientCallBackHandler) {
+    this.evaluatorRequestor = requestor;
+    this.httpCallbackHandler = clientCallBackHandler;
+    LOG.log(Level.FINE, "Instantiated 'HelloDriver'");
+  }
+
+  /**
+   * Concatenates the collected task results in order, clears the collection,
+   * and hands the encoded text to the HTTP callback handler.
+   */
+  private void returnResults() {
+    final StringBuilder sb = new StringBuilder();
+    final int resultCount = this.results.size();
+    for (int i = 0; i < resultCount; ++i) {
+      sb.append(this.results.get(i));
+    }
+    this.results.clear();
+
+    LOG.log(Level.INFO, "Return results to the client:\n{0}", sb);
+    final byte[] encoded = CODEC.encode(sb.toString());
+    httpCallbackHandler.onNext(encoded);
+  }
+
+  /**
+   * Submit command to all active contexts, expect one task per context, and clear any stored command.
+   * Callers in this class invoke it in the READY state while holding the driver lock.
+   * @param command shell command to execute.
+   */
+  private void submit(final String command) {
+    LOG.log(Level.INFO, "Submit command {0} to {1} evaluators. state: {2}",
+        new Object[]{command, this.contexts.size(), this.state});
+    assert (this.state == State.READY);
+    this.expectCount = this.contexts.size();
+    this.state = State.WAIT_TASKS;
+    this.cmd = null;
+    for (final ActiveContext context : this.contexts.values()) {
+      this.submit(context, command);
+    }
+  }
+
+  /**
+   * Submit a ShellTask that executes the command to the given context.
+   * Called from <code>submit(command)</code>; on a BindException the context is closed and the error rethrown.
+   */
+  private void submit(final ActiveContext context, final String command) {
+    try {
+      LOG.log(Level.INFO, "Send command {0} to context: {1}", new Object[]{command, context});
+      final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
+      cb.addConfiguration(
+          TaskConfiguration.CONF
+              .set(TaskConfiguration.IDENTIFIER, context.getId() + "_task")
+              .set(TaskConfiguration.TASK, ShellTask.class)
+              .build()
+      );
+      cb.bindNamedParameter(org.apache.reef.examples.hellohttp.Command.class, command);
+      context.submitTask(cb.build());
+    } catch (final BindException ex) {
+      LOG.log(Level.SEVERE, "Bad Task configuration for context: " + context.getId(), ex);
+      context.close();
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * Request numEvaluators evaluators, then move to WAIT_EVALUATORS with expectCount set to that number.
+   */
+  private synchronized void requestEvaluators() {
+    assert (this.state == State.INIT);
+    LOG.log(Level.INFO, "Schedule on {0} Evaluators.", this.numEvaluators);
+    this.evaluatorRequestor.submit(
+        EvaluatorRequest.newBuilder()
+            .setMemory(128)
+            .setNumberOfCores(1)
+            .setNumber(this.numEvaluators).build()
+    );
+    this.state = State.WAIT_EVALUATORS;
+    this.expectCount = this.numEvaluators;
+  }
+
+  /**
+   * Possible states of the job driver. Can be one of:
+   * <dl>
+   * <dt><code>INIT</code></dt><dd>initial state, ready to request the evaluators.</dd>
+   * <dt><code>WAIT_EVALUATORS</code></dt><dd>Wait for requested evaluators to initialize.</dd>
+   * <dt><code>READY</code></dt><dd>Ready to submit a new task.</dd>
+   * <dt><code>WAIT_TASKS</code></dt><dd>Wait for tasks to complete.</dd>
+   * </dl>
+   */
+  private enum State {
+    INIT, WAIT_EVALUATORS, READY, WAIT_TASKS
+  }
+
+  /**
+   * Receive notification that an Evaluator had been allocated,
+   * and submit a new context, identified by the evaluator id plus "_context", to that Evaluator.
+   */
+  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
+    @Override
+    public void onNext(final AllocatedEvaluator eval) {
+      synchronized (HttpShellJobDriver.this) {
+        LOG.log(Level.INFO, "Allocated Evaluator: {0} expect {1} running {2}",
+            new Object[]{eval.getId(), HttpShellJobDriver.this.expectCount, HttpShellJobDriver.this.contexts.size()});
+        assert (HttpShellJobDriver.this.state == State.WAIT_EVALUATORS);
+        try {
+          eval.submitContext(ContextConfiguration.CONF.set(
+              ContextConfiguration.IDENTIFIER, eval.getId() + "_context").build());
+        } catch (final BindException ex) {
+          LOG.log(Level.SEVERE, "Failed to submit a context to evaluator: " + eval.getId(), ex);
+          throw new RuntimeException(ex);
+        }
+      }
+    }
+  }
+
+  /**
+   * Receive notification that the entire Evaluator had failed.
+   * Removes the evaluator's failed contexts from the active-context map, then aborts by throwing
+   * a RuntimeException that wraps the evaluator exception.
+   */
+  final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
+    @Override
+    public void onNext(final FailedEvaluator eval) {
+      synchronized (HttpShellJobDriver.this) {
+        // The "{0}" placeholder is needed for the logger to render the parameter;
+        // without it the failed evaluator never appeared in the log output.
+        LOG.log(Level.SEVERE, "FailedEvaluator: {0}", eval);
+        for (final FailedContext failedContext : eval.getFailedContextList()) {
+          HttpShellJobDriver.this.contexts.remove(failedContext.getId());
+        }
+        throw new RuntimeException("Failed Evaluator: ", eval.getEvaluatorException());
+      }
+    }
+  }
+
+  /**
+   * Receive notification that a new Context is available and record it in the active-context map.
+   * Once all expected contexts have arrived the driver becomes READY and submits the stored command, if any.
+   */
+  final class ActiveContextHandler implements EventHandler<ActiveContext> {
+    @Override
+    public void onNext(final ActiveContext context) {
+      synchronized (HttpShellJobDriver.this) {
+        LOG.log(Level.INFO, "Context available: {0} expect {1} state {2}",
+            new Object[]{context.getId(), HttpShellJobDriver.this.expectCount, HttpShellJobDriver.this.state});
+        assert (HttpShellJobDriver.this.state == State.WAIT_EVALUATORS);
+        HttpShellJobDriver.this.contexts.put(context.getId(), context);
+        if (--HttpShellJobDriver.this.expectCount <= 0) {
+          HttpShellJobDriver.this.state = State.READY;
+          if (HttpShellJobDriver.this.cmd == null) {
+            LOG.log(Level.INFO, "All evaluators ready; waiting for command. State: {0}",
+                HttpShellJobDriver.this.state);
+          } else {
+            HttpShellJobDriver.this.submit(HttpShellJobDriver.this.cmd);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Receive notification that the Context had completed.
+   * Remove the context from the map of active contexts while holding the driver lock.
+   */
+  final class ClosedContextHandler implements EventHandler<ClosedContext> {
+    @Override
+    public void onNext(final ClosedContext context) {
+      LOG.log(Level.INFO, "Completed Context: {0}", context.getId());
+      synchronized (HttpShellJobDriver.this) {
+        HttpShellJobDriver.this.contexts.remove(context.getId());
+      }
+    }
+  }
+
+  /**
+   * Receive the close message from the client and close every context that is currently active.
+   */
+  final class HttpClientCloseHandler implements EventHandler<Void> {
+    @Override
+    public void onNext(final Void aVoid) throws RuntimeException {
+      LOG.log(Level.INFO, "Received a close message from the client. You can put code here to properly close drivers and evaluators.");
+      // Copy the contexts under the driver lock: the other handlers modify the map under that lock,
+      // so iterating the live view here could fail with a ConcurrentModificationException.
+      final List<ActiveContext> toClose;
+      synchronized (HttpShellJobDriver.this) {
+        toClose = new ArrayList<>(contexts.values());
+      }
+      for (final ActiveContext c : toClose) {
+        c.close();
+      }
+    }
+  }
+
+  /**
+   * Receive notification that the Context had failed.
+   * Removes the context from the active-context map, then aborts by throwing a RuntimeException
+   * that wraps the context's error.
+   */
+  final class FailedContextHandler implements EventHandler<FailedContext> {
+    @Override
+    public void onNext(final FailedContext context) {
+      // The "{0}" placeholder is needed for the logger to render the parameter;
+      // without it the failed context never appeared in the log output.
+      LOG.log(Level.SEVERE, "FailedContext: {0}", context);
+      synchronized (HttpShellJobDriver.this) {
+        HttpShellJobDriver.this.contexts.remove(context.getId());
+      }
+      throw new RuntimeException("Failed context: ", context.asError());
+    }
+  }
+
+  /**
+   * Receive notification that the Task has completed; after the last expected task, return the results.
+   */
+  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
+    @Override
+    public void onNext(final CompletedTask task) {
+      LOG.log(Level.INFO, "Completed task: {0}", task.getId());
+      // Take the message returned by the task and add it to the running result.
+      final String result = CODEC.decode(task.get());
+      synchronized (HttpShellJobDriver.this) {
+        HttpShellJobDriver.this.results.add(task.getId() + " :: " + result);
+        LOG.log(Level.INFO, "Task {0} result {1}: {2} state: {3}", new Object[]{
+            task.getId(), HttpShellJobDriver.this.results.size(), result, HttpShellJobDriver.this.state});
+        if (--HttpShellJobDriver.this.expectCount <= 0) {
+          HttpShellJobDriver.this.returnResults();
+          HttpShellJobDriver.this.state = State.READY;
+          if (HttpShellJobDriver.this.cmd != null) {
+            HttpShellJobDriver.this.submit(HttpShellJobDriver.this.cmd);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Receive a command from the client: submit it when the driver is READY, otherwise store it in cmd.
+   */
+  final class ClientMessageHandler implements EventHandler<byte[]> {
+    @Override
+    public void onNext(final byte[] message) {
+      synchronized (HttpShellJobDriver.this) {
+        final String command = CODEC.decode(message);
+        LOG.log(Level.INFO, "Client message: {0} state: {1}",
+            new Object[]{command, HttpShellJobDriver.this.state});
+        assert (HttpShellJobDriver.this.cmd == null);
+        if (HttpShellJobDriver.this.state == State.READY) {
+          HttpShellJobDriver.this.submit(command);
+        } else {
+          // not ready yet - save the command for better times.
+          assert (HttpShellJobDriver.this.state == State.WAIT_EVALUATORS);
+          HttpShellJobDriver.this.cmd = command;
+        }
+      }
+    }
+  }
+
+  /**
+   * Job Driver is ready and the clock is set up: request the evaluators.
+   */
+  final class StartHandler implements EventHandler<StartTime> {
+    @Override
+    public void onNext(final StartTime startTime) {
+      LOG.log(Level.INFO, "{0} StartTime: {1}", new Object[]{state, startTime});
+      assert (state == State.INIT);
+      requestEvaluators();
+    }
+  }
+
+  /**
+   * Shutting down the job driver: close every context that is currently active.
+   */
+  final class StopHandler implements EventHandler<StopTime> {
+    @Override
+    public void onNext(final StopTime time) {
+      // Read the state and copy the contexts under the driver lock: the other handlers modify both
+      // under that lock, so iterating the live map here could fail with a ConcurrentModificationException.
+      final List<ActiveContext> toClose;
+      synchronized (HttpShellJobDriver.this) {
+        LOG.log(Level.INFO, "{0} StopTime: {1}", new Object[]{state, time});
+        toClose = new ArrayList<>(contexts.values());
+      }
+      for (final ActiveContext context : toClose) {
+        context.close();
+      }
+    }
+  }
+}