You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:47:11 UTC

[38/51] [partial] incubator-reef git commit: [REEF-93] Move java sources to lang/java

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/package-info.java
new file mode 100644
index 0000000..cc60934
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Default implementations for the optional driver-side event handlers.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java
new file mode 100644
index 0000000..462d6c9
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.driver.evaluator.EvaluatorType;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.evaluator.EvaluatorConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.Optional;
+import org.apache.reef.util.logging.LoggingScope;
+import org.apache.reef.util.logging.LoggingScopeFactory;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver-Side representation of an allocated evaluator.
+ */
+@DriverSide
+@Private
+final class AllocatedEvaluatorImpl implements AllocatedEvaluator {
+
+  private final static Logger LOG = Logger.getLogger(AllocatedEvaluatorImpl.class.getName());
+
+  private final EvaluatorManager evaluatorManager;
+  private final String remoteID;
+  private final ConfigurationSerializer configurationSerializer;
+  private final String jobIdentifier;
+  private final LoggingScopeFactory loggingScopeFactory;
+
+  /**
+   * The set of files to be places on the Evaluator.
+   */
+  private final Collection<File> files = new HashSet<>();
+  /**
+   * The set of libraries
+   */
+  private final Collection<File> libraries = new HashSet<>();
+
+  AllocatedEvaluatorImpl(final EvaluatorManager evaluatorManager,
+                         final String remoteID,
+                         final ConfigurationSerializer configurationSerializer,
+                         final String jobIdentifier,
+                         final LoggingScopeFactory loggingScopeFactory) {
+    this.evaluatorManager = evaluatorManager;
+    this.remoteID = remoteID;
+    this.configurationSerializer = configurationSerializer;
+    this.jobIdentifier = jobIdentifier;
+    this.loggingScopeFactory = loggingScopeFactory;
+  }
+
+  @Override
+  public String getId() {
+    return this.evaluatorManager.getId();
+  }
+
+  @Override
+  public void close() {
+    this.evaluatorManager.close();
+  }
+
+  @Override
+  public void submitTask(final Configuration taskConfiguration) {
+    final Configuration contextConfiguration = ContextConfiguration.CONF
+        .set(ContextConfiguration.IDENTIFIER, "RootContext_" + this.getId())
+        .build();
+    this.submitContextAndTask(contextConfiguration, taskConfiguration);
+
+  }
+
+  @Override
+  public EvaluatorDescriptor getEvaluatorDescriptor() {
+    return this.evaluatorManager.getEvaluatorDescriptor();
+  }
+
+
+  @Override
+  public void submitContext(final Configuration contextConfiguration) {
+    launch(contextConfiguration, Optional.<Configuration>empty(), Optional.<Configuration>empty());
+  }
+
+  @Override
+  public void submitContextAndService(final Configuration contextConfiguration,
+                                      final Configuration serviceConfiguration) {
+    launch(contextConfiguration, Optional.of(serviceConfiguration), Optional.<Configuration>empty());
+  }
+
+  @Override
+  public void submitContextAndTask(final Configuration contextConfiguration,
+                                   final Configuration taskConfiguration) {
+    launch(contextConfiguration, Optional.<Configuration>empty(), Optional.of(taskConfiguration));
+  }
+
+  @Override
+  public void submitContextAndServiceAndTask(final Configuration contextConfiguration,
+                                             final Configuration serviceConfiguration,
+                                             final Configuration taskConfiguration) {
+    launch(contextConfiguration, Optional.of(serviceConfiguration), Optional.of(taskConfiguration));
+  }
+
+  @Override
+  public void setType(final EvaluatorType type) {
+    this.evaluatorManager.setType(type);
+  }
+
+  @Override
+  public void addFile(final File file) {
+    this.files.add(file);
+  }
+
+  @Override
+  public void addLibrary(final File file) {
+    this.libraries.add(file);
+  }
+
+  private final void launch(final Configuration contextConfiguration,
+                            final Optional<Configuration> serviceConfiguration,
+                            final Optional<Configuration> taskConfiguration) {
+    try (final LoggingScope lb = loggingScopeFactory.evaluatorLaunch(this.getId())) {
+      try {
+        final ConfigurationModule evaluatorConfigurationModule = EvaluatorConfiguration.CONF
+            .set(EvaluatorConfiguration.APPLICATION_IDENTIFIER, this.jobIdentifier)
+            .set(EvaluatorConfiguration.DRIVER_REMOTE_IDENTIFIER, this.remoteID)
+            .set(EvaluatorConfiguration.EVALUATOR_IDENTIFIER, this.getId());
+
+        final String encodedContextConfigurationString = this.configurationSerializer.toString(contextConfiguration);
+        // Add the (optional) service configuration
+        final ConfigurationModule contextConfigurationModule;
+        if (serviceConfiguration.isPresent()) {
+          // With service configuration
+          final String encodedServiceConfigurationString = this.configurationSerializer.toString(serviceConfiguration.get());
+          contextConfigurationModule = evaluatorConfigurationModule
+              .set(EvaluatorConfiguration.ROOT_SERVICE_CONFIGURATION, encodedServiceConfigurationString)
+              .set(EvaluatorConfiguration.ROOT_CONTEXT_CONFIGURATION, encodedContextConfigurationString);
+        } else {
+          // No service configuration
+          contextConfigurationModule = evaluatorConfigurationModule
+              .set(EvaluatorConfiguration.ROOT_CONTEXT_CONFIGURATION, encodedContextConfigurationString);
+        }
+
+        // Add the (optional) task configuration
+        final Configuration evaluatorConfiguration;
+        if (taskConfiguration.isPresent()) {
+          final String encodedTaskConfigurationString = this.configurationSerializer.toString(taskConfiguration.get());
+          evaluatorConfiguration = contextConfigurationModule
+              .set(EvaluatorConfiguration.TASK_CONFIGURATION, encodedTaskConfigurationString).build();
+        } else {
+          evaluatorConfiguration = contextConfigurationModule.build();
+        }
+
+        final DriverRuntimeProtocol.ResourceLaunchProto.Builder rbuilder =
+            DriverRuntimeProtocol.ResourceLaunchProto.newBuilder()
+                .setIdentifier(this.evaluatorManager.getId())
+                .setRemoteId(this.remoteID)
+                .setEvaluatorConf(configurationSerializer.toString(evaluatorConfiguration));
+
+        for (final File file : this.files) {
+          rbuilder.addFile(ReefServiceProtos.FileResourceProto.newBuilder()
+              .setName(file.getName())
+              .setPath(file.getPath())
+              .setType(ReefServiceProtos.FileType.PLAIN)
+              .build());
+        }
+
+        for (final File lib : this.libraries) {
+          rbuilder.addFile(ReefServiceProtos.FileResourceProto.newBuilder()
+              .setName(lib.getName())
+              .setPath(lib.getPath().toString())
+              .setType(ReefServiceProtos.FileType.LIB)
+              .build());
+        }
+
+        { // Set the type
+          switch (this.evaluatorManager.getEvaluatorDescriptor().getType()) {
+            case CLR:
+              rbuilder.setType(ReefServiceProtos.ProcessType.CLR);
+              break;
+            default:
+              rbuilder.setType(ReefServiceProtos.ProcessType.JVM);
+          }
+        }
+
+        this.evaluatorManager.onResourceLaunch(rbuilder.build());
+
+      } catch (final BindException ex) {
+        LOG.log(Level.SEVERE, "Bad Evaluator configuration", ex);
+        throw new RuntimeException("Bad Evaluator configuration", ex);
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "AllocatedEvaluator{ID='" + getId() + "\'}";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/CompletedEvaluatorImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/CompletedEvaluatorImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/CompletedEvaluatorImpl.java
new file mode 100644
index 0000000..78e4446
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/CompletedEvaluatorImpl.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+
+/**
+ * Implementation of CompletedEvaluator.
+ */
+@DriverSide
+@Private
+final class CompletedEvaluatorImpl implements CompletedEvaluator {
+
+  private final String id;
+
+  CompletedEvaluatorImpl(final String id) {
+    this.id = id;
+  }
+
+  @Override
+  public String getId() {
+    return this.id;
+  }
+
+  @Override
+  public String toString() {
+    return "CompletedEvaluator{" +
+        "id='" + id + '\'' +
+        '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorControlHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorControlHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorControlHandler.java
new file mode 100644
index 0000000..ce62027
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorControlHandler.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.EvaluatorRuntimeProtocol;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * This class handles the sending of Evaluator control messages to the Evaluator.
+ */
+@DriverSide
+@Private
+public final class EvaluatorControlHandler {
+
+  private static Logger LOG = Logger.getLogger(EvaluatorControlHandler.class.getName());
+  private final EvaluatorStatusManager stateManager;
+  private final RemoteManager remoteManager;
+  private final String evaluatorId;
+  private Optional<EventHandler<EvaluatorRuntimeProtocol.EvaluatorControlProto>> wrapped = Optional.empty();
+
+  /**
+   * @param stateManager  used to check whether the Evaluator is running before sending a message.
+   * @param remoteManager used to establish the communications link as soon as the remote ID has been set.
+   */
+  @Inject
+  EvaluatorControlHandler(final EvaluatorStatusManager stateManager,
+                          final RemoteManager remoteManager,
+                          final @Parameter(EvaluatorManager.EvaluatorIdentifier.class) String evaluatorId) {
+    this.stateManager = stateManager;
+    this.remoteManager = remoteManager;
+    this.evaluatorId = evaluatorId;
+    LOG.log(Level.FINE, "Instantiated 'EvaluatorControlHandler'");
+  }
+
+  /**
+   * Send the evaluatorControlProto to the Evaluator.
+   *
+   * @param evaluatorControlProto
+   * @throws java.lang.IllegalStateException if the remote ID hasn't been set via setRemoteID() prior to this call
+   * @throws java.lang.IllegalStateException if the Evaluator isn't running.
+   */
+  public synchronized void send(final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto) {
+    if (!this.wrapped.isPresent()) {
+      throw new IllegalStateException("Trying to send an EvaluatorControlProto before the Evaluator ID is set.");
+    }
+    if (!this.stateManager.isRunning()) {
+      final String msg = new StringBuilder()
+          .append("Trying to send an EvaluatorControlProto to Evaluator [")
+          .append(this.evaluatorId)
+          .append("] that is in state [")
+          .append(this.stateManager.toString())
+          .append("], not [RUNNING]. The control message was: ")
+          .append(evaluatorControlProto.toString())
+          .toString();
+      throw new IllegalStateException(msg);
+    }
+    this.wrapped.get().onNext(evaluatorControlProto);
+  }
+
+  /**
+   * Set the remote ID used to communicate with this Evaluator.
+   *
+   * @param evaluatorRID
+   * @throws java.lang.IllegalStateException if the remote ID has been set before.
+   */
+  synchronized void setRemoteID(final String evaluatorRID) {
+    if (this.wrapped.isPresent()) {
+      throw new IllegalStateException("Trying to reset the evaluator ID. This isn't supported.");
+    } else {
+      LOG.log(Level.FINE, "Registering remoteId [{0}] for Evaluator [{1}]", new Object[]{evaluatorRID, evaluatorId});
+      this.wrapped = Optional.of(remoteManager.getHandler(evaluatorRID, EvaluatorRuntimeProtocol.EvaluatorControlProto.class));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorDescriptorImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorDescriptorImpl.java
new file mode 100644
index 0000000..1a0bbcc
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorDescriptorImpl.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.driver.evaluator.EvaluatorType;
+
+/**
+ * A simple all-data implementation of EvaluatorDescriptor
+ */
+@Private
+@DriverSide
+final class EvaluatorDescriptorImpl implements EvaluatorDescriptor {
+
+  private final NodeDescriptor nodeDescriptor;
+  private final int megaBytes;
+  private final int numberOfCores;
+  private EvaluatorType type;
+
+  public EvaluatorDescriptorImpl(final NodeDescriptor nodeDescriptor,
+                                 final EvaluatorType type,
+                                 final int megaBytes,
+                                 final int numberOfCores) {
+    this.nodeDescriptor = nodeDescriptor;
+    this.type = type;
+    this.megaBytes = megaBytes;
+    this.numberOfCores = numberOfCores;
+  }
+
+  @Override
+  public NodeDescriptor getNodeDescriptor() {
+    return this.nodeDescriptor;
+  }
+
+  @Override
+  public synchronized EvaluatorType getType() {
+    return this.type;
+  }
+
+  public synchronized void setType(final EvaluatorType type) {
+    if (this.getType() != EvaluatorType.UNDECIDED) {
+      throw new RuntimeException("Unable to change state of an Evaluator of Type: " + this.getType());
+    }
+    this.type = type;
+  }
+
+  @Override
+  public int getMemory() {
+    return this.megaBytes;
+  }
+
+  @Override
+  public int getNumberOfCores() {
+    return this.numberOfCores;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartBeatSanityChecker.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartBeatSanityChecker.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartBeatSanityChecker.java
new file mode 100644
index 0000000..09d8088
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartBeatSanityChecker.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+import javax.inject.Inject;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Sanity checks for evaluator heartbeats.
+ */
+@DriverSide
+@Private
+final class EvaluatorHeartBeatSanityChecker {
+  private static final Logger LOG = Logger.getLogger(EvaluatorHeartBeatSanityChecker.class.getName());
+  private final Map<String, Long> knownTimeStamps = new HashMap<>(); // guarded by this
+
+  @Inject
+  EvaluatorHeartBeatSanityChecker() {
+  }
+
+  final synchronized void check(final String id, final long timeStamp) {
+    if (knownTimeStamps.containsKey(id)) {
+      final long oldTimeStamp = this.knownTimeStamps.get(id);
+      LOG.log(Level.FINEST, "TIMESTAMP CHECKER: id [ " + id + " ], old timestamp [ " + oldTimeStamp + " ], new timestamp [ " + timeStamp + " ]");
+      if (oldTimeStamp > timeStamp) {
+        final String msg = "Received an old heartbeat with timestamp `" + timeStamp +
+            "` while earlier receiving one with timestamp `" + oldTimeStamp + "`";
+        LOG.log(Level.SEVERE, msg);
+        throw new RuntimeException(msg);
+      }
+    }
+    knownTimeStamps.put(id, timeStamp);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
new file mode 100644
index 0000000..c90098f
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.EvaluatorRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.RemoteMessage;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Receives heartbeats from all Evaluators and dispatches them to the right EvaluatorManager instance.
+ */
+@Private
+@DriverSide
+public final class EvaluatorHeartbeatHandler implements EventHandler<RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto>> {
+  private static final Logger LOG = Logger.getLogger(EvaluatorHeartbeatHandler.class.getName());
+  private final Evaluators evaluators;
+  private final EvaluatorManagerFactory evaluatorManagerFactory;
+
+  @Inject
+  EvaluatorHeartbeatHandler(final Evaluators evaluators, final EvaluatorManagerFactory evaluatorManagerFactory) {
+    this.evaluators = evaluators;
+    this.evaluatorManagerFactory = evaluatorManagerFactory;
+  }
+
+  @Override
+  public void onNext(final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatMessage) {
+    final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto heartbeat = evaluatorHeartbeatMessage.getMessage();
+    final ReefServiceProtos.EvaluatorStatusProto status = heartbeat.getEvaluatorStatus();
+    final String evaluatorId = status.getEvaluatorId();
+
+    LOG.log(Level.FINEST, "TIME: Begin Heartbeat {0}", evaluatorId);
+    LOG.log(Level.FINEST, "Heartbeat from Evaluator {0} with state {1} timestamp {2} from remoteId {3}",
+        new Object[]{evaluatorId, status.getState(), heartbeat.getTimestamp(), evaluatorHeartbeatMessage.getIdentifier()});
+
+    final Optional<EvaluatorManager> evaluatorManager = this.evaluators.get(evaluatorId);
+    if (evaluatorManager.isPresent()) {
+      evaluatorManager.get().onEvaluatorHeartbeatMessage(evaluatorHeartbeatMessage);
+    } else {
+      final StringBuilder message = new StringBuilder("Contact from unknown Evaluator with identifier '");
+      message.append(evaluatorId);
+      if (heartbeat.hasEvaluatorStatus()) {
+        message.append("' with state '");
+        message.append(status.getState());
+      }
+      message.append('\'');
+      throw new RuntimeException(message.toString());
+    }
+    LOG.log(Level.FINEST, "TIME: End Heartbeat {0}", evaluatorId);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
new file mode 100644
index 0000000..3938e06
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -0,0 +1,529 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.driver.evaluator.EvaluatorType;
+import org.apache.reef.driver.task.FailedTask;
+import org.apache.reef.exception.EvaluatorException;
+import org.apache.reef.exception.EvaluatorKilledByResourceManagerException;
+import org.apache.reef.io.naming.Identifiable;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.proto.EvaluatorRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.DriverRestartCompleted;
+import org.apache.reef.runtime.common.driver.DriverStatusManager;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
+import org.apache.reef.runtime.common.driver.context.ContextControlHandler;
+import org.apache.reef.runtime.common.driver.context.ContextRepresenters;
+import org.apache.reef.runtime.common.driver.idle.EventHandlerIdlenessSource;
+import org.apache.reef.runtime.common.driver.task.TaskRepresenter;
+import org.apache.reef.runtime.common.utils.ExceptionCodec;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.Optional;
+import org.apache.reef.util.logging.LoggingScopeFactory;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.RemoteMessage;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.Alarm;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Manages a single Evaluator instance including all lifecycle instances:
+ * (AllocatedEvaluator, CompletedEvaluator, FailedEvaluator).
+ * <p/>
+ * A (periodic) heartbeat channel is established EvaluatorRuntime -> EvaluatorManager.
+ * The EvaluatorRuntime will (periodically) send (status) messages to the EvaluatorManager using this
+ * heartbeat channel.
+ * <p/>
+ * A (push-based) EventHandler channel is established EvaluatorManager -> EvaluatorRuntime.
+ * The EvaluatorManager uses this to forward Driver messages, launch Tasks, and initiate
+ * control information (e.g., shutdown, suspend).
+ */
+@Private
+@DriverSide
+public final class EvaluatorManager implements Identifiable, AutoCloseable {
+
+  private final static Logger LOG = Logger.getLogger(EvaluatorManager.class.getName());
+
+  private final EvaluatorHeartBeatSanityChecker sanityChecker = new EvaluatorHeartBeatSanityChecker();
+  private final Clock clock;
+  private final ResourceReleaseHandler resourceReleaseHandler;
+  private final ResourceLaunchHandler resourceLaunchHandler;
+  private final String evaluatorId;
+  private final EvaluatorDescriptorImpl evaluatorDescriptor;
+  private final ContextRepresenters contextRepresenters;
+  private final EvaluatorMessageDispatcher messageDispatcher;
+  private final EvaluatorControlHandler evaluatorControlHandler;
+  private final ContextControlHandler contextControlHandler;
+  private final EvaluatorStatusManager stateManager;
+  private final ExceptionCodec exceptionCodec;
+  private final DriverStatusManager driverStatusManager;
+  private final EventHandlerIdlenessSource idlenessSource;
+  private final LoggingScopeFactory loggingScopeFactory;
+
+
+  // Mutable fields
+  private Optional<TaskRepresenter> task = Optional.empty();
+  private boolean isResourceReleased = false;
+
+  @Inject
+  private EvaluatorManager(
+      final Clock clock,
+      final RemoteManager remoteManager,
+      final ResourceReleaseHandler resourceReleaseHandler,
+      final ResourceLaunchHandler resourceLaunchHandler,
+      final @Parameter(EvaluatorIdentifier.class) String evaluatorId,
+      final @Parameter(EvaluatorDescriptorName.class) EvaluatorDescriptorImpl evaluatorDescriptor,
+      final ContextRepresenters contextRepresenters,
+      final ConfigurationSerializer configurationSerializer,
+      final EvaluatorMessageDispatcher messageDispatcher,
+      final EvaluatorControlHandler evaluatorControlHandler,
+      final ContextControlHandler contextControlHandler,
+      final EvaluatorStatusManager stateManager,
+      final DriverStatusManager driverStatusManager,
+      final ExceptionCodec exceptionCodec,
+      final EventHandlerIdlenessSource idlenessSource,
+      final LoggingScopeFactory loggingScopeFactory) {
+    this.contextRepresenters = contextRepresenters;
+    this.idlenessSource = idlenessSource;
+    LOG.log(Level.FINEST, "Instantiating 'EvaluatorManager' for evaluator: {0}", evaluatorId);
+    this.clock = clock;
+    this.resourceReleaseHandler = resourceReleaseHandler;
+    this.resourceLaunchHandler = resourceLaunchHandler;
+    this.evaluatorId = evaluatorId;
+    this.evaluatorDescriptor = evaluatorDescriptor;
+
+    this.messageDispatcher = messageDispatcher;
+    this.evaluatorControlHandler = evaluatorControlHandler;
+    this.contextControlHandler = contextControlHandler;
+    this.stateManager = stateManager;
+    this.driverStatusManager = driverStatusManager;
+    this.exceptionCodec = exceptionCodec;
+    this.loggingScopeFactory = loggingScopeFactory;
+
+    final AllocatedEvaluator allocatedEvaluator =
+        new AllocatedEvaluatorImpl(this, remoteManager.getMyIdentifier(), configurationSerializer, getJobIdentifier(), loggingScopeFactory);
+    LOG.log(Level.FINEST, "Firing AllocatedEvaluator event for Evaluator with ID [{0}]", evaluatorId);
+    this.messageDispatcher.onEvaluatorAllocated(allocatedEvaluator);
+    LOG.log(Level.FINEST, "Instantiated 'EvaluatorManager' for evaluator: [{0}]", this.getId());
+  }
+
+  /**
+   * Get the id of current job/application
+   */
+  public static String getJobIdentifier() {
+    // TODO: currently we obtain the job id directly by parsing execution (container) directory path
+    // #845 is open to get the id from RM properly
+    for (File directory = new File(System.getProperty("user.dir"));
+         directory != null; directory = directory.getParentFile()) {
+      final String currentDirectoryName = directory.getName();
+      if (currentDirectoryName.toLowerCase().contains("application_")) {
+        return currentDirectoryName;
+      }
+    }
+    // cannot find a directory that contains application_, presumably we are on local runtime
+    // again, this is a hack for now, we need #845 as a proper solution
+    return "REEF_LOCAL_RUNTIME";
+  }
+
+  private static boolean isDoneOrFailedOrKilled(final DriverRuntimeProtocol.ResourceStatusProto resourceStatusProto) {
+    return resourceStatusProto.getState() == ReefServiceProtos.State.DONE ||
+        resourceStatusProto.getState() == ReefServiceProtos.State.FAILED ||
+        resourceStatusProto.getState() == ReefServiceProtos.State.KILLED;
+  }
+
+  @Override
+  public String getId() {
+    return this.evaluatorId;
+  }
+
+  public void setType(final EvaluatorType type) {
+    this.evaluatorDescriptor.setType(type);
+  }
+
+  public EvaluatorDescriptor getEvaluatorDescriptor() {
+    return this.evaluatorDescriptor;
+  }
+
+  @Override
+  public void close() {
+    synchronized (this.evaluatorDescriptor) {
+      if (this.stateManager.isRunning()) {
+        LOG.log(Level.WARNING, "Dirty shutdown of running evaluator id[{0}]", getId());
+        try {
+          // Killing the evaluator means that it doesn't need to send a confirmation; it just dies.
+          final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto =
+              EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder()
+                  .setTimestamp(System.currentTimeMillis())
+                  .setIdentifier(getId())
+                  .setKillEvaluator(EvaluatorRuntimeProtocol.KillEvaluatorProto.newBuilder().build())
+                  .build();
+          sendEvaluatorControlMessage(evaluatorControlProto);
+        } finally {
+          this.stateManager.setKilled();
+        }
+      }
+
+
+      if (!this.isResourceReleased) {
+        this.isResourceReleased = true;
+        try {
+        /* We need to wait awhile before returning the container to the RM in order to
+         * give the EvaluatorRuntime (and Launcher) time to cleanly exit. */
+          this.clock.scheduleAlarm(100, new EventHandler<Alarm>() {
+            @Override
+            public void onNext(final Alarm alarm) {
+              EvaluatorManager.this.resourceReleaseHandler.onNext(
+                  DriverRuntimeProtocol.ResourceReleaseProto.newBuilder()
+                      .setIdentifier(EvaluatorManager.this.evaluatorId).build()
+              );
+            }
+          });
+        } catch (final IllegalStateException e) {
+          LOG.log(Level.WARNING, "Force resource release because the client closed the clock.", e);
+          EvaluatorManager.this.resourceReleaseHandler.onNext(
+              DriverRuntimeProtocol.ResourceReleaseProto.newBuilder()
+                  .setIdentifier(EvaluatorManager.this.evaluatorId).build()
+          );
+        }
+      }
+    }
+    this.idlenessSource.check();
+  }
+
+  /**
+   * Return true if the state is DONE, FAILED, or KILLED,
+   * <em>and</em> there are no messages queued or in processing.
+   */
+  public boolean isClosed() {
+    return this.messageDispatcher.isEmpty() &&
+        (this.stateManager.isDoneOrFailedOrKilled());
+  }
+
+  /**
+   * EvaluatorException will trigger is FailedEvaluator and state transition to FAILED
+   *
+   * @param exception on the EvaluatorRuntime
+   */
+  public void onEvaluatorException(final EvaluatorException exception) {
+    synchronized (this.evaluatorDescriptor) {
+      if (this.stateManager.isDoneOrFailedOrKilled()) {
+        LOG.log(Level.FINE, "Ignoring an exception receivedfor Evaluator {0} which is already in state {1}.",
+            new Object[]{this.getId(), this.stateManager});
+        return;
+      }
+
+      LOG.log(Level.WARNING, "Failed evaluator: " + getId(), exception);
+
+      try {
+
+        final List<FailedContext> failedContextList = this.contextRepresenters.getFailedContextsForEvaluatorFailure();
+
+        final Optional<FailedTask> failedTaskOptional;
+        if (this.task.isPresent()) {
+          final String taskId = this.task.get().getId();
+          final Optional<ActiveContext> evaluatorContext = Optional.empty();
+          final Optional<byte[]> bytes = Optional.empty();
+          final Optional<Throwable> taskException = Optional.<Throwable>of(new Exception("Evaluator crash"));
+          final String message = "Evaluator crash";
+          final Optional<String> description = Optional.empty();
+          final FailedTask failedTask = new FailedTask(taskId, message, description, taskException, bytes, evaluatorContext);
+          failedTaskOptional = Optional.of(failedTask);
+        } else {
+          failedTaskOptional = Optional.empty();
+        }
+
+
+        this.messageDispatcher.onEvaluatorFailed(new FailedEvaluatorImpl(exception, failedContextList, failedTaskOptional, this.evaluatorId));
+
+      } catch (final Exception e) {
+        LOG.log(Level.SEVERE, "Exception while handling FailedEvaluator", e);
+      } finally {
+        this.stateManager.setFailed();
+        close();
+      }
+    }
+  }
+
+  public synchronized void onEvaluatorHeartbeatMessage(
+      final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatProtoRemoteMessage) {
+
+    final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto evaluatorHeartbeatProto =
+        evaluatorHeartbeatProtoRemoteMessage.getMessage();
+    LOG.log(Level.FINEST, "Evaluator heartbeat: {0}", evaluatorHeartbeatProto);
+
+    if (this.stateManager.isDoneOrFailedOrKilled()) {
+      LOG.log(Level.FINE, "Ignoring an heartbeat received for Evaluator {0} which is already in state {1}.",
+          new Object[]{this.getId(), this.stateManager});
+      return;
+    }
+
+    this.sanityChecker.check(evaluatorId, evaluatorHeartbeatProto.getTimestamp());
+    final String evaluatorRID = evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString();
+
+    // first message from a running evaluator trying to re-establish communications
+    if (evaluatorHeartbeatProto.getRecovery()) {
+      this.evaluatorControlHandler.setRemoteID(evaluatorRID);
+      this.stateManager.setRunning();
+
+      this.driverStatusManager.oneContainerRecovered();
+      final int numRecoveredContainers = this.driverStatusManager.getNumRecoveredContainers();
+
+      LOG.log(Level.FINE, "Received recovery heartbeat from evaluator {0}.", this.evaluatorId);
+      final int expectedEvaluatorsNumber = this.driverStatusManager.getNumPreviousContainers();
+
+      if (numRecoveredContainers > expectedEvaluatorsNumber) {
+        LOG.log(Level.SEVERE, "expecting only [{0}] recovered evaluators, but [{1}] evaluators have checked in.",
+            new Object[]{expectedEvaluatorsNumber, numRecoveredContainers});
+        throw new RuntimeException("More then expected number of evaluators are checking in during recovery.");
+      } else if (numRecoveredContainers == expectedEvaluatorsNumber) {
+        LOG.log(Level.INFO, "All [{0}] expected evaluators have checked in. Recovery completed.", expectedEvaluatorsNumber);
+        this.driverStatusManager.setRestartCompleted();
+        this.messageDispatcher.OnDriverRestartCompleted(new DriverRestartCompleted(System.currentTimeMillis()));
+      } else {
+        LOG.log(Level.INFO, "expecting [{0}] recovered evaluators, [{1}] evaluators have checked in.",
+            new Object[]{expectedEvaluatorsNumber, numRecoveredContainers});
+      }
+    }
+
+    // If this is the first message from this Evaluator, register it.
+    if (this.stateManager.isSubmitted()) {
+      this.evaluatorControlHandler.setRemoteID(evaluatorRID);
+      this.stateManager.setRunning();
+      LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId);
+    }
+
+    // Process the Evaluator status message
+    if (evaluatorHeartbeatProto.hasEvaluatorStatus()) {
+      this.onEvaluatorStatusMessage(evaluatorHeartbeatProto.getEvaluatorStatus());
+    }
+
+    // Process the Context status message(s)
+    final boolean informClientOfNewContexts = !evaluatorHeartbeatProto.hasTaskStatus();
+    this.contextRepresenters.onContextStatusMessages(evaluatorHeartbeatProto.getContextStatusList(),
+        informClientOfNewContexts);
+
+    // Process the Task status message
+    if (evaluatorHeartbeatProto.hasTaskStatus()) {
+      this.onTaskStatusMessage(evaluatorHeartbeatProto.getTaskStatus());
+    }
+    LOG.log(Level.FINE, "DONE with evaluator heartbeat from Evaluator {0}", this.getId());
+  }
+
+  /**
+   * Process a evaluator status message.
+   *
+   * @param message
+   */
+  private synchronized void onEvaluatorStatusMessage(final ReefServiceProtos.EvaluatorStatusProto message) {
+
+    switch (message.getState()) {
+      case DONE:
+        this.onEvaluatorDone(message);
+        break;
+      case FAILED:
+        this.onEvaluatorFailed(message);
+        break;
+      case INIT:
+      case KILLED:
+      case RUNNING:
+      case SUSPEND:
+        break;
+    }
+  }
+
+  /**
+   * Process an evaluator message that indicates that the evaluator shut down cleanly.
+   *
+   * @param message
+   */
+  private synchronized void onEvaluatorDone(final ReefServiceProtos.EvaluatorStatusProto message) {
+    assert (message.getState() == ReefServiceProtos.State.DONE);
+    LOG.log(Level.FINEST, "Evaluator {0} done.", getId());
+    this.stateManager.setDone();
+    this.messageDispatcher.onEvaluatorCompleted(new CompletedEvaluatorImpl(this.evaluatorId));
+    close();
+  }
+
+  /**
+   * Process an evaluator message that indicates a crash.
+   *
+   * @param evaluatorStatusProto
+   */
+  private synchronized void onEvaluatorFailed(final ReefServiceProtos.EvaluatorStatusProto evaluatorStatusProto) {
+    assert (evaluatorStatusProto.getState() == ReefServiceProtos.State.FAILED);
+    final EvaluatorException evaluatorException;
+    if (evaluatorStatusProto.hasError()) {
+      final Optional<Throwable> exception = this.exceptionCodec.fromBytes(evaluatorStatusProto.getError().toByteArray());
+      if (exception.isPresent()) {
+        evaluatorException = new EvaluatorException(getId(), exception.get());
+      } else {
+        evaluatorException = new EvaluatorException(getId(), new Exception("Exception sent, but can't be deserialized"));
+      }
+    } else {
+      evaluatorException = new EvaluatorException(getId(), new Exception("No exception sent"));
+    }
+    onEvaluatorException(evaluatorException);
+  }
+
+  public void onResourceLaunch(final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto) {
+    synchronized (this.evaluatorDescriptor) {
+      if (this.stateManager.isAllocated()) {
+        this.stateManager.setSubmitted();
+        this.resourceLaunchHandler.onNext(resourceLaunchProto);
+      } else {
+        throw new RuntimeException("Evaluator manager expected " + EvaluatorState.ALLOCATED +
+            " state but instead is in state " + this.stateManager);
+      }
+    }
+  }
+
+  /**
+   * Packages the ContextControlProto in an EvaluatorControlProto and forward it to the EvaluatorRuntime
+   *
+   * @param contextControlProto message contains context control info.
+   */
+  public void sendContextControlMessage(final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto) {
+    synchronized (this.evaluatorDescriptor) {
+      LOG.log(Level.FINEST, "Context control message to {0}", this.evaluatorId);
+      this.contextControlHandler.send(contextControlProto);
+    }
+  }
+
+  /**
+   * Forward the EvaluatorControlProto to the EvaluatorRuntime
+   *
+   * @param evaluatorControlProto message contains evaluator control information.
+   */
+  void sendEvaluatorControlMessage(final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto) {
+    synchronized (this.evaluatorDescriptor) {
+      this.evaluatorControlHandler.send(evaluatorControlProto);
+    }
+  }
+
+  /**
+   * Handle task status messages.
+   *
+   * @param taskStatusProto message contains the current task status.
+   */
+  private void onTaskStatusMessage(final ReefServiceProtos.TaskStatusProto taskStatusProto) {
+
+    if (!(this.task.isPresent() && this.task.get().getId().equals(taskStatusProto.getTaskId()))) {
+      if (taskStatusProto.getState() == ReefServiceProtos.State.INIT ||
+          taskStatusProto.getState() == ReefServiceProtos.State.FAILED ||
+          taskStatusProto.getRecovery() // for task from recovered evaluators
+          ) {
+
+        // FAILED is a legal first state of a Task as it could have failed during construction.
+        this.task = Optional.of(
+            new TaskRepresenter(taskStatusProto.getTaskId(),
+                this.contextRepresenters.getContext(taskStatusProto.getContextId()),
+                this.messageDispatcher,
+                this,
+                this.exceptionCodec));
+      } else {
+        throw new RuntimeException("Received an message of state " + taskStatusProto.getState() +
+            ", not INIT or FAILED for Task " + taskStatusProto.getTaskId() + " which we haven't heard from before.");
+      }
+    }
+    this.task.get().onTaskStatusMessage(taskStatusProto);
+
+    if (this.task.get().isNotRunning()) {
+      LOG.log(Level.FINEST, "Task no longer running. De-registering it.");
+      this.task = Optional.empty();
+    }
+  }
+
+  /**
+   * Resource status information from the (actual) resource manager.
+   */
+  public void onResourceStatusMessage(final DriverRuntimeProtocol.ResourceStatusProto resourceStatusProto) {
+    synchronized (this.evaluatorDescriptor) {
+      LOG.log(Level.FINEST, "Resource manager state update: {0}", resourceStatusProto.getState());
+      if (this.stateManager.isDoneOrFailedOrKilled()) {
+        LOG.log(Level.FINE, "Ignoring resource status update for Evaluator {0} which is already in state {1}.",
+            new Object[]{this.getId(), this.stateManager});
+      } else if (isDoneOrFailedOrKilled(resourceStatusProto) && this.stateManager.isAllocatedOrSubmittedOrRunning()) {
+        // something is wrong. The resource manager reports that the Evaluator is done or failed, but the Driver assumes
+        // it to be alive.
+        final StringBuilder messageBuilder = new StringBuilder("Evaluator [")
+            .append(this.evaluatorId)
+            .append("] is assumed to be in state [")
+            .append(this.stateManager.toString())
+            .append("]. But the resource manager reports it to be in state [")
+            .append(resourceStatusProto.getState())
+            .append("].");
+
+        if (this.stateManager.isSubmitted()) {
+          messageBuilder
+              .append(" This most likely means that the Evaluator suffered a failure before establishing a communications link to the driver.");
+        } else if (this.stateManager.isAllocated()) {
+          messageBuilder.append(" This most likely means that the Evaluator suffered a failure before being used.");
+        } else if (this.stateManager.isRunning()) {
+          messageBuilder.append(" This means that the Evaluator failed but wasn't able to send an error message back to the driver.");
+        }
+        if (this.task.isPresent()) {
+          messageBuilder.append(" Task [")
+              .append(this.task.get().getId())
+              .append("] was running when the Evaluator crashed.");
+        }
+        this.isResourceReleased = true;
+
+        if (resourceStatusProto.getState() == ReefServiceProtos.State.KILLED) {
+          this.onEvaluatorException(new EvaluatorKilledByResourceManagerException(this.evaluatorId, messageBuilder.toString()));
+        } else {
+          this.onEvaluatorException(new EvaluatorException(this.evaluatorId, messageBuilder.toString()));
+        }
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "EvaluatorManager:"
+        + " id=" + this.evaluatorId
+        + " state=" + this.stateManager
+        + " task=" + this.task;
+  }
+
+  // Dynamic Parameters
+  @NamedParameter(doc = "The Evaluator Identifier.")
+  public final static class EvaluatorIdentifier implements Name<String> {
+  }
+
+  @NamedParameter(doc = "The Evaluator Host.")
+  public final static class EvaluatorDescriptorName implements Name<EvaluatorDescriptorImpl> {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java
new file mode 100644
index 0000000..d6e7757
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.driver.catalog.ResourceCatalog;
+import org.apache.reef.driver.evaluator.EvaluatorType;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorHandler;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Helper class that creates new EvaluatorManager instances from alloations.
+ */
+@Private
+@DriverSide
+public final class EvaluatorManagerFactory {
+  private static final Logger LOG = Logger.getLogger(EvaluatorManagerFactory.class.getName());
+
+  private final Injector injector;
+  private final ResourceCatalog resourceCatalog;
+
+  @Inject
+  EvaluatorManagerFactory(final Injector injector, final ResourceCatalog resourceCatalog, final NodeDescriptorHandler nodeDescriptorHandler) {
+    this.injector = injector;
+    this.resourceCatalog = resourceCatalog;
+  }
+
+  /**
+   * Helper method to create a new EvaluatorManager instance
+   *
+   * @param id   identifier of the Evaluator
+   * @param desc NodeDescriptor on which the Evaluator executes.
+   * @return a new EvaluatorManager instance.
+   */
+  private final EvaluatorManager getNewEvaluatorManagerInstance(final String id, final EvaluatorDescriptorImpl desc) {
+    LOG.log(Level.FINEST, "Creating Evaluator Manager for Evaluator ID {0}", id);
+    final Injector child = this.injector.forkInjector();
+
+    try {
+      child.bindVolatileParameter(EvaluatorManager.EvaluatorIdentifier.class, id);
+      child.bindVolatileParameter(EvaluatorManager.EvaluatorDescriptorName.class, desc);
+    } catch (final BindException e) {
+      throw new RuntimeException("Unable to bind evaluator identifier and name.", e);
+    }
+
+    final EvaluatorManager result;
+    try {
+      result = child.getInstance(EvaluatorManager.class);
+    } catch (final InjectionException e) {
+      throw new RuntimeException("Unable to instantiate a new EvaluatorManager for Evaluator ID: " + id, e);
+    }
+    return result;
+  }
+
+  /**
+   * Instantiates a new EvaluatorManager based on a resource allocation.
+   *
+   * @param resourceAllocationProto
+   * @return
+   */
+  public final EvaluatorManager getNewEvaluatorManager(final DriverRuntimeProtocol.ResourceAllocationProto resourceAllocationProto) {
+    final NodeDescriptor nodeDescriptor = this.resourceCatalog.getNode(resourceAllocationProto.getNodeId());
+
+    if (nodeDescriptor == null) {
+      throw new RuntimeException("Unknown resource: " + resourceAllocationProto.getNodeId());
+    }
+    final EvaluatorDescriptorImpl evaluatorDescriptor = new EvaluatorDescriptorImpl(nodeDescriptor,
+        EvaluatorType.UNDECIDED, resourceAllocationProto.getResourceMemory(), resourceAllocationProto.getVirtualCores());
+
+    LOG.log(Level.FINEST, "Resource allocation: new evaluator id[{0}]", resourceAllocationProto.getIdentifier());
+    return this.getNewEvaluatorManagerInstance(resourceAllocationProto.getIdentifier(), evaluatorDescriptor);
+  }
+
+  public final EvaluatorManager createForEvaluatorFailedDuringDriverRestart(final DriverRuntimeProtocol.ResourceStatusProto resourceStatusProto) {
+    if (!resourceStatusProto.getIsFromPreviousDriver()) {
+      throw new RuntimeException("Invalid resourceStatusProto, must be status for resource from previous Driver.");
+    }
+    return getNewEvaluatorManagerInstance(resourceStatusProto.getIdentifier(), new EvaluatorDescriptorImpl(null, EvaluatorType.UNDECIDED, 128, 1));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
new file mode 100644
index 0000000..709cd30
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextMessage;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.parameters.*;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.runtime.common.DriverRestartCompleted;
+import org.apache.reef.runtime.common.driver.DriverExceptionHandler;
+import org.apache.reef.runtime.common.utils.DispatchingEStage;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Central dispatcher for all Evaluator related events. This exists once per Evaluator.
+ */
+public final class EvaluatorMessageDispatcher {
+
+  private static final Logger LOG = Logger.getLogger(EvaluatorMessageDispatcher.class.getName());
+
+  /**
+   * Dispatcher used for application provided event handlers.
+   */
+  private final DispatchingEStage applicationDispatcher;
+
+  /**
+   * Dispatcher used for service provided event handlers.
+   */
+  private final DispatchingEStage serviceDispatcher;
+
+
+  /**
+   * Dispatcher used for application provided driver-restart specific event handlers.
+   */
+  private final DispatchingEStage driverRestartApplicationDispatcher;
+
+  /**
+   * Dispatcher used for service provided driver-restart specific event handlers.
+   */
+  private final DispatchingEStage driverRestartServiceDispatcher;
+
+  @Inject
+  EvaluatorMessageDispatcher(
+      // Application-provided Context event handlers
+      final @Parameter(ContextActiveHandlers.class) Set<EventHandler<ActiveContext>> contextActiveHandlers,
+      final @Parameter(ContextClosedHandlers.class) Set<EventHandler<ClosedContext>> contextClosedHandlers,
+      final @Parameter(ContextFailedHandlers.class) Set<EventHandler<FailedContext>> contextFailedHandlers,
+      final @Parameter(ContextMessageHandlers.class) Set<EventHandler<ContextMessage>> contextMessageHandlers,
+      // Service-provided Context event handlers
+      final @Parameter(ServiceContextActiveHandlers.class) Set<EventHandler<ActiveContext>> serviceContextActiveHandlers,
+      final @Parameter(ServiceContextClosedHandlers.class) Set<EventHandler<ClosedContext>> serviceContextClosedHandlers,
+      final @Parameter(ServiceContextFailedHandlers.class) Set<EventHandler<FailedContext>> serviceContextFailedHandlers,
+      final @Parameter(ServiceContextMessageHandlers.class) Set<EventHandler<ContextMessage>> serviceContextMessageHandlers,
+      // Application-provided Task event handlers
+      final @Parameter(TaskRunningHandlers.class) Set<EventHandler<RunningTask>> taskRunningHandlers,
+      final @Parameter(TaskCompletedHandlers.class) Set<EventHandler<CompletedTask>> taskCompletedHandlers,
+      final @Parameter(TaskSuspendedHandlers.class) Set<EventHandler<SuspendedTask>> taskSuspendedHandlers,
+      final @Parameter(TaskMessageHandlers.class) Set<EventHandler<TaskMessage>> taskMessageEventHandlers,
+      final @Parameter(TaskFailedHandlers.class) Set<EventHandler<FailedTask>> taskExceptionEventHandlers,
+      // Service-provided Task event handlers
+      final @Parameter(ServiceTaskRunningHandlers.class) Set<EventHandler<RunningTask>> serviceTaskRunningEventHandlers,
+      final @Parameter(ServiceTaskCompletedHandlers.class) Set<EventHandler<CompletedTask>> serviceTaskCompletedEventHandlers,
+      final @Parameter(ServiceTaskSuspendedHandlers.class) Set<EventHandler<SuspendedTask>> serviceTaskSuspendedEventHandlers,
+      final @Parameter(ServiceTaskMessageHandlers.class) Set<EventHandler<TaskMessage>> serviceTaskMessageEventHandlers,
+      final @Parameter(ServiceTaskFailedHandlers.class) Set<EventHandler<FailedTask>> serviceTaskExceptionEventHandlers,
+      // Application-provided Evaluator event handlers
+      final @Parameter(EvaluatorAllocatedHandlers.class) Set<EventHandler<AllocatedEvaluator>> evaluatorAllocatedHandlers,
+      final @Parameter(EvaluatorFailedHandlers.class) Set<EventHandler<FailedEvaluator>> evaluatorFailedHandlers,
+      final @Parameter(EvaluatorCompletedHandlers.class) Set<EventHandler<CompletedEvaluator>> evaluatorCompletedHandlers,
+      // Service-provided Evaluator event handlers
+      final @Parameter(ServiceEvaluatorAllocatedHandlers.class) Set<EventHandler<AllocatedEvaluator>> serviceEvaluatorAllocatedEventHandlers,
+      final @Parameter(ServiceEvaluatorFailedHandlers.class) Set<EventHandler<FailedEvaluator>> serviceEvaluatorFailedHandlers,
+      final @Parameter(ServiceEvaluatorCompletedHandlers.class) Set<EventHandler<CompletedEvaluator>> serviceEvaluatorCompletedHandlers,
+
+      // Application event handlers specific to a Driver restart
+      final @Parameter(DriverRestartTaskRunningHandlers.class) Set<EventHandler<RunningTask>> driverRestartTaskRunningHandlers,
+      final @Parameter(DriverRestartContextActiveHandlers.class) Set<EventHandler<ActiveContext>> driverRestartActiveContextHandlers,
+      final @Parameter(DriverRestartCompletedHandlers.class) Set<EventHandler<DriverRestartCompleted>> driverRestartCompletedHandlers,
+
+      // Service-provided event handlers specific to a Driver restart
+      final @Parameter(ServiceDriverRestartTaskRunningHandlers.class) Set<EventHandler<RunningTask>> serviceDriverRestartTaskRunningHandlers,
+      final @Parameter(ServiceDriverRestartContextActiveHandlers.class) Set<EventHandler<ActiveContext>> serviceDriverRestartActiveContextHandlers,
+      final @Parameter(ServiceDriverRestartCompletedHandlers.class) Set<EventHandler<DriverRestartCompleted>> serviceDriverRestartCompletedHandlers,
+
+      final @Parameter(EvaluatorDispatcherThreads.class) int numberOfThreads,
+      final @Parameter(EvaluatorManager.EvaluatorIdentifier.class) String evaluatorIdentifier,
+      final DriverExceptionHandler driverExceptionHandler) {
+
+    this.serviceDispatcher = new DispatchingEStage(driverExceptionHandler, numberOfThreads, evaluatorIdentifier);
+    this.applicationDispatcher = new DispatchingEStage(this.serviceDispatcher);
+    this.driverRestartApplicationDispatcher = new DispatchingEStage(this.serviceDispatcher);
+    this.driverRestartServiceDispatcher = new DispatchingEStage(this.serviceDispatcher);
+
+    { // Application Context event handlers
+      this.applicationDispatcher.register(ActiveContext.class, contextActiveHandlers);
+      this.applicationDispatcher.register(ClosedContext.class, contextClosedHandlers);
+      this.applicationDispatcher.register(FailedContext.class, contextFailedHandlers);
+      this.applicationDispatcher.register(ContextMessage.class, contextMessageHandlers);
+    }
+    { // Service Context event handlers
+      this.serviceDispatcher.register(ActiveContext.class, serviceContextActiveHandlers);
+      this.serviceDispatcher.register(ClosedContext.class, serviceContextClosedHandlers);
+      this.serviceDispatcher.register(FailedContext.class, serviceContextFailedHandlers);
+      this.serviceDispatcher.register(ContextMessage.class, serviceContextMessageHandlers);
+    }
+    { // Application Task event handlers.
+      this.applicationDispatcher.register(RunningTask.class, taskRunningHandlers);
+      this.applicationDispatcher.register(CompletedTask.class, taskCompletedHandlers);
+      this.applicationDispatcher.register(SuspendedTask.class, taskSuspendedHandlers);
+      this.applicationDispatcher.register(TaskMessage.class, taskMessageEventHandlers);
+      this.applicationDispatcher.register(FailedTask.class, taskExceptionEventHandlers);
+    }
+    { // Service Task event handlers
+      this.serviceDispatcher.register(RunningTask.class, serviceTaskRunningEventHandlers);
+      this.serviceDispatcher.register(CompletedTask.class, serviceTaskCompletedEventHandlers);
+      this.serviceDispatcher.register(SuspendedTask.class, serviceTaskSuspendedEventHandlers);
+      this.serviceDispatcher.register(TaskMessage.class, serviceTaskMessageEventHandlers);
+      this.serviceDispatcher.register(FailedTask.class, serviceTaskExceptionEventHandlers);
+    }
+    { // Application Evaluator event handlers
+      this.applicationDispatcher.register(FailedEvaluator.class, evaluatorFailedHandlers);
+      this.applicationDispatcher.register(CompletedEvaluator.class, evaluatorCompletedHandlers);
+      this.applicationDispatcher.register(AllocatedEvaluator.class, evaluatorAllocatedHandlers);
+    }
+    { // Service Evaluator event handlers
+      this.serviceDispatcher.register(FailedEvaluator.class, serviceEvaluatorFailedHandlers);
+      this.serviceDispatcher.register(CompletedEvaluator.class, serviceEvaluatorCompletedHandlers);
+      this.serviceDispatcher.register(AllocatedEvaluator.class, serviceEvaluatorAllocatedEventHandlers);
+    }
+
+    // Application event handlers specific to a Driver restart
+    {
+      this.driverRestartApplicationDispatcher.register(RunningTask.class, driverRestartTaskRunningHandlers);
+      this.driverRestartApplicationDispatcher.register(ActiveContext.class, driverRestartActiveContextHandlers);
+      this.driverRestartApplicationDispatcher.register(DriverRestartCompleted.class, driverRestartCompletedHandlers);
+    }
+
+    // Service event handlers specific to a Driver restart
+    {
+      this.driverRestartServiceDispatcher.register(RunningTask.class, serviceDriverRestartTaskRunningHandlers);
+      this.driverRestartServiceDispatcher.register(ActiveContext.class, serviceDriverRestartActiveContextHandlers);
+      this.driverRestartServiceDispatcher.register(DriverRestartCompleted.class, serviceDriverRestartCompletedHandlers);
+    }
+    LOG.log(Level.FINE, "Instantiated 'EvaluatorMessageDispatcher'");
+  }
+
+  public void onEvaluatorAllocated(final AllocatedEvaluator allocatedEvaluator) {
+    this.dispatch(AllocatedEvaluator.class, allocatedEvaluator);
+  }
+
+  public void onEvaluatorFailed(final FailedEvaluator failedEvaluator) {
+    this.dispatch(FailedEvaluator.class, failedEvaluator);
+  }
+
+  public void onEvaluatorCompleted(final CompletedEvaluator completedEvaluator) {
+    this.dispatch(CompletedEvaluator.class, completedEvaluator);
+  }
+
+  public void onTaskRunning(final RunningTask runningTask) {
+    this.dispatch(RunningTask.class, runningTask);
+  }
+
+  public void onTaskCompleted(final CompletedTask completedTask) {
+    this.dispatch(CompletedTask.class, completedTask);
+  }
+
+  public void onTaskSuspended(final SuspendedTask suspendedTask) {
+    this.dispatch(SuspendedTask.class, suspendedTask);
+  }
+
+  public void onTaskMessage(final TaskMessage taskMessage) {
+    this.dispatch(TaskMessage.class, taskMessage);
+  }
+
+  public void onTaskFailed(final FailedTask failedTask) {
+    this.dispatch(FailedTask.class, failedTask);
+  }
+
+  public void onContextActive(final ActiveContext activeContext) {
+    this.dispatch(ActiveContext.class, activeContext);
+  }
+
+  public void onContextClose(final ClosedContext closedContext) {
+    this.dispatch(ClosedContext.class, closedContext);
+  }
+
+  public void onContextFailed(final FailedContext failedContext) {
+    this.dispatch(FailedContext.class, failedContext);
+  }
+
+  public void onContextMessage(final ContextMessage contextMessage) {
+    this.dispatch(ContextMessage.class, contextMessage);
+  }
+
+  public void onDriverRestartTaskRunning(final RunningTask runningTask) {
+    this.dispatchForRestartedDriver(RunningTask.class, runningTask);
+  }
+
+  public void OnDriverRestartContextActive(final ActiveContext activeContext) {
+    this.dispatchForRestartedDriver(ActiveContext.class, activeContext);
+  }
+
+  public void OnDriverRestartCompleted(final DriverRestartCompleted restartCompleted) {
+    this.dispatchForRestartedDriver(DriverRestartCompleted.class, restartCompleted);
+  }
+
+  boolean isEmpty() {
+    return this.applicationDispatcher.isEmpty();
+  }
+
+  private <T, U extends T> void dispatch(final Class<T> type, final U message) {
+    this.serviceDispatcher.onNext(type, message);
+    this.applicationDispatcher.onNext(type, message);
+  }
+
+  private <T, U extends T> void dispatchForRestartedDriver(final Class<T> type, final U message) {
+    this.driverRestartApplicationDispatcher.onNext(type, message);
+    this.driverRestartServiceDispatcher.onNext(type, message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorResourceManagerErrorHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorResourceManagerErrorHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorResourceManagerErrorHandler.java
new file mode 100644
index 0000000..1b9a4ca
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorResourceManagerErrorHandler.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.client.FailedRuntime;
+import org.apache.reef.exception.EvaluatorException;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.RemoteMessage;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The error handler receives all resourcemanager errors from all evaluators in the system.
+ * Its primary function is to dispatch these to the appropriate EvaluatorManager.
+ */
+@Private
+public final class EvaluatorResourceManagerErrorHandler implements EventHandler<RemoteMessage<ReefServiceProtos.RuntimeErrorProto>> {
+  private static final Logger LOG = Logger.getLogger(EvaluatorResourceManagerErrorHandler.class.toString());
+  private final Evaluators evaluators;
+
+
+  @Inject
+  EvaluatorResourceManagerErrorHandler(final Evaluators evaluators) {
+    this.evaluators = evaluators;
+    LOG.log(Level.FINE, "Instantiated 'EvaluatorResourceManagerErrorHandler'");
+  }
+
+  @Override
+  public void onNext(final RemoteMessage<ReefServiceProtos.RuntimeErrorProto> runtimeErrorProtoRemoteMessage) {
+    final ReefServiceProtos.RuntimeErrorProto runtimeErrorProto = runtimeErrorProtoRemoteMessage.getMessage();
+    final FailedRuntime error = new FailedRuntime(runtimeErrorProto);
+    final String evaluatorId = error.getId();
+    LOG.log(Level.WARNING, "Runtime error: " + error);
+
+    final EvaluatorException evaluatorException = error.getReason().isPresent() ?
+        new EvaluatorException(evaluatorId, error.getReason().get()) :
+        new EvaluatorException(evaluatorId, "Runtime error");
+
+    final Optional<EvaluatorManager> evaluatorManager = this.evaluators.get(evaluatorId);
+    if (evaluatorManager.isPresent()) {
+      evaluatorManager.get().onEvaluatorException(evaluatorException);
+    } else {
+      LOG.log(Level.WARNING, "Unknown evaluator runtime error: " + error);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java
new file mode 100644
index 0000000..e1e778e
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+/**
+ * Various states that the EvaluatorManager could be in. The EvaluatorManager is
+ * created when a resource has been allocated by the ResourceManager.
+ */
+@DriverSide
+@Private
+enum EvaluatorState {
+  ALLOCATED,  // initial state
+  SUBMITTED,  // client called AllocatedEvaluator.submitTask() and we're waiting for first contact
+  RUNNING,    // first contact received, all communication channels established, Evaluator sent to client.
+  // TODO: Add CLOSING state
+  DONE,       // clean shutdown
+  FAILED,     // some failure occurred.
+  KILLED      // unclean shutdown
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java
new file mode 100644
index 0000000..ba8a62f
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Manages Status of a single Evaluator.
+ */
+@DriverSide
+@Private
+final class EvaluatorStatusManager {
+  private static final Logger LOG = Logger.getLogger(EvaluatorStatusManager.class.getName());
+  /**
+   * The state managed.
+   */
+  private EvaluatorState state = EvaluatorState.ALLOCATED;
+
+  @Inject
+  private EvaluatorStatusManager() {
+    LOG.log(Level.FINE, "Instantiated 'EvaluatorStatusManager'");
+  }
+
+  private static boolean isLegal(final EvaluatorState from, final EvaluatorState to) {
+    // TODO
+    return true;
+  }
+
+  synchronized void setRunning() {
+    this.setState(EvaluatorState.RUNNING);
+  }
+
+  synchronized void setSubmitted() {
+    this.setState(EvaluatorState.SUBMITTED);
+  }
+
+  synchronized void setDone() {
+    this.setState(EvaluatorState.DONE);
+  }
+
+  synchronized void setFailed() {
+    this.setState(EvaluatorState.FAILED);
+  }
+
+  synchronized void setKilled() {
+    this.setState(EvaluatorState.KILLED);
+  }
+
+  synchronized boolean isRunning() {
+    return this.state.equals(EvaluatorState.RUNNING);
+  }
+
+  synchronized boolean isDoneOrFailedOrKilled() {
+    return (this.state == EvaluatorState.DONE ||
+        this.state == EvaluatorState.FAILED ||
+        this.state == EvaluatorState.KILLED);
+  }
+
+  synchronized boolean isAllocatedOrSubmittedOrRunning() {
+    return (this.state == EvaluatorState.ALLOCATED ||
+        this.state == EvaluatorState.SUBMITTED ||
+        this.state == EvaluatorState.RUNNING);
+  }
+
+  synchronized boolean isSubmitted() {
+    return EvaluatorState.SUBMITTED == this.state;
+  }
+
+  synchronized boolean isAllocated() {
+    return EvaluatorState.ALLOCATED == this.state;
+  }
+
+  @Override
+  public synchronized String toString() {
+    return this.state.toString();
+  }
+
+  private synchronized void setState(final EvaluatorState state) {
+    if (!isLegal(this.state, state)) {
+      throw new IllegalStateException("Illegal state transition from '" + this.state + "' to '" + state + "'");
+    }
+    this.state = state;
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
new file mode 100644
index 0000000..dde4e70
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.util.Optional;
+import org.apache.reef.util.SingletonAsserter;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Manages all Evaluators.
+ * See EvaluatorManager for the Driver side representation of a single Evaluator.
+ */
+@DriverSide
+@Private
+public final class Evaluators implements AutoCloseable {
+
+  private static final Logger LOG = Logger.getLogger(Evaluators.class.getName());
+
+  /**
+   * A map between evaluatorId and the EvaluatorManager that handles this evaluator.
+   */
+  private final Map<String, EvaluatorManager> evaluators = new HashMap<>();
+
+
+  @Inject
+  Evaluators() {
+    LOG.log(Level.FINE, "Instantiated 'Evaluators'");
+    assert (SingletonAsserter.assertSingleton(Evaluators.class));
+  }
+
+  /**
+   * Closes all EvaluatorManager instances managed.
+   */
+  @Override
+  public void close() {
+    final List<EvaluatorManager> evaluatorsCopy;
+    synchronized (this) {
+      evaluatorsCopy = new ArrayList<>(this.evaluators.values());
+    }
+    for (final EvaluatorManager evaluatorManager : evaluatorsCopy) {
+      LOG.log(Level.WARNING, "Unclean shutdown of evaluator {0}", evaluatorManager.getId());
+      if (!evaluatorManager.isClosed()) {
+        evaluatorManager.close();
+      }
+    }
+  }
+
+  /**
+   * Return true if <em>all</em> evaluators are in closed state
+   * (and their processing queues are empty).
+   */
+  public synchronized boolean allEvaluatorsAreClosed() {
+    synchronized (this.evaluators) {
+      for (final EvaluatorManager eval : this.evaluators.values()) {
+        if (!eval.isClosed()) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * @param evaluatorId
+   * @return the EvaluatorManager for the given id, if one exists.
+   */
+  public synchronized Optional<EvaluatorManager> get(final String evaluatorId) {
+    return Optional.ofNullable(this.evaluators.get(evaluatorId));
+  }
+
+  /**
+   * Create new EvaluatorManager and add it to the collection.
+   * <p/>
+   * FIXME: This method is a temporary fix for the race condition
+   * described in issues #828 and #839.
+   *
+   * @param evaluatorManagerFactory Factory that builds new EvaluatorManager objects.
+   * @param evaluatorMsg            Resource allocation message that contains data on the new evaluator.
+   * @throws java.lang.IllegalArgumentException if the EvaluatorManager is already known.
+   */
+  public synchronized void put(
+      final EvaluatorManagerFactory evaluatorManagerFactory,
+      final DriverRuntimeProtocol.ResourceAllocationProto evaluatorMsg) {
+    this.put(evaluatorManagerFactory.getNewEvaluatorManager(evaluatorMsg));
+  }
+
+  /**
+   * Adds an EvaluatorManager.
+   *
+   * @param evaluatorManager
+   * @throws java.lang.IllegalArgumentException if the EvaluatorManager is already known.
+   */
+  public synchronized void put(final EvaluatorManager evaluatorManager) {
+    final String evaluatorId = evaluatorManager.getId();
+    final EvaluatorManager prev = this.evaluators.put(evaluatorId, evaluatorManager);
+    LOG.log(Level.FINEST, "Adding: {0} previous: {1}", new Object[]{evaluatorId, prev});
+    if (prev != null) {
+      throw new IllegalArgumentException(
+          "Trying to re-add an Evaluator that is already known: " + evaluatorId);
+    }
+  }
+}