You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:47:08 UTC

[35/51] [partial] incubator-reef git commit: [REEF-93] Move java sources to lang/java

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/CloseEventImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/CloseEventImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/CloseEventImpl.java
new file mode 100644
index 0000000..5b502d1
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/CloseEventImpl.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task;
+
+import org.apache.reef.task.events.CloseEvent;
+import org.apache.reef.util.Optional;
+
+/** {@link CloseEvent} implementation backed by an Optional that may hold the close message bytes. */
+final class CloseEventImpl implements CloseEvent {
+  private final Optional<byte[]> value;
+
+  CloseEventImpl(final byte[] message) {
+    this.value = Optional.ofNullable(message);
+  }
+
+  CloseEventImpl() {
+    this.value = Optional.empty();
+  }
+
+  @Override
+  public Optional<byte[]> get() {
+    return this.value;
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder("CloseEvent{value=").append(this.value).append('}').toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/DriverMessageImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/DriverMessageImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/DriverMessageImpl.java
new file mode 100644
index 0000000..d1a492e
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/DriverMessageImpl.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task;
+
+import org.apache.reef.task.events.DriverMessage;
+import org.apache.reef.util.Optional;
+
+/** {@link DriverMessage} implementation backed by an Optional that may hold the message bytes. */
+final class DriverMessageImpl implements DriverMessage {
+  private final Optional<byte[]> value;
+  DriverMessageImpl(final byte[] message) {
+    this.value = Optional.ofNullable(message);
+  }
+
+  DriverMessageImpl() {
+    this.value = Optional.empty();
+  }
+
+  @Override
+  public Optional<byte[]> get() {
+    return this.value;
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder("DriverMessage{value=").append(this.value).append('}').toString();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/SuspendEventImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/SuspendEventImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/SuspendEventImpl.java
new file mode 100644
index 0000000..a1c0f0d
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/SuspendEventImpl.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task;
+
+import org.apache.reef.task.events.SuspendEvent;
+import org.apache.reef.util.Optional;
+
+/** {@link SuspendEvent} implementation backed by an Optional that may hold the suspend message bytes. */
+final class SuspendEventImpl implements SuspendEvent {
+  private final Optional<byte[]> value;
+
+  SuspendEventImpl(final byte[] message) {
+    this.value = Optional.ofNullable(message);
+  }
+
+  SuspendEventImpl() {
+    this.value = Optional.empty();
+  }
+
+  @Override
+  public Optional<byte[]> get() {
+    return this.value;
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder("SuspendEvent{value=").append(this.value).append('}').toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskClientCodeException.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskClientCodeException.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskClientCodeException.java
new file mode 100644
index 0000000..1e0cc5e
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskClientCodeException.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task;
+
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+/**
+ * Thrown by REEF when it catches an exception thrown by user code.
+ * Carries the id of the failed task and the id of the context that task was executing in.
+ */
+public final class TaskClientCodeException extends Exception {
+
+  private final String taskId;
+  private final String contextId;
+
+  /**
+   * @param taskId    identifier of the task that failed.
+   * @param contextId identifier of the context in which the failed task was running.
+   * @param message   description of the error; appended to the task/context prefix of the message.
+   * @param cause     the throwable that made the task fail.
+   */
+  public TaskClientCodeException(final String taskId, final String contextId,
+                                 final String message, final Throwable cause) {
+    super(String.format("Failure in task '%s' in context '%s': %s", taskId, contextId, message), cause);
+    this.contextId = contextId;
+    this.taskId = taskId;
+  }
+
+  /**
+   * Reads the task identifier out of a configuration by injecting it through Tang.
+   *
+   * @param config the configuration to read the task identifier from.
+   * @return the task id bound in the given configuration.
+   * @throws RuntimeException if the identifier can't be injected from the configuration.
+   */
+  public static String getTaskId(final Configuration config) {
+    try {
+      final Tang tang = Tang.Factory.getTang();
+      return tang.newInjector(config).getNamedInstance(TaskConfigurationOptions.Identifier.class);
+    } catch (final InjectionException injectionFailure) {
+      throw new RuntimeException("Unable to determine task identifier. Giving up.", injectionFailure);
+    }
+  }
+
+  /**
+   * @return the identifier of the task that failed.
+   */
+  public String getTaskId() {
+    return this.taskId;
+  }
+
+  /**
+   * @return the identifier of the context in which the failed task was running.
+   */
+  public String getContextId() {
+    return this.contextId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskLifeCycleHandlers.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskLifeCycleHandlers.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskLifeCycleHandlers.java
new file mode 100644
index 0000000..f03ed13
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskLifeCycleHandlers.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.runtime.common.evaluator.task.exceptions.TaskStartHandlerFailure;
+import org.apache.reef.runtime.common.evaluator.task.exceptions.TaskStopHandlerFailure;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.task.events.TaskStart;
+import org.apache.reef.task.events.TaskStop;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Dispatches the TaskStart and TaskStop events to the handler sets bound in the task configuration.
+ */
+@EvaluatorSide
+@Private
+final class TaskLifeCycleHandlers {
+  private static final Logger LOG = Logger.getLogger(TaskLifeCycleHandlers.class.getName());
+  private final Set<EventHandler<TaskStart>> taskStartHandlers;
+  private final Set<EventHandler<TaskStop>> taskStopHandlers;
+  private final TaskStart taskStart;
+  private final TaskStop taskStop;
+
+  @Inject
+  TaskLifeCycleHandlers(final @Parameter(TaskConfigurationOptions.StopHandlers.class) Set<EventHandler<TaskStop>> taskStopHandlers,
+                        final @Parameter(TaskConfigurationOptions.StartHandlers.class) Set<EventHandler<TaskStart>> taskStartHandlers,
+                        final TaskStartImpl taskStart,
+                        final TaskStopImpl taskStop) {
+    this.taskStart = taskStart;
+    this.taskStop = taskStop;
+    this.taskStartHandlers = taskStartHandlers;
+    this.taskStopHandlers = taskStopHandlers;
+  }
+
+  /**
+   * Hands the TaskStart event to every registered start handler, one after another.
+   * @throws TaskStartHandlerFailure wrapping the first Throwable a handler raises; remaining handlers are skipped.
+   */
+  public void beforeTaskStart() throws TaskStartHandlerFailure {
+    LOG.log(Level.FINEST, "Sending TaskStart event to the registered event handlers.");
+    for (final EventHandler<TaskStart> handler : this.taskStartHandlers) {
+      try {
+        handler.onNext(this.taskStart);
+      } catch (final Throwable failure) {
+        throw new TaskStartHandlerFailure(handler, failure);
+      }
+    }
+    LOG.log(Level.FINEST, "Done sending TaskStart event to the registered event handlers.");
+  }
+
+  /**
+   * Hands the TaskStop event to every registered stop handler, one after another.
+   * @throws TaskStopHandlerFailure wrapping the first Throwable a handler raises; remaining handlers are skipped.
+   */
+  public void afterTaskExit() throws TaskStopHandlerFailure {
+    LOG.log(Level.FINEST, "Sending TaskStop event to the registered event handlers.");
+    for (final EventHandler<TaskStop> handler : this.taskStopHandlers) {
+      try {
+        handler.onNext(this.taskStop);
+      } catch (final Throwable failure) {
+        throw new TaskStopHandlerFailure(handler, failure);
+      }
+    }
+    LOG.log(Level.FINEST, "Done sending TaskStop event to the registered event handlers.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskRuntime.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskRuntime.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskRuntime.java
new file mode 100644
index 0000000..ea8dd27
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskRuntime.java
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.evaluator.HeartBeatManager;
+import org.apache.reef.runtime.common.evaluator.task.exceptions.*;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.task.Task;
+import org.apache.reef.task.events.CloseEvent;
+import org.apache.reef.task.events.DriverMessage;
+import org.apache.reef.task.events.SuspendEvent;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import javax.xml.bind.DatatypeConverter;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The execution environment for a Task.
+ */
+@Private
+@EvaluatorSide
+public final class TaskRuntime implements Runnable {
+
+  private static final Logger LOG = Logger.getLogger(TaskRuntime.class.getName());
+
+  /**
+   * User supplied Task code.
+   */
+  private final Task task;
+
+  private final InjectionFuture<EventHandler<CloseEvent>> f_closeHandler;
+  private final InjectionFuture<EventHandler<SuspendEvent>> f_suspendHandler;
+  private final InjectionFuture<EventHandler<DriverMessage>> f_messageHandler;
+  private final TaskLifeCycleHandlers taskLifeCycleHandlers;
+
+  /**
+   * The memento given by the task configuration.
+   */
+  private final Optional<byte[]> memento;
+
+  /**
+   * Heart beat manager; within this class it serves as the monitor guarding close/suspend/deliver.
+   */
+  private final HeartBeatManager heartBeatManager;
+
+  private final TaskStatus currentStatus;
+
+  /** Injectable constructor without a memento parameter; delegates to the full one with a null memento. */
+  @Inject
+  private TaskRuntime(
+      final HeartBeatManager heartBeatManager,
+      final Task task,
+      final TaskStatus currentStatus,
+      final @Parameter(TaskConfigurationOptions.CloseHandler.class) InjectionFuture<EventHandler<CloseEvent>> f_closeHandler,
+      final @Parameter(TaskConfigurationOptions.SuspendHandler.class) InjectionFuture<EventHandler<SuspendEvent>> f_suspendHandler,
+      final @Parameter(TaskConfigurationOptions.MessageHandler.class) InjectionFuture<EventHandler<DriverMessage>> f_messageHandler,
+      final TaskLifeCycleHandlers taskLifeCycleHandlers) {
+    this(heartBeatManager, task, currentStatus, f_closeHandler, f_suspendHandler, f_messageHandler, null, taskLifeCycleHandlers);
+  }
+
+  /** Injectable constructor; a non-null memento is Base64-decoded into the bytes later given to Task.call(). */
+  @Inject
+  private TaskRuntime(
+      final HeartBeatManager heartBeatManager,
+      final Task task,
+      final TaskStatus currentStatus,
+      final @Parameter(TaskConfigurationOptions.CloseHandler.class) InjectionFuture<EventHandler<CloseEvent>> f_closeHandler,
+      final @Parameter(TaskConfigurationOptions.SuspendHandler.class) InjectionFuture<EventHandler<SuspendEvent>> f_suspendHandler,
+      final @Parameter(TaskConfigurationOptions.MessageHandler.class) InjectionFuture<EventHandler<DriverMessage>> f_messageHandler,
+      final @Parameter(TaskConfigurationOptions.Memento.class) String memento,
+      final TaskLifeCycleHandlers taskLifeCycleHandlers) {
+    this.heartBeatManager = heartBeatManager;
+    this.task = task;
+    this.taskLifeCycleHandlers = taskLifeCycleHandlers;
+
+    this.memento = null == memento ? Optional.<byte[]>empty() :
+        Optional.of(DatatypeConverter.parseBase64Binary(memento));
+
+    this.f_closeHandler = f_closeHandler;
+    this.f_suspendHandler = f_suspendHandler;
+    this.f_messageHandler = f_messageHandler;
+
+    this.currentStatus = currentStatus;
+  }
+
+  /**
+   * This method needs to be called before a Task can be run().
+   * It informs the Driver that the Task is initializing.
+   */
+  public void initialize() {
+    this.currentStatus.setInit();
+  }
+
+  /**
+   * Run the task: Fire TaskStart, call Task.call(), fire TaskStop.
+   */
+  @Override
+  public void run() {
+    try {
+      LOG.log(Level.FINEST, "Informing registered EventHandler<TaskStart>.");
+      this.taskLifeCycleHandlers.beforeTaskStart();
+
+      // Change state and inform the Driver
+      this.currentStatus.setRunning();
+
+      // Call Task.call()
+      final byte[] result = this.runTask();
+
+      // Inform the Driver about it
+      this.currentStatus.setResult(result);
+
+      LOG.log(Level.FINEST, "Informing registered EventHandler<TaskStop>.");
+      this.taskLifeCycleHandlers.afterTaskExit();
+
+    } catch (final TaskStartHandlerFailure taskStartHandlerFailure) {
+      LOG.log(Level.WARNING, "Caught an exception during TaskStart handler execution.", taskStartHandlerFailure);
+      this.currentStatus.setException(taskStartHandlerFailure.getCause());
+    } catch (final TaskStopHandlerFailure taskStopHandlerFailure) {
+      LOG.log(Level.WARNING, "Caught an exception during TaskStop handler execution.", taskStopHandlerFailure);
+      this.currentStatus.setException(taskStopHandlerFailure.getCause());
+    } catch (final TaskCallFailure e) {
+      LOG.log(Level.WARNING, "Caught an exception during Task.call().", e.getCause());
+      this.currentStatus.setException(e); // NOTE(review): reports the wrapper, not getCause() as above - confirm intended
+    }
+  }
+
+  /**
+   * Called by heartbeat manager
+   *
+   * @return current TaskStatusProto
+   */
+  public ReefServiceProtos.TaskStatusProto getStatusProto() {
+    return this.currentStatus.toProto();
+  }
+
+  /**
+   * @return true, if the Task is no longer running, either because it is crashed or exited cleanly
+   */
+  public boolean hasEnded() {
+    return this.currentStatus.hasEnded();
+  }
+
+  /**
+   * @return the ID of the task.
+   */
+  public String getTaskId() {
+    return this.currentStatus.getTaskId();
+  }
+
+  /** @return a descriptive id: "TASK:" + simple class name of the Task + ':' + the task id. */
+  public String getId() {
+    return "TASK:" + this.task.getClass().getSimpleName() + ':' + this.currentStatus.getTaskId();
+  }
+
+  /**
+   * Close the Task. This calls the configured close handler.
+   *
+   * @param message the optional message for the close handler or null if there is none.
+   */
+  public void close(final byte[] message) {
+    LOG.log(Level.FINEST, "Triggering Task close.");
+    synchronized (this.heartBeatManager) {
+      if (this.currentStatus.isNotRunning()) {
+        LOG.log(Level.WARNING, "Trying to close a task that is in state: {0}. Ignoring.",
+            this.currentStatus.getState());
+      } else {
+        try {
+          this.closeTask(message);
+          this.currentStatus.setCloseRequested();
+        } catch (final TaskCloseHandlerFailure taskCloseHandlerFailure) {
+          LOG.log(Level.WARNING, "Exception while executing task close handler.",
+              taskCloseHandlerFailure.getCause());
+          this.currentStatus.setException(taskCloseHandlerFailure.getCause());
+        }
+      }
+    }
+  }
+
+  /**
+   * Suspend the Task.  This calls the configured suspend handler.
+   *
+   * @param message the optional message for the suspend handler or null if there is none.
+   */
+  public void suspend(final byte[] message) {
+    synchronized (this.heartBeatManager) {
+      if (this.currentStatus.isNotRunning()) {
+        LOG.log(Level.WARNING, "Trying to suspend a task that is in state: {0}. Ignoring.",
+            this.currentStatus.getState());
+      } else {
+        try {
+          this.suspendTask(message);
+          this.currentStatus.setSuspendRequested();
+        } catch (final TaskSuspendHandlerFailure taskSuspendHandlerFailure) {
+          LOG.log(Level.WARNING, "Exception while executing task suspend handler.",
+              taskSuspendHandlerFailure.getCause());
+          this.currentStatus.setException(taskSuspendHandlerFailure.getCause());
+        }
+      }
+    }
+  }
+
+  /**
+   * Deliver a message to the Task. This calls into the user supplied message handler.
+   *
+   * @param message the message to be delivered.
+   */
+  public void deliver(final byte[] message) {
+    synchronized (this.heartBeatManager) {
+      if (this.currentStatus.isNotRunning()) {
+        LOG.log(Level.WARNING,
+            "Trying to send a message to a task that is in state: {0}. Ignoring.",
+            this.currentStatus.getState());
+      } else {
+        try {
+          this.deliverMessageToTask(message);
+        } catch (final TaskMessageHandlerFailure taskMessageHandlerFailure) {
+          LOG.log(Level.WARNING, "Exception while executing task message handler.",
+              taskMessageHandlerFailure.getCause());
+          this.currentStatus.setException(taskMessageHandlerFailure.getCause());
+        }
+      }
+    }
+  }
+
+  /**
+   * @return the ID of the Context this task is executing in.
+   */
+  private String getContextID() {
+    return this.currentStatus.getContextId();
+  }
+
+  /**
+   * Calls the Task.call() method and catches exceptions it may throw.
+   *
+   * @return the return value of Task.call()
+   * @throws TaskCallFailure if any Throwable was caught from the Task.call() method.
+   *                         That throwable would be the cause of the TaskCallFailure.
+   */
+  private byte[] runTask() throws TaskCallFailure {
+    try {
+      final byte[] result;
+      if (this.memento.isPresent()) {
+        LOG.log(Level.FINEST, "Calling Task.call() with a memento");
+        result = this.task.call(this.memento.get());
+      } else {
+        LOG.log(Level.FINEST, "Calling Task.call() without a memento");
+        result = this.task.call(null);
+      }
+      LOG.log(Level.FINEST, "Task.call() exited cleanly.");
+      return result;
+    } catch (final Throwable throwable) {
+      throw new TaskCallFailure(throwable);
+    }
+  }
+
+  /**
+   * Calls the configured Task close handler and catches exceptions it may throw.
+   */
+  private void closeTask(final byte[] message) throws TaskCloseHandlerFailure {
+    LOG.log(Level.FINEST, "Invoking close handler.");
+    try {
+      this.f_closeHandler.get().onNext(new CloseEventImpl(message));
+    } catch (final Throwable throwable) {
+      throw new TaskCloseHandlerFailure(throwable);
+    }
+  }
+
+  /**
+   * Calls the configured Task message handler and catches exceptions it may throw.
+   */
+  private void deliverMessageToTask(final byte[] message) throws TaskMessageHandlerFailure {
+    try {
+      this.f_messageHandler.get().onNext(new DriverMessageImpl(message));
+    } catch (final Throwable throwable) {
+      throw new TaskMessageHandlerFailure(throwable);
+    }
+  }
+
+  /**
+   * Calls the configured Task suspend handler and catches exceptions it may throw.
+   */
+  private void suspendTask(final byte[] message) throws TaskSuspendHandlerFailure {
+    try {
+      this.f_suspendHandler.get().onNext(new SuspendEventImpl(message));
+    } catch (final Throwable throwable) {
+      throw new TaskSuspendHandlerFailure(throwable);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStartImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStartImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStartImpl.java
new file mode 100644
index 0000000..5bd598e
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStartImpl.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task;
+
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.task.events.TaskStart;
+
+import javax.inject.Inject;
+
+/**
+ * {@link TaskStart} event that is built by injection and exposes the configured task identifier.
+ */
+final class TaskStartImpl implements TaskStart {
+
+  private final String taskId;
+
+  @Inject
+  TaskStartImpl(final @Parameter(TaskConfigurationOptions.Identifier.class) String id) {
+    this.taskId = id;
+  }
+
+  @Override
+  public String getId() {
+    return this.taskId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStatus.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStatus.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStatus.java
new file mode 100644
index 0000000..2cce16f
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStatus.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task;
+
+import com.google.protobuf.ByteString;
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.evaluator.context.parameters.ContextIdentifier;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.evaluator.HeartBeatManager;
+import org.apache.reef.runtime.common.utils.ExceptionCodec;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.task.TaskMessage;
+import org.apache.reef.task.TaskMessageSource;
+import org.apache.reef.util.Optional;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Represents the various states a Task could be in.
+ * <p>
+ * Besides the current {@link State}, an instance holds the task's result or the last exception
+ * (never both, see {@code check()}) and reports itself to the {@link HeartBeatManager} as a
+ * {@code TaskStatusProto}.
+ * <p>
+ * NOTE(review): only {@code setException()} and {@code setResult()} synchronize on the
+ * HeartBeatManager; the other state mutators and readers in this class do not take that lock.
+ */
+public final class TaskStatus {
+  private static final Logger LOG = Logger.getLogger(TaskStatus.class.getName());
+
+  // Immutable identity and collaborators, all supplied through the injected constructor.
+  private final String taskId;
+  private final String contextId;
+  private final HeartBeatManager heartBeatManager;
+  private final Set<TaskMessageSource> evaluatorMessageSources;
+  private final ExceptionCodec exceptionCodec;
+  // Mutable status: at most one of lastException / result may be present (enforced by check()).
+  private Optional<Throwable> lastException = Optional.empty();
+  private Optional<byte[]> result = Optional.empty();
+  // Every instance starts in PRE_INIT; isLegal() only allows a move to INIT from there.
+  private State state = State.PRE_INIT;
+
+
+  /**
+   * @param taskId                  the value injected for TaskConfigurationOptions.Identifier.
+   * @param contextId               the value injected for ContextIdentifier.
+   * @param evaluatorMessageSources the sources polled by {@code getMessages()}.
+   * @param heartBeatManager        receives the status protos and serves as the lock object in
+   *                                {@code setException()} and {@code setResult()}.
+   * @param exceptionCodec          used by {@code toProto()} to turn the last exception into bytes.
+   */
+  @Inject
+  TaskStatus(final @Parameter(TaskConfigurationOptions.Identifier.class) String taskId,
+             final @Parameter(ContextIdentifier.class) String contextId,
+             final @Parameter(TaskConfigurationOptions.TaskMessageSources.class) Set<TaskMessageSource> evaluatorMessageSources,
+             final HeartBeatManager heartBeatManager,
+             final ExceptionCodec exceptionCodec) {
+    this.taskId = taskId;
+    this.contextId = contextId;
+    this.heartBeatManager = heartBeatManager;
+    this.evaluatorMessageSources = evaluatorMessageSources;
+    this.exceptionCodec = exceptionCodec;
+  }
+
+  /**
+   * Encodes the task state machine:
+   * PRE_INIT may only move to INIT;
+   * INIT to RUNNING, FAILED, KILLED or DONE;
+   * RUNNING to CLOSE_REQUESTED, SUSPEND_REQUESTED, FAILED, KILLED or DONE;
+   * CLOSE_REQUESTED to FAILED, KILLED or DONE;
+   * SUSPEND_REQUESTED to FAILED, KILLED or SUSPENDED.
+   * FAILED, DONE, KILLED and (through the default branch) SUSPENDED have no legal successor.
+   *
+   * @param from the current state; {@code null} only allows a transition to INIT.
+   * @param to   the requested state.
+   * @return true, if the state transition from state 'from' to state 'to' is legal.
+   */
+  private static boolean isLegal(final State from, final State to) {
+    if (from == null) {
+      return to == State.INIT;
+    }
+    switch (from) {
+      case PRE_INIT:
+        switch (to) {
+          case INIT:
+            return true;
+          default:
+            return false;
+        }
+      case INIT:
+        switch (to) {
+          case RUNNING:
+          case FAILED:
+          case KILLED:
+          case DONE:
+            return true;
+          default:
+            return false;
+        }
+      case RUNNING:
+        switch (to) {
+          case CLOSE_REQUESTED:
+          case SUSPEND_REQUESTED:
+          case FAILED:
+          case KILLED:
+          case DONE:
+            return true;
+          default:
+            return false;
+        }
+      case CLOSE_REQUESTED:
+        switch (to) {
+          case FAILED:
+          case KILLED:
+          case DONE:
+            return true;
+          default:
+            return false;
+        }
+      case SUSPEND_REQUESTED:
+        switch (to) {
+          case FAILED:
+          case KILLED:
+          case SUSPENDED:
+            return true;
+          default:
+            return false;
+        }
+
+      // Terminal states: nothing may follow them.
+      case FAILED:
+      case DONE:
+      case KILLED:
+        return false;
+      // SUSPENDED has no case of its own and ends up here.
+      default:
+        return false;
+    }
+  }
+
+  /**
+   * @return the task identifier given to the constructor.
+   */
+  public final String getTaskId() {
+    return this.taskId;
+  }
+
+  /**
+   * Builds the protocol buffer describing this status.
+   * <p>
+   * The payload is chosen in this order: the result bytes if a result is present; otherwise the
+   * last exception encoded through the ExceptionCodec; otherwise, and only while the state is
+   * RUNNING, one TaskMessageProto per message returned by {@code getMessages()}.
+   *
+   * @return the proto carrying context id, task id, proto state and the payload described above.
+   * @throws RuntimeException if both a result and an exception are present, or if the current
+   *                          state has no proto equivalent (see {@code getProtoState()}).
+   */
+  ReefServiceProtos.TaskStatusProto toProto() {
+    this.check();
+    // Note: this local builder shadows the 'result' field; the field is reached via 'this.result'.
+    final ReefServiceProtos.TaskStatusProto.Builder result = ReefServiceProtos.TaskStatusProto.newBuilder()
+        .setContextId(this.contextId)
+        .setTaskId(this.taskId)
+        .setState(this.getProtoState());
+
+    if (this.result.isPresent()) {
+      result.setResult(ByteString.copyFrom(this.result.get()));
+    } else if (this.lastException.isPresent()) {
+      // The encoded exception travels in the same 'result' field of the proto.
+      final byte[] error = this.exceptionCodec.toBytes(this.lastException.get());
+      result.setResult(ByteString.copyFrom(error));
+    } else if (this.state == State.RUNNING) {
+      for (final TaskMessage taskMessage : this.getMessages()) {
+        result.addTaskMessage(ReefServiceProtos.TaskStatusProto.TaskMessageProto.newBuilder()
+            .setSourceId(taskMessage.getMessageSourceID())
+            .setMessage(ByteString.copyFrom(taskMessage.get()))
+            .build());
+      }
+    }
+
+    return result.build();
+  }
+
+  /**
+   * Enforces the invariant that a result and an exception are never present at the same time.
+   *
+   * @throws RuntimeException if both are present.
+   */
+  private void check() {
+    if (this.result.isPresent() && this.lastException.isPresent()) {
+      throw new RuntimeException("Found both an exception and a result. This is unsupported.");
+    }
+  }
+
+  /**
+   * Maps the internal state onto the state reported in the proto. CLOSE_REQUESTED and
+   * SUSPEND_REQUESTED are reported as RUNNING; SUSPENDED is reported as SUSPEND.
+   *
+   * @return the proto state for the current state.
+   * @throws RuntimeException if the current state has no case in the switch, which is the
+   *                          situation for PRE_INIT.
+   */
+  private ReefServiceProtos.State getProtoState() {
+    switch (this.state) {
+      case INIT:
+        return ReefServiceProtos.State.INIT;
+      case CLOSE_REQUESTED:
+      case SUSPEND_REQUESTED:
+      case RUNNING:
+        return ReefServiceProtos.State.RUNNING;
+      case DONE:
+        return ReefServiceProtos.State.DONE;
+      case SUSPENDED:
+        return ReefServiceProtos.State.SUSPEND;
+      case FAILED:
+        return ReefServiceProtos.State.FAILED;
+      case KILLED:
+        return ReefServiceProtos.State.KILLED;
+    }
+    throw new RuntimeException("Unknown state: " + this.state);
+  }
+
+  /**
+   * Records the given throwable as the last exception, marks the task FAILED and sends a
+   * heartbeat, all while holding the HeartBeatManager's monitor.
+   *
+   * @param throwable the failure to report.
+   * @throws RuntimeException if a result is already present (raised by {@code check()} after the
+   *                          exception and the FAILED state have been stored).
+   */
+  void setException(final Throwable throwable) {
+    synchronized (this.heartBeatManager) {
+      this.lastException = Optional.of(throwable);
+      // Assigned directly instead of via setState(), so isLegal() is not consulted here.
+      this.state = State.FAILED;
+      this.check();
+      this.heartbeat();
+    }
+  }
+
+  /**
+   * Stores the task's result, advances the state and sends a heartbeat, all while holding the
+   * HeartBeatManager's monitor. RUNNING and CLOSE_REQUESTED become DONE, SUSPEND_REQUESTED becomes
+   * SUSPENDED; in any other state the state is left untouched.
+   *
+   * @param result the result bytes; wrapped with Optional.ofNullable (presumably a null yields an
+   *               empty Optional -- confirm against org.apache.reef.util.Optional).
+   * @throws RuntimeException if an exception is already present (raised by {@code check()} after
+   *                          the result and the new state have been stored).
+   */
+  void setResult(final byte[] result) {
+    synchronized (this.heartBeatManager) {
+      this.result = Optional.ofNullable(result);
+      if (this.state == State.RUNNING) {
+        this.setState(State.DONE);
+      } else if (this.state == State.SUSPEND_REQUESTED) {
+        this.setState(State.SUSPENDED);
+      } else if (this.state == State.CLOSE_REQUESTED) {
+        this.setState(State.DONE);
+      }
+      this.check();
+      this.heartbeat();
+    }
+  }
+
+  /**
+   * Hands the current status, as built by {@code toProto()}, to the HeartBeatManager.
+   */
+  private void heartbeat() {
+    this.heartBeatManager.sendTaskStatus(this.toProto());
+  }
+
+  /**
+   * Sets the state to INIT and informs the driver about it.
+   *
+   * @throws RuntimeException if the transition to INIT is illegal from the current state.
+   */
+  void setInit() {
+    LOG.log(Level.FINEST, "Sending Task INIT heartbeat to the Driver.");
+    this.setState(State.INIT);
+    this.heartbeat();
+  }
+
+  /**
+   * Sets the state to RUNNING after the handlers for TaskStart have been called.
+   * No heartbeat is sent by this method.
+   *
+   * @throws RuntimeException if the transition to RUNNING is illegal from the current state.
+   */
+  void setRunning() {
+    this.setState(State.RUNNING);
+  }
+
+  /**
+   * Sets the state to CLOSE_REQUESTED. No heartbeat is sent by this method.
+   *
+   * @throws RuntimeException if the transition is illegal from the current state.
+   */
+  void setCloseRequested() {
+    this.setState(State.CLOSE_REQUESTED);
+  }
+
+  /**
+   * Sets the state to SUSPEND_REQUESTED. No heartbeat is sent by this method.
+   *
+   * @throws RuntimeException if the transition is illegal from the current state.
+   */
+  void setSuspendRequested() {
+    this.setState(State.SUSPEND_REQUESTED);
+  }
+
+  /**
+   * Sets the state to KILLED and sends a heartbeat carrying the new status.
+   *
+   * @throws RuntimeException if the transition to KILLED is illegal from the current state.
+   */
+  void setKilled() {
+    this.setState(State.KILLED);
+    this.heartbeat();
+  }
+
+  /**
+   * @return true only if the state is exactly RUNNING (not CLOSE_REQUESTED or SUSPEND_REQUESTED).
+   */
+  boolean isRunning() {
+    return this.state == State.RUNNING;
+  }
+
+  /**
+   * @return the negation of {@code isRunning()}: true for every state other than RUNNING.
+   */
+  boolean isNotRunning() {
+    return this.state != State.RUNNING;
+  }
+
+  /**
+   * @return true if the state is DONE, SUSPENDED, FAILED or KILLED; false otherwise.
+   */
+  boolean hasEnded() {
+    switch (this.state) {
+      case DONE:
+      case SUSPENDED:
+      case FAILED:
+      case KILLED:
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  /**
+   * @return the current internal state.
+   */
+  State getState() {
+    return this.state;
+  }
+
+  /**
+   * Moves to the given state if {@code isLegal()} permits it; otherwise logs the attempt at
+   * SEVERE level and fails, leaving the state unchanged.
+   *
+   * @param state the state to move to.
+   * @throws RuntimeException if the transition from the current state is illegal.
+   */
+  private void setState(final State state) {
+    if (isLegal(this.state, state)) {
+      this.state = state;
+    } else {
+      final String msg = "Illegal state transition from [" + this.state + "] to [" + state + "]";
+      LOG.log(Level.SEVERE, msg);
+      throw new RuntimeException(msg);
+    }
+  }
+
+  /**
+   * @return the context identifier given to the constructor.
+   */
+  String getContextId() {
+    return this.contextId;
+  }
+
+  /**
+   * Polls every registered TaskMessageSource once and keeps the messages that are present.
+   *
+   * @return the messages to be sent on the Task's behalf in the next heartbeat.
+   */
+  private final Collection<TaskMessage> getMessages() {
+    // Sized for the case where every source has a message; this local shadows the 'result' field.
+    final List<TaskMessage> result = new ArrayList<>(this.evaluatorMessageSources.size());
+    for (final TaskMessageSource messageSource : this.evaluatorMessageSources) {
+      final Optional<TaskMessage> taskMessageOptional = messageSource.getMessage();
+      if (taskMessageOptional.isPresent()) {
+        result.add(taskMessageOptional.get());
+      }
+    }
+    return result;
+  }
+
+
+  /**
+   * The internal task states. The legal transitions between them are defined by
+   * {@code isLegal()}; the mapping to the states sent in the proto by {@code getProtoState()}.
+   */
+  enum State {
+    PRE_INIT,
+    INIT,
+    RUNNING,
+    CLOSE_REQUESTED,
+    SUSPEND_REQUESTED,
+    SUSPENDED,
+    FAILED,
+    DONE,
+    KILLED
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStopImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStopImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStopImpl.java
new file mode 100644
index 0000000..521d859
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStopImpl.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task;
+
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.task.events.TaskStop;
+
+import javax.inject.Inject;
+
+/**
+ * Injectable implementation of {@link TaskStop}.
+ * <p>
+ * The id it reports is the value injected for {@link TaskConfigurationOptions.Identifier}.
+ */
+final class TaskStopImpl implements TaskStop {
+
+  /**
+   * Identifier received at injection time; never reassigned.
+   */
+  private final String taskId;
+
+  /**
+   * @param id the value injected for {@link TaskConfigurationOptions.Identifier}.
+   */
+  @Inject
+  TaskStopImpl(@Parameter(TaskConfigurationOptions.Identifier.class) final String id) {
+    taskId = id;
+  }
+
+  /**
+   * @return the identifier this event was constructed with.
+   */
+  @Override
+  public String getId() {
+    return taskId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/DefaultCloseHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/DefaultCloseHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/DefaultCloseHandler.java
new file mode 100644
index 0000000..6920f82
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/DefaultCloseHandler.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task.defaults;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.task.events.CloseEvent;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Default {@code EventHandler<CloseEvent>}: rejects every event it receives.
+ */
+@Private
+public final class DefaultCloseHandler implements EventHandler<CloseEvent> {
+
+  /**
+   * Injectable no-argument constructor; the handler keeps no state.
+   */
+  @Inject
+  public DefaultCloseHandler() {
+  }
+
+  /**
+   * Always fails, reporting that no close handler has been registered.
+   *
+   * @param closeEvent the event received; it is appended to the exception message.
+   * @throws RuntimeException on every invocation.
+   */
+  @Override
+  public void onNext(final CloseEvent closeEvent) {
+    final String complaint = "No EventHandler<CloseEvent> registered. Event received: " + closeEvent;
+    throw new RuntimeException(complaint);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/DefaultDriverMessageHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/DefaultDriverMessageHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/DefaultDriverMessageHandler.java
new file mode 100644
index 0000000..f1b4f34
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/DefaultDriverMessageHandler.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task.defaults;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.task.events.DriverMessage;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Default {@code EventHandler<DriverMessage>}: rejects every message it receives.
+ */
+@Private
+public final class DefaultDriverMessageHandler implements EventHandler<DriverMessage> {
+
+  /**
+   * Injectable no-argument constructor; the handler keeps no state.
+   */
+  @Inject
+  public DefaultDriverMessageHandler() {
+  }
+
+  /**
+   * Always fails, reporting that no DriverMessage handler has been bound.
+   *
+   * @param driverMessage the message received; it is appended to the exception message.
+   * @throws RuntimeException on every invocation.
+   */
+  @Override
+  public void onNext(final DriverMessage driverMessage) {
+    final String complaint = "No DriverMessage handler bound. Message received:" + driverMessage;
+    throw new RuntimeException(complaint);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/DefaultSuspendHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/DefaultSuspendHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/DefaultSuspendHandler.java
new file mode 100644
index 0000000..3b485c6
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/DefaultSuspendHandler.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task.defaults;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.task.events.SuspendEvent;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Default {@code EventHandler<SuspendEvent>}: rejects every event it receives.
+ */
+@Private
+public final class DefaultSuspendHandler implements EventHandler<SuspendEvent> {
+
+  /**
+   * Injectable no-argument constructor; the handler keeps no state.
+   */
+  @Inject
+  public DefaultSuspendHandler() {
+  }
+
+  /**
+   * Always fails, reporting that no SuspendEvent handler has been registered.
+   *
+   * @param suspendEvent the event received; it is appended to the exception message.
+   * @throws RuntimeException on every invocation.
+   */
+  @Override
+  public void onNext(final SuspendEvent suspendEvent) {
+    final String complaint = "No handler for SuspendEvent registered. event: " + suspendEvent;
+    throw new RuntimeException(complaint);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/package-info.java
new file mode 100644
index 0000000..d69a4aa
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Default implementations for the optional task interfaces.
+ */
+package org.apache.reef.runtime.common.evaluator.task.defaults;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskCallFailure.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskCallFailure.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskCallFailure.java
new file mode 100644
index 0000000..9827a01
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskCallFailure.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task.exceptions;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+
+/**
+ * Checked exception that wraps whatever Task.call() threw.
+ */
+@EvaluatorSide
+@Private
+public final class TaskCallFailure extends Exception {
+
+  /**
+   * Fixed detail message shared by every instance.
+   */
+  private static final String MESSAGE = "Task.call() threw an Exception.";
+
+  /**
+   * @param cause the exception thrown by the Task.call() method; kept as this exception's cause.
+   */
+  public TaskCallFailure(final Throwable cause) {
+    super(MESSAGE, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskCloseHandlerFailure.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskCloseHandlerFailure.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskCloseHandlerFailure.java
new file mode 100644
index 0000000..bb18044
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskCloseHandlerFailure.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task.exceptions;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+
+/**
+ * Checked exception that wraps a failure raised by a Task close handler.
+ */
+@EvaluatorSide
+@Private
+public final class TaskCloseHandlerFailure extends Exception {
+
+  /**
+   * Fixed detail message shared by every instance.
+   */
+  private static final String MESSAGE = "Task close handler threw an Exception.";
+
+  /**
+   * @param cause the exception thrown by the task close handler; kept as this exception's cause.
+   */
+  public TaskCloseHandlerFailure(final Throwable cause) {
+    super(MESSAGE, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskMessageHandlerFailure.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskMessageHandlerFailure.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskMessageHandlerFailure.java
new file mode 100644
index 0000000..8ccf1d1
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskMessageHandlerFailure.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task.exceptions;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+
+/**
+ * Checked exception that wraps a failure raised by a Task message handler.
+ */
+@EvaluatorSide
+@Private
+public final class TaskMessageHandlerFailure extends Exception {
+
+  /**
+   * Fixed detail message shared by every instance.
+   */
+  private static final String MESSAGE = "Task message handler threw an Exception.";
+
+  /**
+   * @param cause the exception thrown by the task message handler's onNext() method; kept as
+   *              this exception's cause.
+   */
+  public TaskMessageHandlerFailure(final Throwable cause) {
+    super(MESSAGE, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskStartHandlerFailure.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskStartHandlerFailure.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskStartHandlerFailure.java
new file mode 100644
index 0000000..24dcb35
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskStartHandlerFailure.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task.exceptions;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.task.events.TaskStart;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ * Checked exception that wraps a failure raised by a TaskStart handler.
+ */
+@EvaluatorSide
+@Private
+public final class TaskStartHandlerFailure extends Exception {
+
+  /**
+   * @param handler the start handler that failed; its toString() becomes part of the message.
+   * @param cause   the exception thrown by the start handler; kept as this exception's cause.
+   */
+  public TaskStartHandlerFailure(final EventHandler<TaskStart> handler, final Throwable cause) {
+    super(describe(handler), cause);
+  }
+
+  /**
+   * Builds the detail message naming the failing handler.
+   */
+  private static String describe(final EventHandler<TaskStart> handler) {
+    return "EventHandler<TaskStart> `" + handler.toString() + "` threw an Exception in onNext()";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskStopHandlerFailure.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskStopHandlerFailure.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskStopHandlerFailure.java
new file mode 100644
index 0000000..05d77bc
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskStopHandlerFailure.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task.exceptions;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.task.events.TaskStop;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ * Checked exception that wraps a failure raised by a TaskStop handler.
+ */
+@EvaluatorSide
+@Private
+public final class TaskStopHandlerFailure extends Exception {
+
+  /**
+   * @param handler the stop handler that failed; its toString() becomes part of the message.
+   * @param cause   the exception thrown by the stop handler; kept as this exception's cause.
+   */
+  public TaskStopHandlerFailure(final EventHandler<TaskStop> handler, final Throwable cause) {
+    super(describe(handler), cause);
+  }
+
+  /**
+   * Builds the detail message naming the failing handler.
+   */
+  private static String describe(final EventHandler<TaskStop> handler) {
+    return "EventHandler<TaskStop> `" + handler.toString() + "` threw an Exception in onNext()";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskSuspendHandlerFailure.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskSuspendHandlerFailure.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskSuspendHandlerFailure.java
new file mode 100644
index 0000000..31b275e
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskSuspendHandlerFailure.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task.exceptions;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+
+/**
+ * Checked exception that wraps a failure raised by a Task suspend handler.
+ */
+@EvaluatorSide
+@Private
+public final class TaskSuspendHandlerFailure extends Exception {
+
+  /**
+   * Fixed detail message shared by every instance.
+   */
+  private static final String MESSAGE = "Task suspend handler threw an Exception.";
+
+  /**
+   * @param cause the exception thrown by the task suspend handler's onNext() method; kept as
+   *              this exception's cause.
+   */
+  public TaskSuspendHandlerFailure(final Throwable cause) {
+    super(MESSAGE, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/package-info.java
new file mode 100644
index 0000000..1329251
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Task-related implementation of the Evaluator resourcemanager.
+ */
+package org.apache.reef.runtime.common.evaluator.task;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/ClasspathProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/ClasspathProvider.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/ClasspathProvider.java
new file mode 100644
index 0000000..a1ad04a
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/ClasspathProvider.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.files;
+
+import net.jcip.annotations.Immutable;
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Supplies the classpath to REEF for process (Driver, Evaluator) launches.
+ */
+@Immutable
+@RuntimeAuthor
+public final class ClasspathProvider {
+  private final List<String> driverClasspath;
+  private final List<String> evaluatorClasspath;
+
+
+  @Inject
+  ClasspathProvider(final RuntimeClasspathProvider runtimeClasspathProvider,
+                    final REEFFileNames reefFileNames) {
+    final List<String> baseClasspath = Arrays.asList(
+        reefFileNames.getLocalFolderPath() + "/*",
+        reefFileNames.getGlobalFolderPath() + "/*");
+
+    // Assemble the driver classpath
+    final List<String> runtimeDriverClasspathPrefix = runtimeClasspathProvider.getDriverClasspathPrefix();
+    final List<String> runtimeDriverClasspathSuffix = runtimeClasspathProvider.getDriverClasspathSuffix();
+    final List<String> driverClasspath = new ArrayList<>(baseClasspath.size() +
+        runtimeDriverClasspathPrefix.size() +
+        runtimeDriverClasspathSuffix.size());
+    driverClasspath.addAll(runtimeDriverClasspathPrefix);
+    driverClasspath.addAll(baseClasspath);
+    driverClasspath.addAll(runtimeDriverClasspathSuffix);
+    this.driverClasspath = Collections.unmodifiableList(driverClasspath);
+
+    // Assemble the evaluator classpath
+    final List<String> runtimeEvaluatorClasspathPrefix = runtimeClasspathProvider.getEvaluatorClasspathPrefix();
+    final List<String> runtimeEvaluatorClasspathSuffix = runtimeClasspathProvider.getEvaluatorClasspathSuffix();
+    final List<String> evaluatorClasspath = new ArrayList<>(runtimeEvaluatorClasspathPrefix.size() +
+        baseClasspath.size() +
+        runtimeEvaluatorClasspathSuffix.size());
+    evaluatorClasspath.addAll(runtimeEvaluatorClasspathPrefix);
+    evaluatorClasspath.addAll(baseClasspath);
+    evaluatorClasspath.addAll(runtimeEvaluatorClasspathSuffix);
+    this.evaluatorClasspath = Collections.unmodifiableList(evaluatorClasspath);
+  }
+
+  /**
+   * @return the classpath to be used for the Driver
+   */
+  public List<String> getDriverClasspath() {
+    return this.driverClasspath;
+  }
+
+  /**
+   * @return the classpath to be used for Evaluators.
+   */
+  public List<String> getEvaluatorClasspath() {
+    return this.evaluatorClasspath;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java
new file mode 100644
index 0000000..5297158
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.files;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.proto.ClientRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.parameters.DeleteTempFiles;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.JARFileMaker;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Utility that takes a JobSubmissionProto and turns it into a Job Submission Jar.
+ */
+@Private
+@RuntimeAuthor
+@ClientSide
+public final class JobJarMaker {
+
+  private static final Logger LOG = Logger.getLogger(JobJarMaker.class.getName());
+
+  private final ConfigurationSerializer configurationSerializer;
+  private final REEFFileNames fileNames;
+  private final boolean deleteTempFilesOnExit;
+
+  /**
+   * @param configurationSerializer used to write the Driver Configuration into the staging folder.
+   * @param fileNames               supplies the folder, file, prefix and suffix names used while staging.
+   * @param deleteTempFilesOnExit   if true, the staging folder is deleted once the JAR is built and the JAR
+   *                                itself is marked for deletion when the JVM exits.
+   */
+  @Inject
+  JobJarMaker(final ConfigurationSerializer configurationSerializer,
+              final REEFFileNames fileNames,
+              final @Parameter(DeleteTempFiles.class) boolean deleteTempFilesOnExit) {
+    this.configurationSerializer = configurationSerializer;
+    this.fileNames = fileNames;
+    this.deleteTempFilesOnExit = deleteTempFilesOnExit;
+  }
+
+  /**
+   * Copies the given files into destinationFolder, creating the folder first if it does not exist.
+   * A file whose name already exists in destinationFolder is skipped (logged at FINEST).
+   *
+   * @param files             the files to copy; each is stored under its getName() in destinationFolder.
+   * @param destinationFolder the folder to copy into.
+   * @throws RuntimeException wrapping the IOException if a copy fails.
+   */
+  public static void copy(final Iterable<ReefServiceProtos.FileResourceProto> files, final File destinationFolder) {
+
+    // mkdirs() signals failure only through its return value. The isDirectory() re-check avoids a spurious
+    // warning when the folder was created concurrently. A failure is only logged here: the copies below
+    // will fail with an exception if the folder is really unusable.
+    if (!destinationFolder.exists() && !destinationFolder.mkdirs() && !destinationFolder.isDirectory()) {
+      LOG.log(Level.WARNING, "Unable to create the folder [{0}].", destinationFolder.getAbsolutePath());
+    }
+
+    for (final ReefServiceProtos.FileResourceProto fileProto : files) {
+      final File sourceFile = toFile(fileProto);
+      final File destinationFile = new File(destinationFolder, fileProto.getName());
+      if (destinationFile.exists()) {
+        LOG.log(Level.FINEST,
+            "Will not add {0} to the job jar because another file with the same name was already added.",
+            sourceFile.getAbsolutePath()
+        );
+      } else {
+        try {
+          Files.copy(sourceFile.toPath(), destinationFile.toPath());
+        } catch (final IOException e) {
+          final String message = new StringBuilder("Copy of file [")
+              .append(sourceFile.getAbsolutePath())
+              .append("] to [")
+              .append(destinationFile.getAbsolutePath())
+              .append("] failed.")
+              .toString();
+          throw new RuntimeException(message, e);
+        }
+      }
+    }
+  }
+
+  /**
+   * @return a File for the path stored in the given proto.
+   */
+  private static File toFile(final ReefServiceProtos.FileResourceProto fileProto) {
+    return new File(fileProto.getPath());
+  }
+
+  /**
+   * Deletes the given file or folder including everything below it. File.delete() on its own cannot
+   * remove a non-empty folder. Entries that cannot be deleted are logged and otherwise ignored.
+   */
+  private static void deleteRecursively(final File file) {
+    // listFiles() yields null when file is not a directory (or cannot be listed).
+    final File[] children = file.listFiles();
+    if (children != null) {
+      for (final File child : children) {
+        deleteRecursively(child);
+      }
+    }
+    if (!file.delete()) {
+      LOG.log(Level.WARNING, "Unable to delete [{0}].", file.getAbsolutePath());
+    }
+  }
+
+  /**
+   * Stages the global files, the local files and the Driver Configuration in a temporary folder and packs
+   * that folder into a temporary JAR file.
+   *
+   * @param jobSubmissionProto  provides the lists of global and local files to include.
+   * @param driverConfiguration the configuration stored in the local folder under the driver configuration name.
+   * @return the JAR file created.
+   * @throws IOException if the temporary folder or the JAR file cannot be created.
+   */
+  public File createJobSubmissionJAR(
+      final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto,
+      final Configuration driverConfiguration) throws IOException {
+
+    // Copy all files to a local job submission folder
+    final File jobSubmissionFolder = makejobSubmissionFolder();
+    LOG.log(Level.FINE, "Staging submission in {0}", jobSubmissionFolder);
+
+    final File localFolder = new File(jobSubmissionFolder, this.fileNames.getLocalFolderName());
+    final File globalFolder = new File(jobSubmissionFolder, this.fileNames.getGlobalFolderName());
+
+    copy(jobSubmissionProto.getGlobalFileList(), globalFolder);
+    copy(jobSubmissionProto.getLocalFileList(), localFolder);
+
+    // Store the Driver Configuration in the JAR file.
+    this.configurationSerializer.toFile(
+        driverConfiguration, new File(localFolder, this.fileNames.getDriverConfigurationName()));
+
+    // Create a JAR File for the submission
+    final File jarFile = File.createTempFile(this.fileNames.getJobFolderPrefix(), this.fileNames.getJarFileSuffix());
+
+    LOG.log(Level.FINE, "Creating job submission jar file: {0}", jarFile);
+    new JARFileMaker(jarFile).addChildren(jobSubmissionFolder).close();
+
+    if (this.deleteTempFilesOnExit) {
+      LOG.log(Level.FINE,
+          "Deleting the temporary job folder [{0}] and marking the jar file [{1}] for deletion after the JVM exits.",
+          new Object[]{jobSubmissionFolder.getAbsolutePath(), jarFile.getAbsolutePath()});
+      // The folder holds the local and global sub-folders by now, so a plain delete() would leave it behind.
+      deleteRecursively(jobSubmissionFolder);
+      jarFile.deleteOnExit();
+    } else {
+      LOG.log(Level.FINE, "Keeping the temporary job folder [{0}] and jar file [{1}] available after job submission.",
+          new Object[]{jobSubmissionFolder.getAbsolutePath(), jarFile.getAbsolutePath()});
+    }
+    return jarFile;
+  }
+
+  /**
+   * @return a newly created temporary folder whose name starts with the job folder prefix.
+   * @throws IOException if the folder cannot be created.
+   */
+  private File makejobSubmissionFolder() throws IOException {
+    return Files.createTempDirectory(this.fileNames.getJobFolderPrefix()).toFile();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java
new file mode 100644
index 0000000..0fac287
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.files;
+
+import net.jcip.annotations.Immutable;
+
+import javax.inject.Inject;
+import java.io.File;
+
+/**
+ * Access to the various places things go according to the REEF file system standard.
+ * <p>
+ * All values are compile-time String constants; the getters only expose them (or File objects built
+ * from them). The folder paths are relative and use '/' as the separator.
+ */
+@Immutable
+public final class REEFFileNames {
+
+  // Folder layout: REEF_BASE_FOLDER contains GLOBAL_FOLDER and LOCAL_FOLDER.
+  private static final String REEF_BASE_FOLDER = "reef";
+  private static final String GLOBAL_FOLDER = "global";
+  static final String GLOBAL_FOLDER_PATH = REEF_BASE_FOLDER + '/' + GLOBAL_FOLDER;
+  private static final String LOCAL_FOLDER = "local";
+  static final String LOCAL_FOLDER_PATH = REEF_BASE_FOLDER + '/' + LOCAL_FOLDER;
+  // Both configuration files live in the local folder.
+  private static final String DRIVER_CONFIGURATION_NAME = "driver.conf";
+  private static final String DRIVER_CONFIGURATION_PATH =
+      LOCAL_FOLDER_PATH + '/' + DRIVER_CONFIGURATION_NAME;
+  private static final String EVALUATOR_CONFIGURATION_NAME = "evaluator.conf";
+  private static final String EVALUATOR_CONFIGURATION_PATH =
+      LOCAL_FOLDER_PATH + '/' + EVALUATOR_CONFIGURATION_NAME;
+  private static final String JAR_FILE_SUFFIX = ".jar";
+  private static final String JOB_FOLDER_PREFIX = "reef-job-";
+  private static final String EVALUATOR_FOLDER_PREFIX = "reef-evaluator-";
+  private static final String DRIVER_STDERR = "driver.stderr";
+  private static final String DRIVER_STDOUT = "driver.stdout";
+  private static final String EVALUATOR_STDERR = "evaluator.stderr";
+  private static final String EVALUATOR_STDOUT = "evaluator.stdout";
+  // NOTE(review): the four names below look like they serve the Java/CLR bridge - confirm against callers.
+  // Unlike GLOBAL_FOLDER_PATH ("reef/global"), REEF_GLOBAL starts with a '/'.
+  private static final String CPP_BRIDGE = "JavaClrBridge";
+  private static final String REEF_GLOBAL = "/reef/global";
+  private static final String REEF_DRIVER_APPDLL_DIR = "/ReefDriverAppDlls/";
+  private static final String TMP_LOAD_DIR = "/reef/CLRLoadingDirectory";
+
+  @Inject
+  public REEFFileNames() {
+  }
+
+  /**
+   * @return the name of the REEF folder inside of the working directory of an Evaluator or Driver
+   */
+  public String getREEFFolderName() {
+    return REEF_BASE_FOLDER;
+  }
+
+  /**
+   * @return the folder under which all REEF files are stored, as a relative File named REEF_BASE_FOLDER.
+   */
+  public File getREEFFolder() {
+    return new File(getREEFFolderName());
+  }
+
+
+  /**
+   * @return the name of the folder inside of REEF_BASE_FOLDER that houses the global files.
+   */
+  public String getGlobalFolderName() {
+    return GLOBAL_FOLDER;
+  }
+
+  /**
+   * @return the path to the global folder: REEF_BASE_FOLDER/GLOBAL_FOLDER
+   */
+  public String getGlobalFolderPath() {
+    return GLOBAL_FOLDER_PATH;
+  }
+
+  /**
+   * @return the folder holding the files global to all containers.
+   */
+  public File getGlobalFolder() {
+    return new File(getREEFFolder(), getGlobalFolderName());
+  }
+
+
+  /**
+   * @return the name of the folder inside of REEF_BASE_FOLDER that houses the local files.
+   */
+  public String getLocalFolderName() {
+    return LOCAL_FOLDER;
+  }
+
+  /**
+   * @return the path to the local folder: REEF_BASE_FOLDER/LOCAL_FOLDER
+   */
+  public String getLocalFolderPath() {
+    return LOCAL_FOLDER_PATH;
+  }
+
+  /**
+   * @return the folder holding the files local to this container.
+   */
+  public File getLocalFolder() {
+    return new File(getREEFFolder(), getLocalFolderName());
+  }
+
+
+  /**
+   * @return The name under which the driver configuration will be stored in REEF_BASE_FOLDER/LOCAL_FOLDER
+   */
+  public String getDriverConfigurationName() {
+    return DRIVER_CONFIGURATION_NAME;
+  }
+
+  /**
+   * @return The path to the driver configuration: REEF_BASE_FOLDER/LOCAL_FOLDER/DRIVER_CONFIGURATION_NAME
+   */
+  public String getDriverConfigurationPath() {
+    return DRIVER_CONFIGURATION_PATH;
+  }
+
+  /**
+   * @return The name under which the evaluator configuration will be stored in REEF_BASE_FOLDER/LOCAL_FOLDER
+   */
+  public String getEvaluatorConfigurationName() {
+    return EVALUATOR_CONFIGURATION_NAME;
+  }
+
+  /**
+   * @return The path to the evaluator configuration: REEF_BASE_FOLDER/LOCAL_FOLDER/EVALUATOR_CONFIGURATION_NAME
+   */
+  public String getEvaluatorConfigurationPath() {
+    return EVALUATOR_CONFIGURATION_PATH;
+  }
+
+  /**
+   * @return The suffix used for JAR files, including the "."
+   */
+  public String getJarFileSuffix() {
+    return JAR_FILE_SUFFIX;
+  }
+
+  /**
+   * The prefix used whenever REEF is asked to create a job folder, on (H)DFS or locally.
+   * <p>
+   * This prefix is also used with JAR files created to represent a job.
+   *
+   * @return the job folder prefix.
+   */
+  public String getJobFolderPrefix() {
+    return JOB_FOLDER_PREFIX;
+  }
+
+  /**
+   * @return The name used within the current working directory of the driver to redirect standard error to.
+   */
+  public String getDriverStderrFileName() {
+    return DRIVER_STDERR;
+  }
+
+  /**
+   * @return The name used within the current working directory of the driver to redirect standard out to.
+   */
+  public String getDriverStdoutFileName() {
+    return DRIVER_STDOUT;
+  }
+
+  /**
+   * @return The prefix used whenever REEF is asked to create an Evaluator folder, e.g. for staging.
+   */
+  public String getEvaluatorFolderPrefix() {
+    return EVALUATOR_FOLDER_PREFIX;
+  }
+
+  /**
+   * @return The name of the file to redirect the standard error of an Evaluator to.
+   */
+  public String getEvaluatorStderrFileName() {
+    return EVALUATOR_STDERR;
+  }
+
+  /**
+   * @return The name of the file to redirect the standard out of an Evaluator to.
+   */
+  public String getEvaluatorStdoutFileName() {
+    return EVALUATOR_STDOUT;
+  }
+
+  /**
+   * @return the name of cpp bridge file
+   */
+  public String getCppBridge() { return CPP_BRIDGE; }
+
+  /**
+   * @return reef global file folder; note that this value has a leading '/', unlike getGlobalFolderPath()
+   */
+  public String getReefGlobal() { return REEF_GLOBAL; }
+
+  /**
+   * @return reef driver app dll directory
+   */
+  public String getReefDriverAppDllDir() { return REEF_DRIVER_APPDLL_DIR; }
+
+  /**
+   * @return temp load directory
+   */
+  public String getLoadDir() { return TMP_LOAD_DIR; }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/RuntimeClasspathProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/RuntimeClasspathProvider.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/RuntimeClasspathProvider.java
new file mode 100644
index 0000000..8d753c8
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/RuntimeClasspathProvider.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.files;
+
+import java.util.List;
+
+/**
+ * Interface to be implemented by each REEF runtime (YARN, Mesos, Local) to provide additional classpath elements
+ * that are placed before (prefix) and after (suffix) the user classpath.
+ */
+public interface RuntimeClasspathProvider {
+
+  /**
+   * @return the classpath entries to be placed in front of the user classpath of the Driver.
+   */
+  List<String> getDriverClasspathPrefix();
+
+  /**
+   * @return the classpath entries to be appended after the user classpath of the Driver.
+   */
+  List<String> getDriverClasspathSuffix();
+
+  /**
+   * @return the classpath entries to be placed in front of the user classpath of each Evaluator.
+   */
+  List<String> getEvaluatorClasspathPrefix();
+
+  /**
+   * @return the classpath entries to be appended after the user classpath of each Evaluator.
+   */
+  List<String> getEvaluatorClasspathSuffix();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/package-info.java
new file mode 100644
index 0000000..e0bfda0
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * This package will contain the implementation of the REEF file system standard.
+ */
+package org.apache.reef.runtime.common.files;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/CLRLaunchCommandBuilder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/CLRLaunchCommandBuilder.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/CLRLaunchCommandBuilder.java
new file mode 100644
index 0000000..7c5334b
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/CLRLaunchCommandBuilder.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.launch;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.io.File;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Builds the command line used to launch a CLR Evaluator.
+ * <p>
+ * The launch ID and the memory size are accepted through their setters but are not part of the
+ * command produced by {@link #build()}.
+ */
+public class CLRLaunchCommandBuilder implements LaunchCommandBuilder {
+  private static final Logger LOG = Logger.getLogger(CLRLaunchCommandBuilder.class.getName());
+  private static final String EVALUATOR_PATH = "reef/global/Microsoft.Reef.Evaluator.exe";
+
+  private String errorHandlerRID;
+  private String launchID;
+  private int megaBytes;
+  private String evaluatorConfigurationPath;
+  private String standardOutPath;
+  private String standardErrPath;
+
+  /**
+   * Assembles the command: the evaluator executable, the error handler RID, the configuration path and,
+   * when set to a non-empty value, the stdout and stderr redirections.
+   *
+   * @return the command line, one element per argument.
+   */
+  @Override
+  public List<String> build() {
+    final File evaluatorExe = new File(EVALUATOR_PATH);
+    if (!evaluatorExe.exists()) {
+      // Only a warning: the command is built regardless of whether the executable is present here.
+      LOG.log(Level.WARNING, "file can NOT be found: {0}", evaluatorExe.getAbsolutePath());
+    }
+
+    final List<String> command = new LinkedList<>();
+    command.add(evaluatorExe.getPath());
+    command.add(this.errorHandlerRID);
+    command.add(this.evaluatorConfigurationPath);
+    appendRedirect(command, ">", this.standardOutPath);
+    appendRedirect(command, "2>", this.standardErrPath);
+
+    LOG.log(Level.FINE, "Launch Exe: {0}", StringUtils.join(command, ' '));
+    return command;
+  }
+
+  /**
+   * Adds operator immediately followed by path to command, unless path is null or empty.
+   */
+  private static void appendRedirect(final List<String> command, final String operator, final String path) {
+    if (path == null || path.isEmpty()) {
+      return;
+    }
+    command.add(operator + path);
+  }
+
+  @Override
+  public CLRLaunchCommandBuilder setErrorHandlerRID(final String errorHandlerRID) {
+    this.errorHandlerRID = errorHandlerRID;
+    return this;
+  }
+
+  @Override
+  public CLRLaunchCommandBuilder setLaunchID(final String launchID) {
+    this.launchID = launchID;
+    return this;
+  }
+
+  @Override
+  public CLRLaunchCommandBuilder setMemory(final int megaBytes) {
+    this.megaBytes = megaBytes;
+    return this;
+  }
+
+  @Override
+  public CLRLaunchCommandBuilder setConfigurationFileName(final String configurationFileName) {
+    this.evaluatorConfigurationPath = configurationFileName;
+    return this;
+  }
+
+  @Override
+  public CLRLaunchCommandBuilder setStandardOut(final String standardOut) {
+    this.standardOutPath = standardOut;
+    return this;
+  }
+
+  @Override
+  public CLRLaunchCommandBuilder setStandardErr(final String standardErr) {
+    this.standardErrPath = standardErr;
+    return this;
+  }
+}