You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:47:09 UTC

[36/51] [partial] incubator-reef git commit: [REEF-93] Move java sources to lang/java

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/HeartBeatManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/HeartBeatManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/HeartBeatManager.java
new file mode 100644
index 0000000..5dc7b83
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/HeartBeatManager.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator;
+
+import org.apache.reef.proto.EvaluatorRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.evaluator.context.ContextManager;
+import org.apache.reef.runtime.common.evaluator.parameters.DriverRemoteIdentifier;
+import org.apache.reef.runtime.common.evaluator.parameters.HeartbeatPeriod;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.Alarm;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@Unit
+public class HeartBeatManager { // Assembles heartbeat messages and hands them to the driver's heartbeat handler.
+
+  private static final Logger LOG = Logger.getLogger(HeartBeatManager.class.getName());
+
+  private final Clock clock;
+  private final int heartbeatPeriod; // offset passed to Clock.scheduleAlarm() when the heartbeat alarm is re-armed
+  private final EventHandler<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatHandler;
+  private final InjectionFuture<EvaluatorRuntime> evaluatorRuntime;
+  private final InjectionFuture<ContextManager> contextManager;
+
+  @Inject
+  private HeartBeatManager(
+      final InjectionFuture<EvaluatorRuntime> evaluatorRuntime,
+      final InjectionFuture<ContextManager> contextManager,
+      final Clock clock,
+      final RemoteManager remoteManager,
+      final @Parameter(HeartbeatPeriod.class) int heartbeatPeriod,
+      final @Parameter(DriverRemoteIdentifier.class) String driverRID) {
+
+    this.evaluatorRuntime = evaluatorRuntime;
+    this.contextManager = contextManager;
+    this.clock = clock;
+    this.heartbeatPeriod = heartbeatPeriod;
+    this.evaluatorHeartbeatHandler = remoteManager.getHandler(
+        driverRID, EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto.class);
+  }
+
+  /**
+   * Assemble a complete new heartbeat (current evaluator, context and task status) and send it out.
+   */
+  public synchronized void sendHeartbeat() {
+    this.sendHeartBeat(this.getEvaluatorHeartbeatProto());
+  }
+
+  /**
+   * Sends a heartbeat that carries the given TaskStatus together with the current evaluator and context status.
+   */
+  public synchronized void sendTaskStatus(final ReefServiceProtos.TaskStatusProto taskStatusProto) {
+    this.sendHeartBeat(this.getEvaluatorHeartbeatProto(
+        this.evaluatorRuntime.get().getEvaluatorStatus(),
+        this.contextManager.get().getContextStatusCollection(),
+        Optional.of(taskStatusProto)));
+  }
+
+  /**
+   * Sends a heartbeat with the given ContextStatus listed ahead of the status of all contexts on the stack; no task status is set.
+   */
+  public synchronized void sendContextStatus(
+      final ReefServiceProtos.ContextStatusProto contextStatusProto) {
+
+    // TODO: Write a test that checks for the order.
+    final Collection<ReefServiceProtos.ContextStatusProto> contextStatusList = new ArrayList<>();
+    contextStatusList.add(contextStatusProto);
+    contextStatusList.addAll(this.contextManager.get().getContextStatusCollection());
+
+    final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto heartbeatProto =
+        this.getEvaluatorHeartbeatProto(
+            this.evaluatorRuntime.get().getEvaluatorStatus(),
+            contextStatusList, Optional.<ReefServiceProtos.TaskStatusProto>empty());
+
+    this.sendHeartBeat(heartbeatProto);
+  }
+
+  /**
+   * Sends a heartbeat that carries only a timestamp and the given EvaluatorStatus (no context or task status).
+   */
+  public synchronized void sendEvaluatorStatus(
+      final ReefServiceProtos.EvaluatorStatusProto evaluatorStatusProto) {
+    this.sendHeartBeat(EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto.newBuilder()
+        .setTimestamp(System.currentTimeMillis())
+        .setEvaluatorStatus(evaluatorStatusProto)
+        .build());
+  }
+
+  /**
+   * Sends the actual heartbeat out and logs it, if so desired.
+   * At level FINEST the message is logged together with a stack trace created at this call site.
+   * @param heartbeatProto the heartbeat to pass to the heartbeat handler.
+   */
+  private synchronized void sendHeartBeat(
+      final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto heartbeatProto) {
+    if (LOG.isLoggable(Level.FINEST)) {
+      LOG.log(Level.FINEST, "Heartbeat message:\n" + heartbeatProto, new Exception("Stack trace"));
+    }
+    this.evaluatorHeartbeatHandler.onNext(heartbeatProto);
+  }
+
+  /** Builds a heartbeat from the current evaluator status, context status collection and task status. */
+  private EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto getEvaluatorHeartbeatProto() {
+    return this.getEvaluatorHeartbeatProto(
+        this.evaluatorRuntime.get().getEvaluatorStatus(),
+        this.contextManager.get().getContextStatusCollection(),
+        this.contextManager.get().getTaskStatus());
+  }
+
+  private final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto getEvaluatorHeartbeatProto(
+      final ReefServiceProtos.EvaluatorStatusProto evaluatorStatusProto,
+      final Iterable<ReefServiceProtos.ContextStatusProto> contextStatusProtos,
+      final Optional<ReefServiceProtos.TaskStatusProto> taskStatusProto) {
+
+    final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto.Builder builder =
+        EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto.newBuilder()
+            .setTimestamp(System.currentTimeMillis())
+            .setEvaluatorStatus(evaluatorStatusProto);
+
+    for (final ReefServiceProtos.ContextStatusProto contextStatusProto : contextStatusProtos) {
+      builder.addContextStatus(contextStatusProto);
+    }
+
+    if (taskStatusProto.isPresent()) { // only attach a task status when one was supplied
+      builder.setTaskStatus(taskStatusProto.get());
+    }
+
+    return builder.build();
+  }
+
+  final class HeartbeatAlarmHandler implements EventHandler<Alarm> { // sends a heartbeat per alarm and re-schedules itself
+    @Override
+    public void onNext(final Alarm alarm) {
+      synchronized (HeartBeatManager.this) { // same monitor as the synchronized send methods above
+        if (evaluatorRuntime.get().isRunning()) { // heartbeat, and re-arm the alarm, only while the evaluator is running
+          HeartBeatManager.this.sendHeartbeat();
+          HeartBeatManager.this.clock.scheduleAlarm(HeartBeatManager.this.heartbeatPeriod, this);
+        } else {
+          LOG.log(Level.FINEST,
+              "Not triggering a heartbeat, because state is: {0}",
+              evaluatorRuntime.get().getState());
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/PIDStoreStartHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/PIDStoreStartHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/PIDStoreStartHandler.java
new file mode 100644
index 0000000..ea3d76a
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/PIDStoreStartHandler.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator;
+
+import org.apache.reef.util.OSUtils;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintWriter;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Writes the process ID (PID) plus a newline to the file PID_FILE_NAME in the working directory; failures are only logged.
+ */
+public class PIDStoreStartHandler implements EventHandler<StartTime> {
+  public static final String PID_FILE_NAME = "PID.txt";
+  private static final Logger LOG = Logger.getLogger(PIDStoreStartHandler.class.getName());
+
+  @Inject
+  public PIDStoreStartHandler() {
+  }
+
+  @Override
+  public void onNext(final StartTime startTime) {
+    final long pid = OSUtils.getPID();
+    final File outfile = new File(PID_FILE_NAME);
+    LOG.log(Level.FINEST, "Storing pid `" + pid + "` in file " + outfile.getAbsolutePath());
+    try (final PrintWriter p = new PrintWriter(new FileOutputStream(outfile))) { // open the very file whose path was logged
+      p.write(pid + "\n");
+      if (p.checkError()) { LOG.log(Level.WARNING, "Unable to write PID file."); } // PrintWriter swallows IOExceptions
+    } catch (final FileNotFoundException e) {
+      LOG.log(Level.WARNING, "Unable to create PID file.", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextClientCodeException.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextClientCodeException.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextClientCodeException.java
new file mode 100644
index 0000000..8755f39
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextClientCodeException.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.context;
+
+import org.apache.reef.evaluator.context.parameters.ContextIdentifier;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.util.Optional;
+
+/**
+ * Thrown when we encounter a problem with client code in a context; carries the IDs of that context and its parent.
+ */
+public final class ContextClientCodeException extends Exception {
+  private final String contextID;
+  private final Optional<String> parentID;
+
+  /**
+   * @param contextID the ID of the failed context.
+   * @param parentID  the ID of the failed context's parent, if any.
+   * @param message   the error message; it is appended to the prefix "Failure in context '[contextID]': ".
+   * @param cause     the exception that caused the error.
+   */
+  public ContextClientCodeException(final String contextID,
+                                    final Optional<String> parentID,
+                                    final String message,
+                                    final Throwable cause) {
+    super("Failure in context '" + contextID + "': " + message, cause);
+    this.contextID = contextID;
+    this.parentID = parentID;
+  }
+
+  /**
+   * Extracts a context id from the given configuration by asking a new Tang injector for the ContextIdentifier.
+   *
+   * @param c the configuration to read the ContextIdentifier from.
+   * @return the context id in the given configuration.
+   * @throws RuntimeException if the injection fails; the InjectionException is kept as the cause.
+   */
+  public static String getIdentifier(final Configuration c) {
+    try {
+      return Tang.Factory.getTang().newInjector(c).getNamedInstance(
+          ContextIdentifier.class);
+    } catch (final InjectionException e) {
+      throw new RuntimeException("Unable to determine context identifier. Giving up.", e);
+    }
+  }
+
+  /**
+   * @return the ID of the failed context, as passed to the constructor.
+   */
+  public String getContextID() {
+    return this.contextID;
+  }
+
+  /**
+   * @return the ID of the failed context's parent, if any, as passed to the constructor.
+   */
+  public Optional<String> getParentID() {
+    return this.parentID;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextLifeCycle.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextLifeCycle.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextLifeCycle.java
new file mode 100644
index 0000000..586ef89
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextLifeCycle.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.context;
+
+import org.apache.reef.evaluator.context.ContextMessageSource;
+import org.apache.reef.evaluator.context.events.ContextStart;
+import org.apache.reef.evaluator.context.events.ContextStop;
+import org.apache.reef.evaluator.context.parameters.*;
+import org.apache.reef.runtime.common.utils.BroadCastEventHandler;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * Fires the life-cycle dependent events of one context (start, stop, incoming message) to its configured handlers.
+ */
+final class ContextLifeCycle {
+
+  private final String identifier;
+  private final Set<EventHandler<ContextStart>> contextStartHandlers;
+  private final Set<EventHandler<ContextStop>> contextStopHandlers;
+  private final Set<ContextMessageSource> contextMessageSources;
+  private final EventHandler<byte[]> contextMessageHandler;
+
+  @Inject
+  ContextLifeCycle(final @Parameter(ContextIdentifier.class) String identifier,
+                   final @Parameter(ContextMessageHandlers.class) Set<EventHandler<byte[]>> contextMessageHandlers,
+                   final @Parameter(ContextStartHandlers.class) Set<EventHandler<ContextStart>> contextStartHandlers,
+                   final @Parameter(ContextStopHandlers.class) Set<EventHandler<ContextStop>> contextStopHandlers,
+                   final @Parameter(ContextMessageSources.class) Set<ContextMessageSource> contextMessageSources) {
+    this.identifier = identifier;
+    this.contextStartHandlers = contextStartHandlers;
+    this.contextStopHandlers = contextStopHandlers;
+    this.contextMessageSources = contextMessageSources;
+    this.contextMessageHandler = new BroadCastEventHandler<>(contextMessageHandlers); // one handler in front of all configured ones
+  }
+
+  /**
+   * Creates a ContextStart event for this context's identifier and passes it to every registered start handler.
+   */
+  final void start() {
+    final ContextStart contextStart = new ContextStartImpl(this.identifier);
+    for (final EventHandler<ContextStart> startHandler : this.contextStartHandlers) {
+      startHandler.onNext(contextStart);
+    }
+  }
+
+  /**
+   * Creates a ContextStop event for this context's identifier and passes it to every registered stop handler.
+   */
+  final void close() {
+    final ContextStop contextStop = new ContextStopImpl(this.identifier);
+    for (final EventHandler<ContextStop> stopHandler : this.contextStopHandlers) {
+      stopHandler.onNext(contextStop);
+    }
+  }
+
+  /**
+   * Deliver the driver message to the context message handler built from all configured message handlers.
+   *
+   * @param message sent by the driver
+   */
+  final void handleContextMessage(final byte[] message) {
+    this.contextMessageHandler.onNext(message);
+  }
+
+  /**
+   * @return an unmodifiable shallow copy of the set of ContextMessageSources configured.
+   */
+  final Set<ContextMessageSource> getContextMessageSources() {
+    return Collections.unmodifiableSet(new LinkedHashSet<>(this.contextMessageSources));
+  }
+
+  final String getIdentifier() { // the identifier injected for ContextIdentifier
+    return this.identifier;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextManager.java
new file mode 100644
index 0000000..e15e1f8
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextManager.java
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.context;
+
+import com.google.protobuf.ByteString;
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.EvaluatorRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.evaluator.HeartBeatManager;
+import org.apache.reef.runtime.common.evaluator.task.TaskClientCodeException;
+import org.apache.reef.runtime.common.utils.ExceptionCodec;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.Optional;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Stack;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Manages the stack of contexts in the Evaluator and reports the outcome of control messages via the HeartBeatManager.
+ */
+@Private
+@EvaluatorSide
+@Provided
+public final class ContextManager implements AutoCloseable {
+
+  private static final Logger LOG = Logger.getLogger(ContextManager.class.getName());
+
+  /**
+   * The stack of contexts; the root context is at the bottom. Also used as the monitor guarding stack access.
+   */
+  private final Stack<ContextRuntime> contextStack = new Stack<>();
+
+  /**
+   * Used to instantiate the root context.
+   */
+  private final InjectionFuture<RootContextLauncher> launchContext;
+
+  /**
+   * Used for status reporting to the Driver.
+   */
+  private final HeartBeatManager heartBeatManager;
+
+  /**
+   * To deserialize the Configurations carried in control messages.
+   */
+  private final ConfigurationSerializer configurationSerializer;
+
+  private final ExceptionCodec exceptionCodec; // encodes exception causes into the failure status messages
+
+  /**
+   * @param launchContext           to instantiate the root context.
+   * @param heartBeatManager        for status reporting to the Driver.
+   * @param configurationSerializer to parse the context, service and task Configurations in control messages.
+   * @param exceptionCodec          to encode exception causes into task / context failure status messages.
+   */
+  @Inject
+  ContextManager(final InjectionFuture<RootContextLauncher> launchContext,
+                 final HeartBeatManager heartBeatManager,
+                 final ConfigurationSerializer configurationSerializer,
+                 final ExceptionCodec exceptionCodec) {
+    this.launchContext = launchContext;
+    this.heartBeatManager = heartBeatManager;
+    this.configurationSerializer = configurationSerializer;
+    this.exceptionCodec = exceptionCodec;
+  }
+
+  /**
+   * Start the context manager. This initiates the root context and launches the initial task, if one is configured.
+   *
+   * @throws ContextClientCodeException if the root context can't be instantiated.
+   */
+  public void start() throws ContextClientCodeException {
+    synchronized (this.contextStack) {
+      LOG.log(Level.FINEST, "Instantiating root context.");
+      this.contextStack.push(this.launchContext.get().getRootContext());
+
+      if (this.launchContext.get().getInitialTaskConfiguration().isPresent()) {
+        LOG.log(Level.FINEST, "Launching the initial Task");
+        try {
+          this.contextStack.peek().startTask(this.launchContext.get().getInitialTaskConfiguration().get());
+        } catch (final TaskClientCodeException e) {
+          this.handleTaskException(e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Shuts down by calling close() on the top-most context, if there is one; the stack itself is left untouched here.
+   * NOTE(review): presumably ContextRuntime.close() also ends the Task and the parent contexts - confirm.
+   */
+  @Override
+  public void close() {
+    synchronized (this.contextStack) {
+      if (!this.contextStackIsEmpty()) {
+        this.contextStack.lastElement().close();
+      }
+    }
+  }
+
+  /**
+   * @return true if there is no context.
+   */
+  public boolean contextStackIsEmpty() {
+    synchronized (this.contextStack) {
+      return this.contextStack.isEmpty();
+    }
+  }
+
+  /**
+   * Processes the given ContextControlProto to launch / close / suspend Tasks and Contexts.
+   * <p>
+   * Client code failures are reported to the Driver through the HeartBeatManager instead of being rethrown.
+   *
+   * @param controlMessage the message to process
+   */
+  public void handleContextControlProtocol(
+      final EvaluatorRuntimeProtocol.ContextControlProto controlMessage) {
+
+    synchronized (this.heartBeatManager) { // same monitor as HeartBeatManager's synchronized send methods
+      try {
+        if (controlMessage.hasAddContext() && controlMessage.hasRemoveContext()) {
+          throw new IllegalArgumentException(
+              "Received a message with both add and remove context. This is unsupported.");
+        }
+
+        final byte[] message = controlMessage.hasTaskMessage() ?
+            controlMessage.getTaskMessage().toByteArray() : null;
+
+        if (controlMessage.hasAddContext()) {
+          this.addContext(controlMessage.getAddContext());
+          if (controlMessage.hasStartTask()) {
+            // We support submitContextAndTask()
+            this.startTask(controlMessage.getStartTask());
+          } else {
+            // We need to trigger a heartbeat here.
+            // In other cases, the heartbeat will be triggered by the TaskRuntime
+            // Therefore this call can not go into addContext.
+            this.heartBeatManager.sendHeartbeat();
+          }
+        } else if (controlMessage.hasRemoveContext()) {
+          this.removeContext(controlMessage.getRemoveContext().getContextId());
+        } else if (controlMessage.hasStartTask()) {
+          this.startTask(controlMessage.getStartTask());
+        } else if (controlMessage.hasStopTask()) {
+          this.contextStack.peek().closeTask(message);
+        } else if (controlMessage.hasSuspendTask()) {
+          this.contextStack.peek().suspendTask(message);
+        } else if (controlMessage.hasTaskMessage()) {
+          this.contextStack.peek().deliverTaskMessage(message);
+        } else if (controlMessage.hasContextMessage()) {
+          final EvaluatorRuntimeProtocol.ContextMessageProto contextMessageProto = controlMessage.getContextMessage();
+          boolean deliveredMessage = false; // iterate a snapshot below: start() pushes while holding only the stack's monitor
+          for (final ContextRuntime context : new ArrayList<ContextRuntime>(this.contextStack)) {
+            if (context.getIdentifier().equals(contextMessageProto.getContextId())) {
+              context.handleContextMessage(contextMessageProto.getMessage().toByteArray());
+              deliveredMessage = true;
+              break;
+            }
+          }
+          if (!deliveredMessage) {
+            throw new IllegalStateException(
+                "Sent message to unknown context " + contextMessageProto.getContextId());
+          }
+        } else {
+          throw new RuntimeException("Unknown task control message: " + controlMessage);
+        }
+      } catch (final TaskClientCodeException e) {
+        this.handleTaskException(e);
+      } catch (final ContextClientCodeException e) {
+        this.handleContextException(e);
+      }
+    }
+
+  }
+
+  /**
+   * @return the task status reported by the top-most context, if any; a RuntimeException is thrown on an empty stack.
+   */
+  public Optional<ReefServiceProtos.TaskStatusProto> getTaskStatus() {
+    synchronized (this.contextStack) {
+      if (this.contextStack.isEmpty()) {
+        throw new RuntimeException(
+            "Asked for a Task status while there isn't even a context running.");
+      }
+      return this.contextStack.peek().getTaskStatus();
+    }
+  }
+
+  /**
+   * @return the status of all contexts in the stack, as a new list in the stack's iteration order.
+   */
+  public Collection<ReefServiceProtos.ContextStatusProto> getContextStatusCollection() {
+    synchronized (this.contextStack) {
+      final List<ReefServiceProtos.ContextStatusProto> result = new ArrayList<>(this.contextStack.size());
+      for (final ContextRuntime contextRuntime : this.contextStack) {
+        final ReefServiceProtos.ContextStatusProto contextStatusProto = contextRuntime.getContextStatus();
+        LOG.log(Level.FINEST, "Add context status: {0}", contextStatusProto);
+        result.add(contextStatusProto);
+      }
+      return result;
+    }
+  }
+
+  /**
+   * Add a context to the stack as a child of the current top context.
+   *
+   * @param addContextProto describes the new context; its parent ID must match the current top context.
+   * @throws ContextClientCodeException if there is a client code related issue.
+   */
+  private void addContext(
+      final EvaluatorRuntimeProtocol.AddContextProto addContextProto)
+      throws ContextClientCodeException {
+
+    synchronized (this.contextStack) {
+      try {
+
+        final ContextRuntime currentTopContext = this.contextStack.peek();
+
+        if (!currentTopContext.getIdentifier().equals(addContextProto.getParentContextId())) {
+          throw new IllegalStateException("Trying to instantiate a child context on context with id `" +
+              addContextProto.getParentContextId() + "` while the current top context id is `" +
+              currentTopContext.getIdentifier() + "`");
+        }
+
+        final Configuration contextConfiguration =
+            this.configurationSerializer.fromString(addContextProto.getContextConfiguration());
+
+        final ContextRuntime newTopContext;
+        if (addContextProto.hasServiceConfiguration()) {
+          newTopContext = currentTopContext.spawnChildContext(contextConfiguration,
+              this.configurationSerializer.fromString(addContextProto.getServiceConfiguration()));
+        } else {
+          newTopContext = currentTopContext.spawnChildContext(contextConfiguration);
+        }
+
+        this.contextStack.push(newTopContext);
+
+      } catch (final IOException | BindException e) {
+        throw new RuntimeException("Unable to read configuration.", e);
+      }
+
+    }
+  }
+
+  /**
+   * Close the context with the given ID and remove it from the stack.
+   *
+   * @throws IllegalStateException if the given ID does not refer to the top of stack.
+   */
+  private void removeContext(final String contextID) {
+
+    synchronized (this.contextStack) {
+
+      if (!contextID.equals(this.contextStack.peek().getIdentifier())) {
+        throw new IllegalStateException("Trying to close context with id `" + contextID +
+            "`. But the top context has id `" +
+            this.contextStack.peek().getIdentifier() + "`");
+      }
+
+      this.contextStack.peek().close();
+      if (this.contextStack.size() > 1) {
+        /* We did not close the root context. Therefore, we need to inform the
+         * driver explicitly that this context is closed. The root context notification
+         * is implicit in the Evaluator close/done notification.
+         */
+        this.heartBeatManager.sendHeartbeat(); // Ensure Driver gets notified of context DONE state
+      }
+      this.contextStack.pop(); // popped only after the heartbeat, so the closed context is still part of the report
+      System.gc(); // TODO sure??
+    }
+  }
+
+  /**
+   * Launch a Task on the top-most context, which must be the context named in the given StartTaskProto.
+   */
+  private void startTask(
+      final EvaluatorRuntimeProtocol.StartTaskProto startTaskProto) throws TaskClientCodeException {
+
+    synchronized (this.contextStack) {
+
+      final ContextRuntime currentActiveContext = this.contextStack.peek();
+
+      final String expectedContextId = startTaskProto.getContextId();
+      if (!expectedContextId.equals(currentActiveContext.getIdentifier())) {
+        throw new IllegalStateException("Task expected context `" + expectedContextId +
+            "` but the active context has ID `" + currentActiveContext.getIdentifier() + "`");
+      }
+
+      try {
+        final Configuration taskConfig =
+            this.configurationSerializer.fromString(startTaskProto.getConfiguration());
+        currentActiveContext.startTask(taskConfig);
+      } catch (IOException | BindException e) {
+        throw new RuntimeException("Unable to read configuration.", e);
+      }
+    }
+  }
+
+  /**
+   * Sends a FAILED TaskStatus for the given exception. Written assuming the caller holds the HeartBeatManager lock; start() does not.
+   */
+  private void handleTaskException(final TaskClientCodeException e) {
+
+    LOG.log(Level.SEVERE, "TaskClientCodeException", e);
+
+    final ByteString exception = ByteString.copyFrom(this.exceptionCodec.toBytes(e.getCause()));
+
+    final ReefServiceProtos.TaskStatusProto taskStatus =
+        ReefServiceProtos.TaskStatusProto.newBuilder()
+            .setContextId(e.getContextId())
+            .setTaskId(e.getTaskId())
+            .setResult(exception)
+            .setState(ReefServiceProtos.State.FAILED)
+            .build();
+
+    LOG.log(Level.SEVERE, "Sending heartbeat: {0}", taskStatus);
+
+    this.heartBeatManager.sendTaskStatus(taskStatus);
+  }
+
+  /**
+   * Sends a FAIL ContextStatus for the given exception. Written assuming the caller holds the HeartBeatManager lock.
+   */
+  private void handleContextException(final ContextClientCodeException e) {
+
+    LOG.log(Level.SEVERE, "ContextClientCodeException", e);
+
+    final ByteString exception = ByteString.copyFrom(this.exceptionCodec.toBytes(e.getCause()));
+
+    final ReefServiceProtos.ContextStatusProto.Builder contextStatusBuilder =
+        ReefServiceProtos.ContextStatusProto.newBuilder()
+            .setContextId(e.getContextID())
+            .setContextState(ReefServiceProtos.ContextStatusProto.State.FAIL)
+            .setError(exception);
+
+    if (e.getParentID().isPresent()) {
+      contextStatusBuilder.setParentId(e.getParentID().get());
+    }
+
+    final ReefServiceProtos.ContextStatusProto contextStatus = contextStatusBuilder.build();
+
+    LOG.log(Level.SEVERE, "Sending heartbeat: {0}", contextStatus);
+
+    this.heartBeatManager.sendContextStatus(contextStatus);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextRuntime.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextRuntime.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextRuntime.java
new file mode 100644
index 0000000..044f5c4
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextRuntime.java
@@ -0,0 +1,436 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.context;
+
+import com.google.protobuf.ByteString;
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.evaluator.context.ContextMessage;
+import org.apache.reef.evaluator.context.ContextMessageSource;
+import org.apache.reef.evaluator.context.parameters.Services;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.evaluator.task.TaskClientCodeException;
+import org.apache.reef.runtime.common.evaluator.task.TaskRuntime;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.util.Optional;
+
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The evaluator side resourcemanager for Contexts.
+ */
+@Provided
+@Private
+@EvaluatorSide
+public final class ContextRuntime {
+
+  private static final Logger LOG = Logger.getLogger(ContextRuntime.class.getName());
+
+  /**
+   * Context-local injector. This contains information that will not be available in child injectors.
+   */
+  private final Injector contextInjector;
+
+  /**
+   * Service injector. State in this injector moves to child injectors.
+   */
+  private final Injector serviceInjector;
+
+  /**
+   * Convenience class to hold all the event handlers for the context as well as the service instances.
+   */
+  private final ContextLifeCycle contextLifeCycle;
+  /**
+   * The parent context, if there is any.
+   */
+  private final Optional<ContextRuntime> parentContext; // final; assigned once in the constructor
+  /**
+   * The child context, if there is any.
+   */
+  private Optional<ContextRuntime> childContext = Optional.empty(); // guarded by this.contextLifeCycle
+  /**
+   * The most recently started task, if there is any. It may already have ended.
+   */
+  private Optional<TaskRuntime> task = Optional.empty(); // guarded by this.contextLifeCycle
+
+  private Thread taskRuntimeThread = null; // the thread created for the task in startTask()
+
+  // Read and written only inside synchronized (this.contextLifeCycle) blocks.
+  private ReefServiceProtos.ContextStatusProto.State contextState =
+      ReefServiceProtos.ContextStatusProto.State.READY;
+
+  /**
+   * Builds a context on top of the given service injector and fires its start events.
+   *
+   * @param serviceInjector      the injector holding the services; it is forked to obtain the context injector.
+   * @param contextConfiguration the Configuration used to fork the context-local injector.
+   * @param parentContext        the parent of the new context; empty for the root context.
+   * @throws ContextClientCodeException if the injectors or the ContextLifeCycle cannot be set up.
+   */
+  ContextRuntime(final Injector serviceInjector, final Configuration contextConfiguration,
+                 final Optional<ContextRuntime> parentContext) throws ContextClientCodeException {
+
+    this.parentContext = parentContext;
+    this.serviceInjector = serviceInjector;
+
+    try {
+      // Requesting the service set triggers the instantiation of the services; the set itself is not used.
+      this.serviceInjector.getNamedInstance(Services.class);
+      final Injector forkedInjector = this.serviceInjector.forkInjector(contextConfiguration);
+      this.contextInjector = forkedInjector;
+      this.contextLifeCycle = forkedInjector.getInstance(ContextLifeCycle.class);
+    } catch (BindException | InjectionException e) {
+      final Optional<String> parentID;
+      if (parentContext.isPresent()) {
+        parentID = Optional.of(parentContext.get().getIdentifier());
+      } else {
+        parentID = Optional.empty();
+      }
+      throw new ContextClientCodeException(
+          ContextClientCodeException.getIdentifier(contextConfiguration),
+          parentID, "Unable to spawn context", e);
+    }
+
+    // Both injectors and the life cycle exist now: trigger the context start events.
+    this.contextLifeCycle.start();
+  }
+
+  /**
+   * Create a new ContextRuntime for the root context, i.e. a context without a parent context.
+   *
+   * @param serviceInjector      the serviceInjector to be used.
+   * @param contextConfiguration the Configuration for this context.
+   * @throws ContextClientCodeException if the context cannot be instantiated.
+   */
+  ContextRuntime(final Injector serviceInjector,
+                 final Configuration contextConfiguration) throws ContextClientCodeException {
+    this(serviceInjector, contextConfiguration, Optional.<ContextRuntime>empty());
+    LOG.log(Level.FINEST, "Instantiating root context"); // logged after the delegated constructor returned
+  }
+
+
+  /**
+   * Spawns a new context.
+   * <p/>
+   * The new context will have a serviceInjector that is created by forking the one in this object with the given
+   * serviceConfiguration. The contextConfiguration is used to fork the contextInjector from that new serviceInjector.
+   *
+   * @param contextConfiguration the new context's context (local) Configuration.
+   * @param serviceConfiguration the new context's service Configuration.
+   * @return a child context.
+   * @throws ContextClientCodeException If the context can't be instantiated due to user code / configuration issues.
+   * @throws IllegalStateException      If this method is called when there is either a task or child context already
+   *                                    present.
+   */
+  ContextRuntime spawnChildContext(
+      final Configuration contextConfiguration,
+      final Configuration serviceConfiguration) throws ContextClientCodeException {
+
+    synchronized (this.contextLifeCycle) {
+
+      if (this.task.isPresent()) {
+        throw new IllegalStateException(
+            "Attempting to spawn a child context when a Task with id '" +
+                this.task.get().getId() + "' is running.");
+      }
+
+      if (this.childContext.isPresent()) {
+        throw new IllegalStateException(
+            "Attempting to instantiate a child context on a context that is not the topmost active context");
+      }
+
+      try {
+
+        final Injector childServiceInjector =
+            this.serviceInjector.forkInjector(serviceConfiguration);
+
+        final ContextRuntime childContext =
+            new ContextRuntime(childServiceInjector, contextConfiguration, Optional.of(this));
+
+        this.childContext = Optional.of(childContext);
+        return childContext;
+
+      } catch (final BindException e) {
+        // The context that failed to spawn would have been a child of this context, so this context,
+        // not this context's own parent, is the parent to report. This matches the parent ID that the
+        // ContextRuntime constructor reports when it fails for the same child.
+        final Optional<String> parentID = Optional.of(this.getIdentifier());
+
+        throw new ContextClientCodeException(
+            ContextClientCodeException.getIdentifier(contextConfiguration),
+            parentID, "Unable to spawn context", e);
+      }
+    }
+  }
+
+  /**
+   * Spawns a new context without services of its own.
+   * <p/>
+   * The new context will have a serviceInjector that is created by forking the one in this object. The
+   * contextConfiguration is used to fork the contextInjector from that new serviceInjector.
+   *
+   * @param contextConfiguration the new context's context (local) Configuration.
+   * @return a child context.
+   * @throws ContextClientCodeException If the context can't be instantiated due to user code / configuration issues.
+   * @throws IllegalStateException      If this method is called when there is either a task or child context already
+   *                                    present.
+   */
+  ContextRuntime spawnChildContext(
+      final Configuration contextConfiguration) throws ContextClientCodeException {
+
+    synchronized (this.contextLifeCycle) {
+
+      if (this.task.isPresent()) {
+        throw new IllegalStateException(
+            "Attempting to spawn a child context while a Task with id '" +
+                this.task.get().getId() + "' is running.");
+      }
+
+      if (this.childContext.isPresent()) {
+        throw new IllegalStateException(
+            "Attempting to spawn a child context on a context that is not the topmost active context");
+      }
+      // No service configuration was given: fork the service injector without additional configuration.
+      final Injector childServiceInjector = this.serviceInjector.forkInjector();
+      final ContextRuntime childContext =
+          new ContextRuntime(childServiceInjector, contextConfiguration, Optional.of(this));
+
+      this.childContext = Optional.of(childContext);
+      return childContext;
+    }
+  }
+
+  /**
+   * Launches a Task on this context. A previous Task that has already ended is cleared first.
+   *
+   * @param taskConfig the configuration to be used for the task.
+   * @throws TaskClientCodeException If the Task cannot be instantiated or started, e.g. due to user code or
+   *                                 configuration issues.
+   * @throws IllegalStateException   If a Task is still running or a child context is present.
+   */
+  void startTask(final Configuration taskConfig) throws TaskClientCodeException {
+
+    synchronized (this.contextLifeCycle) {
+      if (this.task.isPresent()) {
+        final TaskRuntime previousTask = this.task.get();
+        if (!previousTask.hasEnded()) {
+          throw new IllegalStateException("Attempting to start a Task when a Task with id '" +
+              previousTask.getId() + "' is running.");
+        }
+        // The previous task is over: clean up state.
+        this.task = Optional.empty();
+      }
+
+      if (this.childContext.isPresent()) {
+        throw new IllegalStateException(
+            "Attempting to start a Task on a context that is not the topmost active context");
+      }
+
+      try {
+        final TaskRuntime newTask =
+            this.contextInjector.forkInjector(taskConfig).getInstance(TaskRuntime.class);
+        newTask.initialize();
+        // The task runs on a thread of its own, named after the task.
+        final Thread taskThread = new Thread(newTask, newTask.getId());
+        this.taskRuntimeThread = taskThread;
+        taskThread.start();
+        this.task = Optional.of(newTask);
+        LOG.log(Level.FINEST, "Started task: {0}", newTask.getTaskId());
+      } catch (final BindException | InjectionException e) {
+        throw new TaskClientCodeException(TaskClientCodeException.getTaskId(taskConfig),
+            this.getIdentifier(), "Unable to instantiate the new task", e);
+      } catch (final Throwable t) {
+        throw new TaskClientCodeException(TaskClientCodeException.getTaskId(taskConfig),
+            this.getIdentifier(), "Unable to start the new task", t);
+      }
+    }
+  }
+
+  /**
+   * Closes this context. A Task, if present, is closed first, then the child context (which closes itself the
+   * same way), then this context's life cycle. Finally the parent, if any, is told to forget this child.
+   */
+  final void close() {
+    synchronized (this.contextLifeCycle) {
+      this.contextState = ReefServiceProtos.ContextStatusProto.State.DONE;
+
+      if (this.task.isPresent()) {
+        LOG.log(Level.WARNING, "Shutting down a task because the underlying context is being closed.");
+        final TaskRuntime presentTask = this.task.get();
+        presentTask.close(null);
+      }
+
+      if (this.childContext.isPresent()) {
+        LOG.log(Level.WARNING, "Closing a context because its parent context is being closed.");
+        final ContextRuntime child = this.childContext.get();
+        child.close();
+      }
+
+      this.contextLifeCycle.close();
+
+      if (this.parentContext.isPresent()) {
+        this.parentContext.get().resetChildContext();
+      }
+    }
+  }
+
+  /**
+   * @return the parent context; empty if this context was created without a parent (the root context).
+   */
+  Optional<ContextRuntime> getParentContext() {
+    return this.parentContext;
+  }
+
+  /**
+   * Suspend the Task, passing it the given message.
+   * <p/>
+   * Because of races the Task may already be gone when this is called. The call is then dropped and a
+   * WARNING is left in the log.
+   *
+   * @param message the suspend message to deliver or null if there is none.
+   */
+  void suspendTask(final byte[] message) {
+    synchronized (this.contextLifeCycle) {
+      if (this.task.isPresent()) {
+        this.task.get().suspend(message);
+      } else {
+        LOG.log(Level.WARNING, "Received a suspend task while there was no task running. Ignoring.");
+      }
+    }
+  }
+
+  /**
+   * Close the Task, passing it the given message.
+   * <p/>
+   * Because of races the Task may already be gone when this is called. The call is then dropped and a
+   * WARNING is left in the log.
+   *
+   * @param message the close message to deliver or null if there is none.
+   */
+  void closeTask(final byte[] message) {
+    synchronized (this.contextLifeCycle) {
+      if (this.task.isPresent()) {
+        this.task.get().close(message);
+      } else {
+        LOG.log(Level.WARNING, "Received a close task while there was no task running. Ignoring.");
+      }
+    }
+  }
+
+  /**
+   * Deliver a message to the Task.
+   * <p/>
+   * Because of races the Task may already be gone when this is called. The message is then dropped and a
+   * WARNING is left in the log.
+   *
+   * @param message the message to hand to the Task.
+   */
+  void deliverTaskMessage(final byte[] message) {
+    synchronized (this.contextLifeCycle) {
+      if (this.task.isPresent()) {
+        this.task.get().deliver(message);
+      } else {
+        LOG.log(Level.WARNING, "Received a task message while there was no task running. Ignoring.");
+      }
+    }
+  }
+
+  /**
+   * @return the identifier of this context, as reported by its ContextLifeCycle.
+   */
+  String getIdentifier() {
+    return this.contextLifeCycle.getIdentifier();
+  }
+
+  /**
+   * Hands the given context message to this context's ContextLifeCycle for handling.
+   *
+   * @param message sent by the driver
+   */
+  final void handleContextMessage(final byte[] message) {
+    this.contextLifeCycle.handleContextMessage(message);
+  }
+
+  /**
+   * @return the state of the running Task, if one is running.
+   */
+  Optional<ReefServiceProtos.TaskStatusProto> getTaskStatus() {
+    synchronized (this.contextLifeCycle) {
+      if (this.task.isPresent()) {
+        if (this.task.get().hasEnded()) {
+          this.task = Optional.empty();
+          return Optional.empty();
+        } else {
+          return Optional.of(this.task.get().getStatusProto());
+        }
+      } else {
+        return Optional.empty();
+      }
+    }
+  }
+
+  /**
+   * Called by the child context when it has been closed, so that this context forgets about it.
+   * @throws IllegalStateException if this context has no child context.
+   */
+  private void resetChildContext() {
+    synchronized (this.contextLifeCycle) {
+      if (!this.childContext.isPresent()) {
+        throw new IllegalStateException("no child context set");
+      }
+      this.childContext = Optional.empty();
+    }
+  }
+
+  /**
+   * @return this context's status in protocol buffer form, including the parent's ID (if there is a parent)
+   * and the messages currently offered by the context's message sources.
+   */
+  ReefServiceProtos.ContextStatusProto getContextStatus() {
+    synchronized (this.contextLifeCycle) {
+      final ReefServiceProtos.ContextStatusProto.Builder statusBuilder =
+          ReefServiceProtos.ContextStatusProto.newBuilder();
+      statusBuilder.setContextId(this.getIdentifier());
+      statusBuilder.setContextState(this.contextState);
+
+      if (this.parentContext.isPresent()) {
+        statusBuilder.setParentId(this.parentContext.get().getIdentifier());
+      }
+
+      for (final ContextMessageSource source : this.contextLifeCycle.getContextMessageSources()) {
+        final Optional<ContextMessage> pending = source.getMessage();
+        if (!pending.isPresent()) {
+          continue; // this source has nothing to report right now
+        }
+        final ContextMessage contextMessage = pending.get();
+        statusBuilder.addContextMessage(ReefServiceProtos.ContextStatusProto.ContextMessageProto.newBuilder()
+            .setSourceId(contextMessage.getMessageSourceID())
+            .setMessage(ByteString.copyFrom(contextMessage.get()))
+            .build());
+      }
+      return statusBuilder.build();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextStartImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextStartImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextStartImpl.java
new file mode 100644
index 0000000..8ca4349
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextStartImpl.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.context;
+
+
+import org.apache.reef.evaluator.context.events.ContextStart;
+
+final class ContextStartImpl implements ContextStart {
+  // ContextStart implementation that carries only the identifier passed to the constructor.
+  private final String identifier;
+
+  /** @param identifier the value that getId() returns. */
+  ContextStartImpl(final String identifier) {
+    this.identifier = identifier;
+  }
+
+  @Override
+  public String getId() {
+    return this.identifier;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextStopImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextStopImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextStopImpl.java
new file mode 100644
index 0000000..74e1b10
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextStopImpl.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.context;
+
+import org.apache.reef.evaluator.context.events.ContextStop;
+
+final class ContextStopImpl implements ContextStop {
+  // ContextStop implementation that carries only the identifier passed to the constructor.
+  private final String identifier;
+  /** @param identifier the value that getId() returns. */
+  ContextStopImpl(final String identifier) {
+    this.identifier = identifier;
+  }
+
+  @Override
+  public String getId() {
+    return this.identifier;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/RootContextLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/RootContextLauncher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/RootContextLauncher.java
new file mode 100644
index 0000000..448949c
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/RootContextLauncher.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.context;
+
+import org.apache.reef.runtime.common.evaluator.parameters.InitialTaskConfiguration;
+import org.apache.reef.runtime.common.evaluator.parameters.RootContextConfiguration;
+import org.apache.reef.runtime.common.evaluator.parameters.RootServiceConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.Optional;
+
+import javax.inject.Inject;
+import java.io.IOException;
+
+/**
+ * Helper class that encapsulates the root context configuration: With or without services and an initial task.
+ */
+final class RootContextLauncher {
+
+  private final Injector injector;
+  private final Configuration rootContextConfiguration;
+  private final Optional<Configuration> rootServiceConfiguration;
+  private final Optional<Configuration> initialTaskConfiguration;
+  private final ConfigurationSerializer configurationSerializer;
+  private ContextRuntime rootContext = null;
+
+  /** Root context with a service configuration and an initial task configuration. */
+  @Inject
+  RootContextLauncher(final @Parameter(RootContextConfiguration.class) String rootContextConfiguration,
+                      final @Parameter(RootServiceConfiguration.class) String rootServiceConfiguration,
+                      final @Parameter(InitialTaskConfiguration.class) String initialTaskConfiguration,
+                      final Injector injector, final ConfigurationSerializer configurationSerializer) throws IOException, BindException {
+    this.injector = injector;
+    this.configurationSerializer = configurationSerializer;
+    this.rootContextConfiguration = this.configurationSerializer.fromString(rootContextConfiguration);
+    this.rootServiceConfiguration = Optional.of(this.configurationSerializer.fromString(rootServiceConfiguration));
+    this.initialTaskConfiguration = Optional.of(this.configurationSerializer.fromString(initialTaskConfiguration));
+  }
+
+  /** Root context with a service configuration, but without an initial task configuration. */
+  @Inject
+  RootContextLauncher(final @Parameter(RootContextConfiguration.class) String rootContextConfiguration,
+                      final Injector injector,
+                      final @Parameter(RootServiceConfiguration.class) String rootServiceConfiguration, final ConfigurationSerializer configurationSerializer) throws IOException, BindException {
+    this.injector = injector;
+    this.configurationSerializer = configurationSerializer;
+    this.rootContextConfiguration = this.configurationSerializer.fromString(rootContextConfiguration);
+    this.rootServiceConfiguration = Optional.of(this.configurationSerializer.fromString(rootServiceConfiguration));
+    this.initialTaskConfiguration = Optional.empty();
+  }
+
+  /** Root context with an initial task configuration, but without a service configuration. */
+  @Inject
+  RootContextLauncher(final Injector injector,
+                      final @Parameter(RootContextConfiguration.class) String rootContextConfiguration,
+                      final @Parameter(InitialTaskConfiguration.class) String initialTaskConfiguration, final ConfigurationSerializer configurationSerializer) throws IOException, BindException {
+    this.injector = injector;
+    this.configurationSerializer = configurationSerializer;
+    this.rootContextConfiguration = this.configurationSerializer.fromString(rootContextConfiguration);
+    this.rootServiceConfiguration = Optional.empty();
+    this.initialTaskConfiguration = Optional.of(this.configurationSerializer.fromString(initialTaskConfiguration));
+  }
+
+  /** Root context with neither a service configuration nor an initial task configuration. */
+  @Inject
+  RootContextLauncher(final @Parameter(RootContextConfiguration.class) String rootContextConfiguration,
+                      final Injector injector, final ConfigurationSerializer configurationSerializer) throws IOException, BindException {
+    this.injector = injector;
+    this.configurationSerializer = configurationSerializer;
+    this.rootContextConfiguration = this.configurationSerializer.fromString(rootContextConfiguration);
+    this.rootServiceConfiguration = Optional.empty();
+    this.initialTaskConfiguration = Optional.empty();
+  }
+
+  /**
+   * Instantiates the root context. The initial task, if any, is not started here.
+   *
+   * @param injector                 the injector that is forked to obtain the root context's service injector.
+   * @param rootContextConfiguration the Configuration of the root context.
+   * @param rootServiceConfiguration the service Configuration of the root context, if there is one.
+   * @return the newly created root ContextRuntime.
+   * @throws ContextClientCodeException if the service injector or the root context cannot be instantiated.
+   */
+  private static ContextRuntime getRootContext(final Injector injector,
+                                               final Configuration rootContextConfiguration,
+                                               final Optional<Configuration> rootServiceConfiguration)
+      throws ContextClientCodeException {
+    if (!rootServiceConfiguration.isPresent()) {
+      return new ContextRuntime(injector.forkInjector(), rootContextConfiguration);
+    }
+    final Injector rootServiceInjector;
+    try {
+      rootServiceInjector = injector.forkInjector(rootServiceConfiguration.get());
+    } catch (final BindException e) {
+      throw new ContextClientCodeException(ContextClientCodeException.getIdentifier(rootContextConfiguration),
+          Optional.<String>empty(), "Unable to instantiate the root context", e);
+    }
+    return new ContextRuntime(rootServiceInjector, rootContextConfiguration);
+  }
+
+  /**
+   * @return the root context for this evaluator. It is created on the first call and reused afterwards.
+   */
+  final ContextRuntime getRootContext() throws ContextClientCodeException {
+    if (null == this.rootContext) {
+      this.rootContext = getRootContext(this.injector, this.rootContextConfiguration, this.rootServiceConfiguration);
+    }
+    return this.rootContext;
+  }
+
+  /** @return the Configuration of the initial task, if one was passed to the constructor. */
+  final Optional<Configuration> getInitialTaskConfiguration() {
+    return this.initialTaskConfiguration;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextMessageHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextMessageHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextMessageHandler.java
new file mode 100644
index 0000000..0007c00
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextMessageHandler.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.context.defaults;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.evaluator.context.ContextMessageHandler;
+import org.apache.reef.evaluator.context.parameters.ContextIdentifier;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+
+/**
+ * Default handler for messages sent by the driver: throws an IllegalStateException for every message.
+ */
+@EvaluatorSide
+public final class DefaultContextMessageHandler implements ContextMessageHandler {
+  // The injected ContextIdentifier value; only used in the exception message.
+  private final String contextID;
+
+  @Inject
+  DefaultContextMessageHandler(final @Parameter(ContextIdentifier.class) String contextID) {
+    this.contextID = contextID;
+  }
+
+  @Override
+  public void onNext(final byte[] message) {
+    throw new IllegalStateException("No message handlers given for context " + this.contextID);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextMessageSource.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextMessageSource.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextMessageSource.java
new file mode 100644
index 0000000..ed7a7af
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextMessageSource.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.context.defaults;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.evaluator.context.ContextMessage;
+import org.apache.reef.evaluator.context.ContextMessageSource;
+import org.apache.reef.util.Optional;
+
+import javax.inject.Inject;
+
+/**
+ * Default ContextMessageSource: never has a message, getMessage() always returns an empty Optional.
+ */
+@EvaluatorSide
+@Provided
+public final class DefaultContextMessageSource implements ContextMessageSource {
+
+  @Inject
+  public DefaultContextMessageSource() {
+  }
+
+  @Override
+  public Optional<ContextMessage> getMessage() {
+    return Optional.empty();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextStartHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextStartHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextStartHandler.java
new file mode 100644
index 0000000..a3fbce5
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextStartHandler.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.context.defaults;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.evaluator.context.events.ContextStart;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default handler for ContextStart: logs the received event at INFO level and takes no other action.
+ */
+@EvaluatorSide
+public final class DefaultContextStartHandler implements EventHandler<ContextStart> {
+
+  /**
+   * Logger named after this class. Class.getName() is used instead of Class.toString(): the latter
+   * prefixes the name with "class ", which places the logger outside the package logger hierarchy.
+   */
+  private static final Logger LOG = Logger.getLogger(DefaultContextStartHandler.class.getName());
+
+  @Inject
+  DefaultContextStartHandler() {
+  }
+
+  /**
+   * Logs the received event.
+   *
+   * @param contextStart the event to log.
+   */
+  @Override
+  public void onNext(final ContextStart contextStart) {
+    // The event is passed as a log parameter, so the message is only built when the record is published.
+    LOG.log(Level.INFO, "DefaultContextStartHandler received: {0}", contextStart);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextStopHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextStopHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextStopHandler.java
new file mode 100644
index 0000000..4031a4b
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextStopHandler.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.context.defaults;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.evaluator.context.events.ContextStop;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default event handler for ContextStop: logs the received event at INFO level and takes no other action.
+ */
+@EvaluatorSide
+public final class DefaultContextStopHandler implements EventHandler<ContextStop> {
+
+  /**
+   * Logger named after this class. Class.getName() is used instead of Class.toString(): the latter
+   * prefixes the name with "class ", which places the logger outside the package logger hierarchy.
+   */
+  private static final Logger LOG = Logger.getLogger(DefaultContextStopHandler.class.getName());
+
+  @Inject
+  DefaultContextStopHandler() {
+  }
+
+  /**
+   * Logs the received event.
+   *
+   * @param contextStop the event to log.
+   */
+  @Override
+  public void onNext(final ContextStop contextStop) {
+    // The event is passed as a log parameter, so the message is only built when the record is published.
+    LOG.log(Level.INFO, "DefaultContextStopHandler received: {0}", contextStop);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/package-info.java
new file mode 100644
index 0000000..5e572c7
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Default implementations for the optional context interfaces.
+ */
+package org.apache.reef.runtime.common.evaluator.context.defaults;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/package-info.java
new file mode 100644
index 0000000..f7d4390
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Context implementation of the Evaluator resourcemanager.
+ */
+package org.apache.reef.runtime.common.evaluator.context;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/package-info.java
new file mode 100644
index 0000000..78acce3
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Evaluator-side implementation of the REEF API.
+ */
+package org.apache.reef.runtime.common.evaluator;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/ApplicationIdentifier.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/ApplicationIdentifier.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/ApplicationIdentifier.java
new file mode 100644
index 0000000..6d405f8
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/ApplicationIdentifier.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Tang named parameter for the RM application/job identifier, a String value.
+ * <p>
+ * In YARN, this is the applicationID assigned by the resource manager.
+ */
+@NamedParameter(doc = "The RM application/job identifier.")
+public final class ApplicationIdentifier implements Name<String> {
+
+  private ApplicationIdentifier() {
+    // Private so the class cannot be instantiated from outside; it only serves as a parameter name.
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/DriverRemoteIdentifier.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/DriverRemoteIdentifier.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/DriverRemoteIdentifier.java
new file mode 100644
index 0000000..b9e803e
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/DriverRemoteIdentifier.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Tang named parameter for the identifier used by the Evaluator to connect back to the Driver,
+ * a String value.
+ */
+@NamedParameter(doc = "The identifier used by the Evaluator to connect back to the Driver.")
+public final class DriverRemoteIdentifier implements Name<String> {
+
+  private DriverRemoteIdentifier() {
+    // Private so the class cannot be instantiated from outside; it only serves as a parameter name.
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/EvaluatorIdentifier.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/EvaluatorIdentifier.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/EvaluatorIdentifier.java
new file mode 100644
index 0000000..d0fa894
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/EvaluatorIdentifier.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Tang named parameter for the evaluator identifier, a String value.
+ */
+@NamedParameter(doc = "The evaluator identifier.")
+public final class EvaluatorIdentifier implements Name<String> {
+
+  private EvaluatorIdentifier() {
+    // Private so the class cannot be instantiated from outside; it only serves as a parameter name.
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/HeartbeatPeriod.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/HeartbeatPeriod.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/HeartbeatPeriod.java
new file mode 100644
index 0000000..20804c6
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/HeartbeatPeriod.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Tang named parameter for the evaluator heartbeat period in ms, an Integer value.
+ * The declared default value is "5000".
+ */
+@NamedParameter(default_value = "5000", doc = "The evaluator heartbeat period in ms.")
+public final class HeartbeatPeriod implements Name<Integer> {
+
+  private HeartbeatPeriod() {
+    // Private so the class cannot be instantiated from outside; it only serves as a parameter name.
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/InitialTaskConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/InitialTaskConfiguration.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/InitialTaskConfiguration.java
new file mode 100644
index 0000000..b9d80cd
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/InitialTaskConfiguration.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Tang named parameter for an initial task to launch on startup, a String value.
+ */
+@NamedParameter(doc = "An initial task to launch on startup.")
+public final class InitialTaskConfiguration implements Name<String> {
+
+  private InitialTaskConfiguration() {
+    // Private so the class cannot be instantiated from outside; it only serves as a parameter name.
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/RootContextConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/RootContextConfiguration.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/RootContextConfiguration.java
new file mode 100644
index 0000000..d4bc566
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/RootContextConfiguration.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Tang named parameter for the evaluator root context configuration, a String value.
+ */
+@NamedParameter(doc = "The evaluator root context configuration.")
+public final class RootContextConfiguration implements Name<String> {
+
+  private RootContextConfiguration() {
+    // Private so the class cannot be instantiated from outside; it only serves as a parameter name.
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/RootServiceConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/RootServiceConfiguration.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/RootServiceConfiguration.java
new file mode 100644
index 0000000..aabc326
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/RootServiceConfiguration.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Tang named parameter for the service configuration of the root context, a String value.
+ */
+@NamedParameter(doc = "The service configuration for the root context")
+public final class RootServiceConfiguration implements Name<String> {
+
+  private RootServiceConfiguration() {
+    // Private so the class cannot be instantiated from outside; it only serves as a parameter name.
+  }
+}