You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:47:10 UTC

[37/51] [partial] incubator-reef git commit: [REEF-93] Move java sources to lang/java

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/FailedEvaluatorImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/FailedEvaluatorImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/FailedEvaluatorImpl.java
new file mode 100644
index 0000000..450e8e5
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/FailedEvaluatorImpl.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.task.FailedTask;
+import org.apache.reef.exception.EvaluatorException;
+import org.apache.reef.util.Optional;
+
+import java.util.List;
+
+/** Immutable driver-side holder for a failed Evaluator: its id, cause, failed contexts and failed task. */
+@DriverSide
+@Private
+final class FailedEvaluatorImpl implements FailedEvaluator {
+
+  final String id;
+  private final EvaluatorException ex;
+  private final List<FailedContext> ctx;
+  private final Optional<FailedTask> task;
+
+  /** Stores the given references as-is; nothing is copied or validated. */
+  public FailedEvaluatorImpl(final EvaluatorException ex, final List<FailedContext> ctx, final Optional<FailedTask> task, final String id) {
+    this.id = id;
+    this.task = task;
+    this.ctx = ctx;
+    this.ex = ex;
+  }
+
+  @Override
+  public EvaluatorException getEvaluatorException() {
+    return ex;
+  }
+
+  @Override
+  public List<FailedContext> getFailedContextList() {
+    return ctx;
+  }
+
+  @Override
+  public Optional<FailedTask> getFailedTask() {
+    return task;
+  }
+
+  @Override
+  public String getId() {
+    return id;
+  }
+
+  @Override
+  public String toString() {
+    return "FailedEvaluator{id='" + this.id + "'}";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/package-info.java
new file mode 100644
index 0000000..7d6deee
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/package-info.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Driver-Side representations of Evaluators.
+ */
+@DriverSide
+@Private package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/ClockIdlenessSource.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/ClockIdlenessSource.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/ClockIdlenessSource.java
new file mode 100644
index 0000000..460f5a4
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/ClockIdlenessSource.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.idle;
+
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.runtime.event.IdleClock;
+
+import javax.inject.Inject;
+
+/**
+ * Reports the idle state of the Clock and forwards IdleClock events to the DriverIdleManager.
+ */
+public final class ClockIdlenessSource implements DriverIdlenessSource, EventHandler<IdleClock> {
+  private static final String COMPONENT_NAME = "Clock";
+  private static final String IDLE_REASON = "The clock reported idle.";
+  private static final String NOT_IDLE_REASON = "The clock reported not idle.";
+  private static final IdleMessage IDLE_MESSAGE = new IdleMessage(COMPONENT_NAME, IDLE_REASON, true);
+  private static final IdleMessage NOT_IDLE_MESSAGE = new IdleMessage(COMPONENT_NAME, NOT_IDLE_REASON, false);
+
+  private final InjectionFuture<DriverIdleManager> driverIdleManager;
+  private final Clock clock;
+
+  @Inject
+  ClockIdlenessSource(final InjectionFuture<DriverIdleManager> driverIdleManager, final Clock clock) {
+    this.clock = clock;
+    this.driverIdleManager = driverIdleManager;
+  }
+
+  /**
+   * @return the idle message if the clock currently reports idle, the not-idle message otherwise.
+   */
+  @Override
+  public synchronized IdleMessage getIdleStatus() {
+    return this.clock.isIdle() ? IDLE_MESSAGE : NOT_IDLE_MESSAGE;
+  }
+
+  /** Passes the idle message on to the DriverIdleManager; the event object itself is not inspected. */
+  @Override
+  public synchronized void onNext(final IdleClock idleClock) {
+    this.driverIdleManager.get().onPotentiallyIdle(IDLE_MESSAGE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java
new file mode 100644
index 0000000..79d4d8c
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.idle;
+
+import org.apache.reef.driver.parameters.DriverIdleSources;
+import org.apache.reef.runtime.common.driver.DriverStatusManager;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Collects the idle status of all driver idleness sources and notifies DriverStatusManager when all are idle.
+ */
+public final class DriverIdleManager {
+  private static final Logger LOG = Logger.getLogger(DriverIdleManager.class.getName());
+  private static final Level IDLE_REASONS_LEVEL = Level.FINEST;
+  private final Set<DriverIdlenessSource> idlenessSources;
+  private final InjectionFuture<DriverStatusManager> driverStatusManager;
+
+  @Inject
+  DriverIdleManager(final @Parameter(DriverIdleSources.class) Set<DriverIdlenessSource> idlenessSources,
+                    final InjectionFuture<DriverStatusManager> driverStatusManager) {
+    this.driverStatusManager = driverStatusManager;
+    this.idlenessSources = idlenessSources;
+  }
+
+  /**
+   * Polls every idleness source and calls DriverStatusManager.onComplete() if all of them report idle.
+   * Only logs the call when the driver is already shutting down or failing; reason is used for logging only.
+   */
+  public synchronized void onPotentiallyIdle(final IdleMessage reason) {
+    synchronized (driverStatusManager.get()) {
+      if (this.driverStatusManager.get().isShuttingDownOrFailing()) {
+        LOG.log(IDLE_REASONS_LEVEL, "Ignoring idle call from [{0}] for reason [{1}]",
+            new Object[]{reason.getComponentName(), reason.getReason()});
+        return;
+      }
+      LOG.log(IDLE_REASONS_LEVEL, "Checking for idle because {0} reported idleness for reason [{1}]",
+          new Object[]{reason.getComponentName(), reason.getReason()});
+      boolean allIdle = true;
+      for (final DriverIdlenessSource source : this.idlenessSources) {
+        final IdleMessage status = source.getIdleStatus();
+        LOG.log(IDLE_REASONS_LEVEL, "[{0}] is reporting {1} because [{2}].",
+            new Object[]{status.getComponentName(), status.isIdle() ? "idle" : "not idle", status.getReason()});
+        allIdle = status.isIdle() && allIdle; // no early exit: every source is polled and logged
+      }
+      if (allIdle) {
+        LOG.log(Level.INFO, "All components indicated idle. Initiating Driver shutdown.");
+        this.driverStatusManager.get().onComplete();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdlenessSource.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdlenessSource.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdlenessSource.java
new file mode 100644
index 0000000..11f6ad1
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdlenessSource.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.idle;
+
+/**
+ * Implementations of this interface are used to determine driver idleness.
+ */
+public interface DriverIdlenessSource {
+  /**
+   * Queried by DriverIdleManager, which only completes the driver if every source reports idle.
+   *
+   * @return the idle status of the component.
+   */
+  IdleMessage getIdleStatus();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/EventHandlerIdlenessSource.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/EventHandlerIdlenessSource.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/EventHandlerIdlenessSource.java
new file mode 100644
index 0000000..a8a87c5
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/EventHandlerIdlenessSource.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.idle;
+
+import org.apache.reef.runtime.common.driver.evaluator.Evaluators;
+import org.apache.reef.tang.InjectionFuture;
+
+import javax.inject.Inject;
+
+/**
+ * Checks for idleness of the Event handlers: reports idle once all Evaluators are closed.
+ */
+public final class EventHandlerIdlenessSource implements DriverIdlenessSource {
+
+  private static final IdleMessage IDLE_MESSAGE = new IdleMessage("EventHandlers", "All events have been processed.", true);
+  // Must carry isIdle == false: DriverIdleManager ANDs the flags of all sources to decide on shutdown.
+  private static final IdleMessage NOT_IDLE_MESSAGE = new IdleMessage("EventHandlers", "Some events are still in flight.", false);
+
+  private final InjectionFuture<Evaluators> evaluators;
+  private final InjectionFuture<DriverIdleManager> driverIdleManager;
+
+  @Inject
+  EventHandlerIdlenessSource(final InjectionFuture<Evaluators> evaluators,
+                             final InjectionFuture<DriverIdleManager> driverIdleManager) {
+    this.evaluators = evaluators;
+    this.driverIdleManager = driverIdleManager;
+  }
+
+  /**
+   * @return the idle message if all Evaluators are closed, the not-idle message otherwise.
+   */
+  @Override
+  public IdleMessage getIdleStatus() {
+    return this.evaluators.get().allEvaluatorsAreClosed() ? IDLE_MESSAGE : NOT_IDLE_MESSAGE;
+  }
+
+  /** Reports potential idleness to the DriverIdleManager, but only if all Evaluators are closed. */
+  public void check() {
+    if (this.evaluators.get().allEvaluatorsAreClosed()) {
+      this.driverIdleManager.get().onPotentiallyIdle(IDLE_MESSAGE);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/IdleMessage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/IdleMessage.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/IdleMessage.java
new file mode 100644
index 0000000..6210458
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/IdleMessage.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.idle;
+
+/**
+ * Immutable status report of a single component: who reports, why, and whether it is idle.
+ */
+public final class IdleMessage {
+
+  private final String component;
+  private final String explanation;
+  private final boolean idle;
+
+  /**
+   * @param componentName the name of the reporting component, e.g. "Clock"
+   * @param reason        the human-readable reason for being (not) idle, e.g. "No more alarms scheduled."
+   * @param isIdle        true if the component is idle, false otherwise.
+   */
+  public IdleMessage(final String componentName, final String reason, final boolean isIdle) {
+    component = componentName;
+    explanation = reason;
+    idle = isIdle;
+  }
+
+  /**
+   * @return the name of the component this message originates from.
+   */
+  public String getComponentName() {
+    return component;
+  }
+
+  /**
+   * @return the human-readable reason given for the (non-)idle status.
+   */
+  public String getReason() {
+    return explanation;
+  }
+
+  /**
+   * @return true if this message indicates idle, false if it indicates not idle.
+   */
+  public boolean isIdle() {
+    return idle;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/package-info.java
new file mode 100644
index 0000000..35dffa2
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Contains the Driver idle handling and extension APIs
+ */
+package org.apache.reef.runtime.common.driver.idle;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/package-info.java
new file mode 100644
index 0000000..520cfa1
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Implementation of the Driver-side REEF APIs.
+ */
+package org.apache.reef.runtime.common.driver;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/NodeDescriptorHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/NodeDescriptorHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/NodeDescriptorHandler.java
new file mode 100644
index 0000000..e5fa95e
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/NodeDescriptorHandler.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.resourcemanager;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.catalog.ResourceCatalogImpl;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Event handler that passes every NodeDescriptorProto it receives on to the ResourceCatalogImpl.
+ */
+@Private
+@DriverSide
+public final class NodeDescriptorHandler implements EventHandler<DriverRuntimeProtocol.NodeDescriptorProto> {
+  private final ResourceCatalogImpl catalog;
+
+  @Inject
+  public NodeDescriptorHandler(final ResourceCatalogImpl resourceCatalog) {
+    catalog = resourceCatalog;
+  }
+
+  @Override
+  public void onNext(final DriverRuntimeProtocol.NodeDescriptorProto value) {
+    catalog.handle(value);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationHandler.java
new file mode 100644
index 0000000..7d0e1e8
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationHandler.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.resourcemanager;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManagerFactory;
+import org.apache.reef.runtime.common.driver.evaluator.Evaluators;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Forwards each ResourceAllocationProto, together with the EvaluatorManagerFactory, to Evaluators.put().
+ */
+@Private
+@DriverSide
+public final class ResourceAllocationHandler
+    implements EventHandler<DriverRuntimeProtocol.ResourceAllocationProto> {
+
+  /**
+   * Factory for EvaluatorManager instances; it is not invoked here but
+   * handed over to Evaluators.put() along with the allocation.
+   */
+  private final EvaluatorManagerFactory managerFactory;
+
+  /**
+   * The Evaluators known to the Driver; receives every new allocation.
+   */
+  private final Evaluators knownEvaluators;
+
+  @Inject
+  ResourceAllocationHandler(
+      final EvaluatorManagerFactory evaluatorManagerFactory, final Evaluators evaluators) {
+    this.knownEvaluators = evaluators;
+    this.managerFactory = evaluatorManagerFactory;
+  }
+
+  @Override
+  public void onNext(final DriverRuntimeProtocol.ResourceAllocationProto value) {
+    // FIXME: Calling this put(factory, proto) overload is a temporary workaround for the
+    // race condition described in issues #828 and #839. Switch to
+    // Evaluators.put(EvaluatorManager) once that bug is fixed.
+    this.knownEvaluators.put(this.managerFactory, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerErrorHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerErrorHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerErrorHandler.java
new file mode 100644
index 0000000..7f13de8
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerErrorHandler.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.resourcemanager;
+
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.DriverStatusManager;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Handles Resource Manager errors by reporting them to DriverStatusManager.onError().
+ */
+public final class ResourceManagerErrorHandler implements EventHandler<ReefServiceProtos.RuntimeErrorProto> {
+  private final DriverStatusManager statusManager;
+
+  @Inject
+  ResourceManagerErrorHandler(final DriverStatusManager driverStatusManager) {
+    statusManager = driverStatusManager;
+  }
+
+  /** The content of the error proto is not inspected; a generic Exception is reported instead. */
+  @Override
+  public synchronized void onNext(final ReefServiceProtos.RuntimeErrorProto runtimeErrorProto) {
+    final Exception failure = new Exception("Resource Manager failure");
+    statusManager.onError(failure);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
new file mode 100644
index 0000000..c0db65d
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.resourcemanager;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.DriverStatusManager;
+import org.apache.reef.runtime.common.driver.idle.DriverIdleManager;
+import org.apache.reef.runtime.common.driver.idle.DriverIdlenessSource;
+import org.apache.reef.runtime.common.driver.idle.IdleMessage;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Manages the status of the Resource Manager.
+ * <p>
+ * Every {@code RuntimeStatusProto} received updates the recorded state together with the counts of
+ * outstanding container requests and allocated containers. A FAILED state is forwarded to the
+ * {@link ResourceManagerErrorHandler}, a DONE state calls {@code DriverStatusManager.onComplete()}, and a
+ * RUNNING state reports potential idleness to the {@code DriverIdleManager} when nothing is outstanding
+ * or allocated. All methods synchronize on this instance.
+ */
+@DriverSide
+@Private
+public final class ResourceManagerStatus implements EventHandler<DriverRuntimeProtocol.RuntimeStatusProto>,
+    DriverIdlenessSource {
+  private static final Logger LOG = Logger.getLogger(ResourceManagerStatus.class.getName());
+
+  private static final String COMPONENT_NAME = "ResourceManager";
+  private static final IdleMessage IDLE_MESSAGE = new IdleMessage(COMPONENT_NAME, "No outstanding requests or allocations", true);
+
+  private final ResourceManagerErrorHandler resourceManagerErrorHandler;
+  private final DriverStatusManager driverStatusManager;
+  private final InjectionFuture<DriverIdleManager> driverIdleManager;
+
+  // Mutable state.
+  private ReefServiceProtos.State state = ReefServiceProtos.State.INIT;
+  private int outstandingContainerRequests = 0;
+  private int containerAllocationCount = 0;
+
+  /**
+   * @param resourceManagerErrorHandler receives the error of a FAILED status message.
+   * @param driverStatusManager         is told about completion when the Resource Manager reports DONE.
+   * @param driverIdleManager           future of the idle manager; only resolved when idleness is reported.
+   */
+  @Inject
+  ResourceManagerStatus(final ResourceManagerErrorHandler resourceManagerErrorHandler,
+                        final DriverStatusManager driverStatusManager,
+                        final InjectionFuture<DriverIdleManager> driverIdleManager) {
+    this.resourceManagerErrorHandler = resourceManagerErrorHandler;
+    this.driverStatusManager = driverStatusManager;
+    this.driverIdleManager = driverIdleManager;
+  }
+
+  /**
+   * Records the counters and state carried by the message, then dispatches on the new state.
+   *
+   * @param runtimeStatusProto the status message sent by the Resource Manager.
+   */
+  @Override
+  public synchronized void onNext(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) {
+    final ReefServiceProtos.State newState = runtimeStatusProto.getState();
+    // Parameterized logging: the proto is only rendered to text if FINEST is actually enabled.
+    LOG.log(Level.FINEST, "Runtime status {0}", runtimeStatusProto);
+    this.outstandingContainerRequests = runtimeStatusProto.getOutstandingContainerRequests();
+    this.containerAllocationCount = runtimeStatusProto.getContainerAllocationCount();
+    this.setState(newState);
+
+    switch (newState) {
+      case FAILED:
+        this.onRMFailure(runtimeStatusProto);
+        break;
+      case DONE:
+        this.onRMDone(runtimeStatusProto);
+        break;
+      case RUNNING:
+        this.onRMRunning(runtimeStatusProto);
+        break;
+      default:
+        // All other states only update the bookkeeping above.
+        break;
+    }
+  }
+
+  /**
+   * Change the state of the Resource Manager to be RUNNING.
+   */
+  public synchronized void setRunning() {
+    this.setState(ReefServiceProtos.State.RUNNING);
+  }
+
+  /**
+   * @return idle, if there are no outstanding requests or allocations. Otherwise a non-idle message that
+   * states both counts.
+   */
+  @Override
+  public synchronized IdleMessage getIdleStatus() {
+    if (this.isIdle()) {
+      return IDLE_MESSAGE;
+    }
+    final String message = "There are " + this.outstandingContainerRequests
+        + " outstanding container requests and " + this.containerAllocationCount
+        + " allocated containers";
+    return new IdleMessage(COMPONENT_NAME, message, false);
+  }
+
+  /**
+   * Hands the error of a FAILED status message to the error handler.
+   */
+  private synchronized void onRMFailure(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) {
+    assert (runtimeStatusProto.getState() == ReefServiceProtos.State.FAILED);
+    this.resourceManagerErrorHandler.onNext(runtimeStatusProto.getError());
+  }
+
+  /**
+   * Signals completion to the driver status manager after the Resource Manager reported DONE.
+   */
+  private synchronized void onRMDone(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) {
+    assert (runtimeStatusProto.getState() == ReefServiceProtos.State.DONE);
+    LOG.log(Level.INFO, "Resource Manager shutdown happened. Triggering Driver shutdown.");
+    this.driverStatusManager.onComplete();
+  }
+
+  /**
+   * Reports potential idleness if a RUNNING Resource Manager has nothing outstanding or allocated.
+   */
+  private synchronized void onRMRunning(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) {
+    assert (runtimeStatusProto.getState() == ReefServiceProtos.State.RUNNING);
+    if (this.isIdle()) {
+      this.driverIdleManager.get().onPotentiallyIdle(IDLE_MESSAGE);
+    }
+  }
+
+  /**
+   * @return true, if there are neither outstanding container requests nor allocated containers.
+   */
+  private synchronized boolean isIdle() {
+    return this.hasNoOutstandingRequests()
+        && this.hasNoContainersAllocated();
+  }
+
+  /**
+   * @return true, if the recorded state is RUNNING.
+   */
+  private synchronized boolean isRunning() {
+    return ReefServiceProtos.State.RUNNING.equals(this.state);
+  }
+
+  /**
+   * Records the new state. No transition validation is performed.
+   */
+  private synchronized void setState(ReefServiceProtos.State state) {
+    // TODO: Add state transition check
+    this.state = state;
+  }
+
+  private synchronized boolean hasNoOutstandingRequests() {
+    return this.outstandingContainerRequests == 0;
+  }
+
+  private synchronized boolean hasNoContainersAllocated() {
+    return this.containerAllocationCount == 0;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
new file mode 100644
index 0000000..9abafbd
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.resourcemanager;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManagerFactory;
+import org.apache.reef.runtime.common.driver.evaluator.Evaluators;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Handles ResourceStatusProto messages, which the ResourceManager layer sends to tell the Driver what it
+ * believes the current state of a given resource to be. Each message is routed to the EvaluatorManager
+ * in charge of that resource.
+ */
+@Private
+public final class ResourceStatusHandler implements EventHandler<DriverRuntimeProtocol.ResourceStatusProto> {
+
+  private final Evaluators evaluators;
+  private final EvaluatorManagerFactory evaluatorManagerFactory;
+
+  /**
+   * @param evaluators              lookup of the currently known EvaluatorManagers by identifier.
+   * @param evaluatorManagerFactory used to create an EvaluatorManager for a resource of a previous Driver.
+   */
+  @Inject
+  ResourceStatusHandler(final Evaluators evaluators, final EvaluatorManagerFactory evaluatorManagerFactory) {
+    this.evaluators = evaluators;
+    this.evaluatorManagerFactory = evaluatorManagerFactory;
+  }
+
+  /**
+   * Passes the status message on to the EvaluatorManager registered under the message's identifier.
+   * If none is registered and the message is flagged as coming from a previous Driver, a new
+   * EvaluatorManager is created through the factory and receives the message instead.
+   *
+   * @param resourceStatusProto resource status message from the ResourceManager
+   * @throws RuntimeException if the resource is unknown and the message is not from a previous Driver.
+   */
+  @Override
+  public void onNext(final DriverRuntimeProtocol.ResourceStatusProto resourceStatusProto) {
+    final Optional<EvaluatorManager> knownManager = this.evaluators.get(resourceStatusProto.getIdentifier());
+
+    // Common case: the resource belongs to an evaluator we already track.
+    if (knownManager.isPresent()) {
+      knownManager.get().onResourceStatusMessage(resourceStatusProto);
+      return;
+    }
+
+    if (!resourceStatusProto.getIsFromPreviousDriver()) {
+      throw new RuntimeException(
+          "Unknown resource status from evaluator " + resourceStatusProto.getIdentifier() +
+              " with state " + resourceStatusProto.getState()
+      );
+    }
+
+    final EvaluatorManager restartedManager =
+        this.evaluatorManagerFactory.createForEvaluatorFailedDuringDriverRestart(resourceStatusProto);
+    restartedManager.onResourceStatusMessage(resourceStatusProto);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/package-info.java
new file mode 100644
index 0000000..9ffc87d
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Classes that interface with the resource manager (Local, YARN, ...) in the Driver.
+ */
+package org.apache.reef.runtime.common.driver.resourcemanager;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/CompletedTaskImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/CompletedTaskImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/CompletedTaskImpl.java
new file mode 100644
index 0000000..5d1b0d0
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/CompletedTaskImpl.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.task;
+
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.CompletedTask;
+
+/**
+ * Immutable value holder implementing {@link CompletedTask}: the context the task ran on,
+ * the bytes it returned and its identifier.
+ */
+@Private
+@DriverSide
+public final class CompletedTaskImpl implements CompletedTask {
+
+  private final ActiveContext activeContext;
+  private final byte[] result;
+  private final String taskId;
+
+  /**
+   * @param context the context the task was running on.
+   * @param message the bytes handed back by {@link #get()}; stored as given, without copying.
+   * @param id      the identifier of the task.
+   */
+  public CompletedTaskImpl(final ActiveContext context, final byte[] message, final String id) {
+    this.activeContext = context;
+    this.result = message;
+    this.taskId = id;
+  }
+
+  /**
+   * @return the context passed to the constructor.
+   */
+  @Override
+  public ActiveContext getActiveContext() {
+    return this.activeContext;
+  }
+
+  /**
+   * @return the very byte array passed to the constructor (no defensive copy is made).
+   */
+  @Override
+  public byte[] get() {
+    return this.result;
+  }
+
+  /**
+   * @return the identifier of the task.
+   */
+  @Override
+  public String getId() {
+    return this.taskId;
+  }
+
+  @Override
+  public String toString() {
+    return "CompletedTask{ID='" + this.taskId + "'}";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java
new file mode 100644
index 0000000..4aa3092
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.task;
+
+import com.google.protobuf.ByteString;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.proto.EvaluatorRuntimeProtocol.ContextControlProto;
+import org.apache.reef.proto.EvaluatorRuntimeProtocol.StopTaskProto;
+import org.apache.reef.proto.EvaluatorRuntimeProtocol.SuspendTaskProto;
+import org.apache.reef.runtime.common.driver.context.EvaluatorContext;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Implements the RunningTask client interface. It is mainly a helper class
+ * that will package up various client method calls into protocol buffers and
+ * pass them to its respective EvaluatorManager to deliver to the EvaluatorRuntime.
+ */
+@Private
+@DriverSide
+public final class RunningTaskImpl implements RunningTask {
+
+  // NOTE(review): the logger is named after the RunningTask interface, not this class - confirm intended.
+  private static final Logger LOG = Logger.getLogger(RunningTask.class.getName());
+
+  private final EvaluatorManager evaluatorManager;
+  private final EvaluatorContext evaluatorContext;
+  private final String taskId;
+  private final TaskRepresenter taskRepresenter;
+
+  /**
+   * @param evaluatorManager the manager all control messages of this task are sent through.
+   * @param taskId           the identifier of the task.
+   * @param evaluatorContext the context returned by {@link #getActiveContext()}.
+   * @param taskRepresenter  consulted by the close methods to find out whether the task is still RUNNING.
+   */
+  public RunningTaskImpl(final EvaluatorManager evaluatorManager,
+                         final String taskId,
+                         final EvaluatorContext evaluatorContext,
+                         final TaskRepresenter taskRepresenter) {
+    LOG.log(Level.FINEST, "INIT: TaskRuntime id[{0}] on evaluator id[{1}]",
+        new Object[]{taskId, evaluatorManager.getId()});
+
+    this.evaluatorManager = evaluatorManager;
+    this.evaluatorContext = evaluatorContext;
+    this.taskId = taskId;
+    this.taskRepresenter = taskRepresenter;
+  }
+
+
+  @Override
+  public ActiveContext getActiveContext() {
+    return this.evaluatorContext;
+  }
+
+  @Override
+  public String getId() {
+    return this.taskId;
+  }
+
+  /**
+   * Sends the given bytes to the task as a task message.
+   *
+   * @param message the payload; it is copied into the control message.
+   */
+  @Override
+  public void send(final byte[] message) {
+    LOG.log(Level.FINEST, "MESSAGE: Task id[{0}] on evaluator id[{1}]",
+        new Object[]{taskId, evaluatorManager.getId()});
+
+    final ContextControlProto contextControlProto = ContextControlProto.newBuilder()
+        .setTaskMessage(ByteString.copyFrom(message))
+        .build();
+
+    this.evaluatorManager.sendContextControlMessage(contextControlProto);
+  }
+
+  /**
+   * Sends a stop request to the task. The call is ignored (and logged at FINE) if the task is
+   * no longer RUNNING.
+   */
+  @Override
+  public void close() {
+    LOG.log(Level.FINEST, "CLOSE: TaskRuntime id[{0}] on evaluator id[{1}]",
+        new Object[]{taskId, evaluatorManager.getId()});
+
+    if (this.taskRepresenter.isNotRunning()) {
+      LOG.log(Level.FINE, "Ignoring call to .close() because the task is no longer RUNNING.");
+    } else {
+      final ContextControlProto contextControlProto = ContextControlProto.newBuilder()
+          .setStopTask(StopTaskProto.newBuilder().build())
+          .build();
+      this.evaluatorManager.sendContextControlMessage(contextControlProto);
+    }
+  }
+
+  /**
+   * Sends a stop request together with a message to the task.
+   *
+   * @param message the payload delivered along with the stop request.
+   * @throws RuntimeException if the task is no longer RUNNING. Unlike {@link #close()}, this
+   *                          variant does not silently ignore the call.
+   */
+  @Override
+  public void close(final byte[] message) {
+    LOG.log(Level.FINEST, "CLOSE: TaskRuntime id[{0}] on evaluator id[{1}] with message.",
+        new Object[]{taskId, evaluatorManager.getId()});
+    if (this.taskRepresenter.isNotRunning()) {
+      throw new RuntimeException("Trying to send a message to a Task that is no longer RUNNING.");
+    }
+
+    final ContextControlProto contextControlProto = ContextControlProto.newBuilder()
+        .setStopTask(StopTaskProto.newBuilder().build())
+        .setTaskMessage(ByteString.copyFrom(message))
+        .build();
+    this.evaluatorManager.sendContextControlMessage(contextControlProto);
+  }
+
+  /**
+   * Sends a suspend request together with a message to the task. The task state is not checked.
+   *
+   * @param message the payload delivered along with the suspend request.
+   */
+  @Override
+  public void suspend(final byte[] message) {
+    LOG.log(Level.FINEST, "SUSPEND: TaskRuntime id[{0}] on evaluator id[{1}] with message.",
+        new Object[]{taskId, evaluatorManager.getId()});
+
+    final ContextControlProto contextControlProto = ContextControlProto.newBuilder()
+        .setSuspendTask(SuspendTaskProto.newBuilder().build())
+        .setTaskMessage(ByteString.copyFrom(message))
+        .build();
+    this.evaluatorManager.sendContextControlMessage(contextControlProto);
+  }
+
+  /**
+   * Sends a suspend request to the task. The task state is not checked.
+   */
+  @Override
+  public void suspend() {
+    LOG.log(Level.FINEST, "SUSPEND: TaskRuntime id[{0}] on evaluator id[{1}]",
+        new Object[]{taskId, evaluatorManager.getId()});
+
+    final ContextControlProto contextControlProto = ContextControlProto.newBuilder()
+        .setSuspendTask(SuspendTaskProto.newBuilder().build())
+        .build();
+    this.evaluatorManager.sendContextControlMessage(contextControlProto);
+  }
+
+  @Override
+  public String toString() {
+    return "RunningTask{taskId='" + taskId + "'}";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/SuspendedTaskImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/SuspendedTaskImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/SuspendedTaskImpl.java
new file mode 100644
index 0000000..e750954
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/SuspendedTaskImpl.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.task;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.SuspendedTask;
+
+/**
+ * Immutable value holder implementing {@link SuspendedTask}: the context the task ran on,
+ * the bytes it handed back and its identifier.
+ */
+@Private
+@DriverSide
+public final class SuspendedTaskImpl implements SuspendedTask {
+  private final ActiveContext activeContext;
+  private final byte[] payload;
+  private final String taskId;
+
+  /**
+   * @param context the context the task was running on.
+   * @param message the bytes handed back by {@link #get()}; stored as given, without copying.
+   * @param id      the identifier of the task.
+   */
+  public SuspendedTaskImpl(final ActiveContext context, final byte[] message, final String id) {
+    this.activeContext = context;
+    this.payload = message;
+    this.taskId = id;
+  }
+
+  /**
+   * @return the context passed to the constructor.
+   */
+  @Override
+  public ActiveContext getActiveContext() {
+    return this.activeContext;
+  }
+
+  /**
+   * @return the very byte array passed to the constructor (no defensive copy is made).
+   */
+  @Override
+  public byte[] get() {
+    return this.payload;
+  }
+
+  /**
+   * @return the identifier of the task.
+   */
+  @Override
+  public String getId() {
+    return this.taskId;
+  }
+
+  @Override
+  public String toString() {
+    return "SuspendedTask{ID='" + this.taskId + "'}";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskMessageImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskMessageImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskMessageImpl.java
new file mode 100644
index 0000000..5a29b47
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskMessageImpl.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.task;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.task.TaskMessage;
+
+/**
+ * Immutable value holder implementing {@link TaskMessage}: the message bytes plus the identifiers
+ * of the task, the context and the message source.
+ */
+@Private
+@DriverSide
+public final class TaskMessageImpl implements TaskMessage {
+
+  private final byte[] payload;
+  private final String idOfTask;
+  private final String idOfContext;
+  private final String idOfSource;
+
+  /**
+   * @param theMessage         the message bytes; stored as given, without copying.
+   * @param taskId             identifier returned by {@link #getId()}.
+   * @param contextId          identifier returned by {@link #getContextId()}.
+   * @param theMessageSourceId identifier returned by {@link #getMessageSourceID()}.
+   */
+  public TaskMessageImpl(final byte[] theMessage, final String taskId,
+                         final String contextId, final String theMessageSourceId) {
+    this.payload = theMessage;
+    this.idOfTask = taskId;
+    this.idOfContext = contextId;
+    this.idOfSource = theMessageSourceId;
+  }
+
+  /**
+   * @return the very byte array passed to the constructor (no defensive copy is made).
+   */
+  @Override
+  public byte[] get() {
+    return this.payload;
+  }
+
+  /**
+   * @return the task identifier.
+   */
+  @Override
+  public String getId() {
+    return this.idOfTask;
+  }
+
+  /**
+   * @return the context identifier.
+   */
+  @Override
+  public String getContextId() {
+    return this.idOfContext;
+  }
+
+  /**
+   * @return the identifier of the message source.
+   */
+  @Override
+  public String getMessageSourceID() {
+    return this.idOfSource;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
new file mode 100644
index 0000000..1c65197
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.task;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.FailedTask;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.context.EvaluatorContext;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher;
+import org.apache.reef.runtime.common.utils.ExceptionCodec;
+import org.apache.reef.util.Optional;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Represents a Task on the Driver.
+ * <p>
+ * Consumes the task status messages of one task, keeps track of the state the Driver believes the
+ * task to be in, and forwards the matching event objects to the EvaluatorMessageDispatcher.
+ */
+@DriverSide
+@Private
+public final class TaskRepresenter {
+
+  private static final Logger LOG = Logger.getLogger(TaskRepresenter.class.getName());
+
+  private final EvaluatorContext context;
+  private final EvaluatorMessageDispatcher messageDispatcher;
+  private final EvaluatorManager evaluatorManager;
+  private final ExceptionCodec exceptionCodec;
+  private final String taskId;
+
+  // Mutable state
+  private ReefServiceProtos.State state = ReefServiceProtos.State.INIT;
+
+  /**
+   * @param taskId            identifier of the represented task.
+   * @param context           the context the Driver believes the task to run on.
+   * @param messageDispatcher receives the events derived from the status messages.
+   * @param evaluatorManager  handed to the RunningTaskImpl instances created here.
+   * @param exceptionCodec    decodes the result bytes of a FAILED task into a Throwable.
+   */
+  public TaskRepresenter(final String taskId,
+                         final EvaluatorContext context,
+                         final EvaluatorMessageDispatcher messageDispatcher,
+                         final EvaluatorManager evaluatorManager,
+                         final ExceptionCodec exceptionCodec) {
+    this.taskId = taskId;
+    this.context = context;
+    this.messageDispatcher = messageDispatcher;
+    this.evaluatorManager = evaluatorManager;
+    this.exceptionCodec = exceptionCodec;
+  }
+
+  /**
+   * @return the result bytes of the status message, or null if it carries no result.
+   */
+  private static byte[] getResult(final ReefServiceProtos.TaskStatusProto taskStatusProto) {
+    if (!taskStatusProto.hasResult()) {
+      return null;
+    }
+    return taskStatusProto.getResult().toByteArray();
+  }
+
+  /**
+   * Entry point for status messages: validates that the message addresses this task, adopts the
+   * reported state for recovery messages, and then dispatches on the reported state.
+   *
+   * @param taskStatusProto the status message received for the task.
+   * @throws RuntimeException      if the context id or task id of the message do not match this task.
+   * @throws IllegalStateException if the reported state is not handled here, or if a RUNNING message
+   *                               arrives while the Driver does not consider the task RUNNING.
+   */
+  public void onTaskStatusMessage(final ReefServiceProtos.TaskStatusProto taskStatusProto) {
+
+    final String reportedTaskId = taskStatusProto.getTaskId();
+    final ReefServiceProtos.State reportedState = taskStatusProto.getState();
+
+    LOG.log(Level.FINE, "Received task {0} status {1}", new Object[]{reportedTaskId, reportedState});
+
+    this.verifyAddressedToThisTask(taskStatusProto.getContextId(), reportedTaskId);
+
+    if (taskStatusProto.getRecovery()) {
+      // when a recovered heartbeat is received, we will take its word for it
+      LOG.log(Level.INFO, "Received task status {0} for RECOVERED task {1}.",
+          new Object[]{reportedState, this.taskId});
+      this.setState(reportedState);
+    }
+
+    // Dispatch the message to the right method.
+    switch (reportedState) {
+      case INIT:
+        this.onTaskInit(taskStatusProto);
+        break;
+      case RUNNING:
+        this.onTaskRunning(taskStatusProto);
+        break;
+      case SUSPEND:
+        this.onTaskSuspend(taskStatusProto);
+        break;
+      case DONE:
+        this.onTaskDone(taskStatusProto);
+        break;
+      case FAILED:
+        this.onTaskFailed(taskStatusProto);
+        break;
+      default:
+        throw new IllegalStateException("Unknown task state: " + reportedState);
+    }
+  }
+
+  /**
+   * Make sure that the message is indeed for us: first the context id is compared, then the task id.
+   */
+  private void verifyAddressedToThisTask(final String reportedContextId, final String reportedTaskId) {
+    if (!reportedContextId.equals(this.context.getId())) {
+      throw new RuntimeException(
+          "Received a message for a task running on Context " + reportedContextId +
+              " while the Driver believes this Task to be run on Context " + this.context.getId());
+    }
+
+    if (!reportedTaskId.equals(this.taskId)) {
+      throw new RuntimeException("Received a message for task " + reportedTaskId +
+          " in the TaskRepresenter for Task " + this.taskId);
+    }
+  }
+
+  /**
+   * @return a fresh RunningTask handle for this task.
+   */
+  private RunningTask newRunningTask() {
+    return new RunningTaskImpl(this.evaluatorManager, this.taskId, this.context, this);
+  }
+
+  /**
+   * First INIT message: announce the task as running and move to RUNNING. Repeated INIT messages
+   * are logged and ignored.
+   */
+  private void onTaskInit(final ReefServiceProtos.TaskStatusProto taskStatusProto) {
+    assert ((ReefServiceProtos.State.INIT == taskStatusProto.getState()));
+    if (this.isKnown()) {
+      LOG.log(Level.WARNING,
+          "Received a INIT message for task with id {0} which we have seen before. Ignoring the second message",
+          this.taskId);
+      return;
+    }
+
+    this.messageDispatcher.onTaskRunning(this.newRunningTask());
+    this.setState(ReefServiceProtos.State.RUNNING);
+  }
+
+  /**
+   * RUNNING message: forwards every contained task message to the dispatcher. For recovery
+   * heartbeats, the driver-restart running-task event is fired first.
+   */
+  private void onTaskRunning(final ReefServiceProtos.TaskStatusProto taskStatusProto) {
+
+    assert (taskStatusProto.getState() == ReefServiceProtos.State.RUNNING);
+
+    if (this.isNotRunning()) {
+      throw new IllegalStateException("Received a task status message from task " + this.taskId +
+          " that is believed to be RUNNING on the Evaluator, but the Driver thinks it is in state " + this.state);
+    }
+
+    // fire driver restart task running handler if this is a recovery heartbeat
+    if (taskStatusProto.getRecovery()) {
+      this.messageDispatcher.onDriverRestartTaskRunning(this.newRunningTask());
+    }
+
+    for (final ReefServiceProtos.TaskStatusProto.TaskMessageProto proto : taskStatusProto.getTaskMessageList()) {
+      final byte[] payload = proto.getMessage().toByteArray();
+      this.messageDispatcher.onTaskMessage(
+          new TaskMessageImpl(payload, this.taskId, this.context.getId(), proto.getSourceId()));
+    }
+  }
+
+  /**
+   * SUSPEND message: dispatch the suspended task first, then record the SUSPEND state.
+   */
+  private void onTaskSuspend(final ReefServiceProtos.TaskStatusProto taskStatusProto) {
+    assert (ReefServiceProtos.State.SUSPEND == taskStatusProto.getState());
+    assert (this.isKnown());
+    final SuspendedTaskImpl suspendedTask =
+        new SuspendedTaskImpl(this.context, getResult(taskStatusProto), this.taskId);
+    this.messageDispatcher.onTaskSuspended(suspendedTask);
+    this.setState(ReefServiceProtos.State.SUSPEND);
+  }
+
+  /**
+   * DONE message: dispatch the completed task first, then record the DONE state.
+   */
+  private void onTaskDone(final ReefServiceProtos.TaskStatusProto taskStatusProto) {
+    assert (ReefServiceProtos.State.DONE == taskStatusProto.getState());
+    assert (this.isKnown());
+    final CompletedTaskImpl completedTask =
+        new CompletedTaskImpl(this.context, getResult(taskStatusProto), this.taskId);
+    this.messageDispatcher.onTaskCompleted(completedTask);
+    this.setState(ReefServiceProtos.State.DONE);
+  }
+
+  /**
+   * FAILED message: decode the optional exception from the result bytes, dispatch the resulting
+   * FailedTask, then record the FAILED state.
+   */
+  private void onTaskFailed(final ReefServiceProtos.TaskStatusProto taskStatusProto) {
+    assert (ReefServiceProtos.State.FAILED == taskStatusProto.getState());
+    final Optional<ActiveContext> evaluatorContext = Optional.<ActiveContext>of(this.context);
+    final Optional<byte[]> bytes = Optional.ofNullable(getResult(taskStatusProto));
+    final Optional<Throwable> exception = this.exceptionCodec.fromBytes(bytes);
+
+    final String message;
+    if (exception.isPresent()) {
+      message = exception.get().getMessage();
+    } else {
+      message = "No message given";
+    }
+
+    final Optional<String> description = Optional.empty();
+    this.messageDispatcher.onTaskFailed(
+        new FailedTask(this.taskId, message, description, exception, bytes, evaluatorContext));
+    this.setState(ReefServiceProtos.State.FAILED);
+  }
+
+  /**
+   * @return the identifier of the represented task.
+   */
+  public String getId() {
+    return this.taskId;
+  }
+
+  /**
+   * @return true, if the recorded state is anything but INIT.
+   */
+  private boolean isKnown() {
+    return !(ReefServiceProtos.State.INIT == this.state);
+  }
+
+  /**
+   * @return true, if this task is in any other state but RUNNING.
+   */
+  public boolean isNotRunning() {
+    return !(ReefServiceProtos.State.RUNNING == this.state);
+  }
+
+  /**
+   * Logs the transition at FINE and records the new state.
+   */
+  private void setState(final ReefServiceProtos.State newState) {
+    final Object[] logArguments = {this.taskId, this.state, newState};
+    LOG.log(Level.FINE, "Task [{0}] state transition from [{1}] to [{2}]", logArguments);
+    this.state = newState;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/package-info.java
new file mode 100644
index 0000000..ea55cca
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/package-info.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Driver-Side representations of tasks.
+ */
+@Private
+@DriverSide package org.apache.reef.runtime.common.driver.task;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/DefaultDriverConnection.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/DefaultDriverConnection.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/DefaultDriverConnection.java
new file mode 100644
index 0000000..61ca305
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/DefaultDriverConnection.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default {@link DriverConnection} used for re-establishing the connection to the
+ * driver after a driver restart.
+ * The intended approach is to obtain the information about the re-started driver by
+ * querying the Http server. That lookup is not implemented yet (see the TODO below),
+ * so {@link #getDriverRemoteIdentifier()} currently always throws.
+ */
+public final class DefaultDriverConnection implements DriverConnection {
+
+  private static final Logger LOG = Logger.getLogger(DefaultDriverConnection.class.getName());
+
+  /**
+   * Injectable constructor; there is nothing to initialize.
+   */
+  @Inject
+  public DefaultDriverConnection() {
+  }
+
+  /**
+   * Logs the lookup attempt at FINE level and then fails, as the lookup is not implemented.
+   *
+   * @return nothing; this method never returns normally.
+   * @throws UnsupportedOperationException always.
+   */
+  @Override
+  public String getDriverRemoteIdentifier() {
+    LOG.log(Level.FINE, "Trying to get driver remote identifier by querying Http server.");
+    // TODO: implement a proper mechanism to obtain driver remote identifier.
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  /**
+   * Does nothing: this class has no instance fields and therefore nothing to release.
+   */
+  @Override
+  public void close() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/DriverConnection.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/DriverConnection.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/DriverConnection.java
new file mode 100644
index 0000000..a3ce3f3
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/DriverConnection.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator;
+
+/**
+ * Interface used for reconnecting to the driver after a driver restart.
+ */
+public interface DriverConnection extends AutoCloseable {
+
+  /**
+   * @return driver remote identifier to facilitate reconnection to the (restarted) driver
+   */
+  String getDriverRemoteIdentifier();
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorConfiguration.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorConfiguration.java
new file mode 100644
index 0000000..43d7469
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorConfiguration.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator;
+
+import org.apache.reef.runtime.common.evaluator.parameters.*;
+import org.apache.reef.tang.ExternalConstructor;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalParameter;
+import org.apache.reef.tang.formats.RequiredParameter;
+import org.apache.reef.wake.time.Clock;
+
+import javax.inject.Inject;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * ConfigurationModule for the runtime configuration of an evaluator.
+ * The parameters declared here are bound to the named parameters of the evaluator
+ * in {@link #CONF}. The declaration order is kept on purpose: the parameter fields
+ * have to be initialized before {@link #CONF} is built from them.
+ */
+public final class EvaluatorConfiguration extends ConfigurationModuleBuilder {
+  /** Required; bound to {@link DriverRemoteIdentifier}. */
+  public static final RequiredParameter<String> DRIVER_REMOTE_IDENTIFIER = new RequiredParameter<>();
+  /** Required; bound to {@link EvaluatorIdentifier}. */
+  public static final RequiredParameter<String> EVALUATOR_IDENTIFIER = new RequiredParameter<>();
+  /** Required; bound to {@link RootContextConfiguration}. */
+  public static final RequiredParameter<String> ROOT_CONTEXT_CONFIGURATION = new RequiredParameter<>();
+  /** Optional; bound to {@link RootServiceConfiguration}. */
+  public static final OptionalParameter<String> ROOT_SERVICE_CONFIGURATION = new OptionalParameter<>();
+  /** Optional; bound to {@link InitialTaskConfiguration}. */
+  public static final OptionalParameter<String> TASK_CONFIGURATION = new OptionalParameter<>();
+  /** Optional; bound to {@link HeartbeatPeriod}. */
+  public static final OptionalParameter<Integer> HEARTBEAT_PERIOD = new OptionalParameter<>();
+  /** Optional; bound to {@link ApplicationIdentifier}. */
+  public static final OptionalParameter<String> APPLICATION_IDENTIFIER = new OptionalParameter<>();
+
+  /**
+   * The module itself: registers the start and stop handlers of the EvaluatorRuntime
+   * with the Clock, provides the ExecutorService and binds the parameters above.
+   */
+  public static final ConfigurationModule CONF = new EvaluatorConfiguration()
+      .bindSetEntry(Clock.RuntimeStartHandler.class, EvaluatorRuntime.RuntimeStartHandler.class)
+      .bindSetEntry(Clock.RuntimeStopHandler.class, EvaluatorRuntime.RuntimeStopHandler.class)
+      .bindConstructor(ExecutorService.class, ExecutorServiceConstructor.class)
+      .bindNamedParameter(DriverRemoteIdentifier.class, DRIVER_REMOTE_IDENTIFIER)
+      .bindNamedParameter(EvaluatorIdentifier.class, EVALUATOR_IDENTIFIER)
+      .bindNamedParameter(HeartbeatPeriod.class, HEARTBEAT_PERIOD)
+      .bindNamedParameter(RootContextConfiguration.class, ROOT_CONTEXT_CONFIGURATION)
+      .bindNamedParameter(InitialTaskConfiguration.class, TASK_CONFIGURATION)
+      .bindNamedParameter(RootServiceConfiguration.class, ROOT_SERVICE_CONFIGURATION)
+      .bindNamedParameter(ApplicationIdentifier.class, APPLICATION_IDENTIFIER)
+      .build();
+
+  /**
+   * ExternalConstructor that supplies a cached thread pool as the ExecutorService.
+   */
+  private static final class ExecutorServiceConstructor implements ExternalConstructor<ExecutorService> {
+
+    @Inject
+    ExecutorServiceConstructor() {
+    }
+
+    /**
+     * @return a new thread pool created via {@link Executors#newCachedThreadPool()}.
+     */
+    @Override
+    public ExecutorService newInstance() {
+      return Executors.newCachedThreadPool();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorRuntime.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorRuntime.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorRuntime.java
new file mode 100644
index 0000000..b239ee8
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorRuntime.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator;
+
+import com.google.protobuf.ByteString;
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.proto.EvaluatorRuntimeProtocol.EvaluatorControlProto;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.proto.ReefServiceProtos.EvaluatorStatusProto;
+import org.apache.reef.runtime.common.evaluator.context.ContextManager;
+import org.apache.reef.runtime.common.evaluator.parameters.DriverRemoteIdentifier;
+import org.apache.reef.runtime.common.evaluator.parameters.EvaluatorIdentifier;
+import org.apache.reef.runtime.common.evaluator.parameters.HeartbeatPeriod;
+import org.apache.reef.runtime.common.utils.ExceptionCodec;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.runtime.event.RuntimeStart;
+import org.apache.reef.wake.time.runtime.event.RuntimeStop;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Evaluator-side runtime. It keeps the state of the evaluator, handles the
+ * EvaluatorControlProto messages of the handler it registers for the driver's remote
+ * identifier, and reacts to RuntimeStart and RuntimeStop through its inner handlers.
+ *
+ * Every write to the state is done while holding the monitor of the HeartBeatManager.
+ * NOTE(review): getState() and isRunning() read the state without taking that monitor
+ * themselves - confirm that their callers already hold it.
+ */
+@Unit
+@EvaluatorSide
+final class EvaluatorRuntime implements EventHandler<EvaluatorControlProto> {
+
+  private static final Logger LOG = Logger.getLogger(EvaluatorRuntime.class.getName());
+
+  /** Used to send status messages; its monitor also guards the state changes. */
+  private final HeartBeatManager heartBeatManager;
+  private final ContextManager contextManager;
+  private final Clock clock;
+
+  private final String evaluatorIdentifier;
+  private final ExceptionCodec exceptionCodec;
+  /** Registration of this object as the handler of the control messages. */
+  private final AutoCloseable evaluatorControlChannel;
+
+  private ReefServiceProtos.State state = ReefServiceProtos.State.INIT;
+
+  /**
+   * Stores the collaborators, registers this object as the handler of the
+   * EvaluatorControlProto messages and schedules the first heartbeat alarm.
+   */
+  @Inject
+  private EvaluatorRuntime(
+      @Parameter(HeartbeatPeriod.class) final int heartbeatPeriod,
+      @Parameter(EvaluatorIdentifier.class) final String evaluatorIdentifier,
+      @Parameter(DriverRemoteIdentifier.class) final String driverRID,
+      final HeartBeatManager.HeartbeatAlarmHandler heartbeatAlarmHandler,
+      final HeartBeatManager heartBeatManager,
+      final Clock clock,
+      final ContextManager contextManagerFuture,
+      final RemoteManager remoteManager,
+      final ExceptionCodec exceptionCodec) {
+
+    this.heartBeatManager = heartBeatManager;
+    this.contextManager = contextManagerFuture;
+    this.clock = clock;
+
+    this.evaluatorIdentifier = evaluatorIdentifier;
+    this.exceptionCodec = exceptionCodec;
+
+    // Registered only after all other fields are set, as "this" is handed out here.
+    this.evaluatorControlChannel =
+        remoteManager.registerHandler(driverRID, EvaluatorControlProto.class, this);
+
+    // start the heartbeats
+    clock.scheduleAlarm(heartbeatPeriod, heartbeatAlarmHandler);
+  }
+
+  /**
+   * Processes a control message. A message addressed to another evaluator, or one
+   * that arrives while this evaluator is not RUNNING, is reported via onException().
+   * Otherwise the context control part (if any) is passed to the ContextManager and
+   * the kill request (if any) closes the clock.
+   *
+   * @param message the control message to process.
+   */
+  private void onEvaluatorControlMessage(final EvaluatorControlProto message) {
+
+    synchronized (this.heartBeatManager) {
+
+      LOG.log(Level.FINEST, "Evaluator control message");
+
+      final String addressee = message.getIdentifier();
+      if (!addressee.equals(this.evaluatorIdentifier)) {
+        this.onException(new RuntimeException(
+            "Identifier mismatch: message for evaluator id[" + addressee
+                + "] sent to evaluator id[" + this.evaluatorIdentifier + "]"
+        ));
+        return;
+      }
+
+      if (ReefServiceProtos.State.RUNNING != this.state) {
+        this.onException(new RuntimeException(
+            "Evaluator sent a control message but its state is not "
+                + ReefServiceProtos.State.RUNNING + " but rather " + this.state
+        ));
+        return;
+      }
+
+      if (message.hasContextControl()) {
+
+        LOG.log(Level.FINEST, "Send task control message to ContextManager");
+
+        try {
+          this.contextManager.handleContextControlProtocol(message.getContextControl());
+          // Once the last context is gone, the evaluator is done: report it and stop the clock.
+          if (this.contextManager.contextStackIsEmpty() && this.state == ReefServiceProtos.State.RUNNING) {
+            this.state = ReefServiceProtos.State.DONE;
+            this.heartBeatManager.sendEvaluatorStatus(this.getEvaluatorStatus());
+            this.clock.close();
+          }
+        } catch (final Throwable e) {
+          // Report the failure first, then propagate it to the caller.
+          this.onException(e);
+          throw new RuntimeException(e);
+        }
+      }
+
+      if (message.hasKillEvaluator()) {
+        LOG.log(Level.SEVERE, "Evaluator {0} has been killed by the driver.", this.evaluatorIdentifier);
+        this.state = ReefServiceProtos.State.KILLED;
+        this.clock.close();
+      }
+    }
+  }
+
+  /**
+   * Switches the state to FAILED, sends a status message that carries the encoded
+   * exception and closes the ContextManager.
+   *
+   * @param exception the cause of the failure.
+   */
+  private void onException(final Throwable exception) {
+    synchronized (this.heartBeatManager) {
+      this.state = ReefServiceProtos.State.FAILED;
+
+      final ByteString encodedError = ByteString.copyFrom(this.exceptionCodec.toBytes(exception));
+      final EvaluatorStatusProto failureStatus = EvaluatorStatusProto.newBuilder()
+          .setEvaluatorId(this.evaluatorIdentifier)
+          .setError(encodedError)
+          .setState(this.state)
+          .build();
+      this.heartBeatManager.sendEvaluatorStatus(failureStatus);
+      this.contextManager.close();
+    }
+  }
+
+  /**
+   * @return a status message made of the evaluator identifier and the current state.
+   */
+  public EvaluatorStatusProto getEvaluatorStatus() {
+    synchronized (this.heartBeatManager) {
+      LOG.log(Level.FINEST, "Evaluator heartbeat: state = {0}", this.state);
+      return EvaluatorStatusProto.newBuilder()
+          .setEvaluatorId(this.evaluatorIdentifier)
+          .setState(this.state)
+          .build();
+    }
+  }
+
+  /**
+   * @return the current state of the evaluator.
+   */
+  final ReefServiceProtos.State getState() {
+    return this.state;
+  }
+
+  /**
+   * @return true, if the current state of the evaluator is RUNNING.
+   */
+  boolean isRunning() {
+    return ReefServiceProtos.State.RUNNING == this.state;
+  }
+
+  /**
+   * Entry point for the control messages; delegates to onEvaluatorControlMessage().
+   *
+   * @param evaluatorControlProto the control message received.
+   */
+  @Override
+  public void onNext(final EvaluatorControlProto evaluatorControlProto) {
+    this.onEvaluatorControlMessage(evaluatorControlProto);
+  }
+
+  /**
+   * Handler of the RuntimeStart event: switches the evaluator to RUNNING, starts the
+   * ContextManager and sends a heartbeat. Any Throwable raised on the way is
+   * reported via onException().
+   */
+  final class RuntimeStartHandler implements EventHandler<RuntimeStart> {
+
+    @Override
+    public void onNext(final RuntimeStart runtimeStart) {
+      synchronized (EvaluatorRuntime.this.heartBeatManager) {
+        try {
+          LOG.log(Level.FINEST, "runtime start");
+          assert (ReefServiceProtos.State.INIT == EvaluatorRuntime.this.state);
+          EvaluatorRuntime.this.state = ReefServiceProtos.State.RUNNING;
+          EvaluatorRuntime.this.contextManager.start();
+          EvaluatorRuntime.this.heartBeatManager.sendHeartbeat();
+        } catch (final Throwable e) {
+          EvaluatorRuntime.this.onException(e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Handler of the RuntimeStop event. A stop while the evaluator is still RUNNING is
+   * treated as a failure and reported via onException(). In any other state the
+   * ContextManager and the control channel are closed.
+   */
+  final class RuntimeStopHandler implements EventHandler<RuntimeStop> {
+
+    @Override
+    public void onNext(final RuntimeStop runtimeStop) {
+      synchronized (EvaluatorRuntime.this.heartBeatManager) {
+        LOG.log(Level.FINEST, "EvaluatorRuntime shutdown invoked for Evaluator {0} in state {1}",
+            new Object[]{evaluatorIdentifier, state});
+
+        if (!EvaluatorRuntime.this.isRunning()) {
+          EvaluatorRuntime.this.contextManager.close();
+          try {
+            EvaluatorRuntime.this.evaluatorControlChannel.close();
+          } catch (final Exception e) {
+            // Shutdown proceeds even if the channel fails to close; the failure is only logged.
+            LOG.log(Level.SEVERE, "Exception during shutdown of evaluatorControlChannel.", e);
+          }
+          LOG.log(Level.FINEST, "EvaluatorRuntime shutdown complete");
+        } else {
+          EvaluatorRuntime.this.onException(new RuntimeException(
+              "RuntimeStopHandler invoked in state RUNNING.", runtimeStop.getException()));
+        }
+      }
+    }
+  }
+}