You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2014/10/23 02:04:00 UTC

[29/51] [abbrv] [partial] Initial merge of Wake, Tang and REEF into one repository and project

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/ResourceCatalogImpl.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/ResourceCatalogImpl.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/ResourceCatalogImpl.java
new file mode 100644
index 0000000..1d88147
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/ResourceCatalogImpl.java
@@ -0,0 +1,85 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.catalog;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.driver.catalog.RackDescriptor;
+import org.apache.reef.driver.catalog.ResourceCatalog;
+import org.apache.reef.proto.DriverRuntimeProtocol.NodeDescriptorProto;
+
+import javax.inject.Inject;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@Private
+public final class ResourceCatalogImpl implements ResourceCatalog {
+
+  public final static String DEFAULT_RACK = "/default-rack";
+  private static final Logger LOG = Logger.getLogger(ResourceCatalog.class.getName());
+  private final Map<String, RackDescriptorImpl> racks = new HashMap<>();
+
+  private final Map<String, NodeDescriptorImpl> nodes = new HashMap<>();
+
+  @Inject
+  ResourceCatalogImpl() {
+    LOG.log(Level.FINE, "Instantiated 'ResourceCatalogImpl'");
+  }
+
+  @Override
+  public synchronized String toString() {
+    final StringBuilder sb = new StringBuilder();
+    sb.append("=== Resource Catalog ===");
+    for (final RackDescriptor rack : racks.values()) {
+      sb.append("\n" + rack);
+    }
+    return sb.toString();
+  }
+
+  @Override
+  public synchronized Collection<NodeDescriptor> getNodes() {
+    return Collections.unmodifiableCollection(new ArrayList<NodeDescriptor>(this.nodes.values()));
+  }
+
+  @Override
+  public synchronized Collection<RackDescriptor> getRacks() {
+    return Collections.unmodifiableCollection(new ArrayList<RackDescriptor>(this.racks.values()));
+  }
+
+  public synchronized final NodeDescriptor getNode(final String id) {
+    return this.nodes.get(id);
+  }
+
+  public synchronized final void handle(final NodeDescriptorProto node) {
+    final String rack_name = (node.hasRackName() ? node.getRackName() : DEFAULT_RACK);
+
+    LOG.log(Level.FINEST, "Catalog new node: id[{0}], rack[{1}], host[{2}], port[{3}], memory[{4}]",
+        new Object[]{node.getIdentifier(), rack_name, node.getHostName(), node.getPort(),
+            node.getMemorySize()}
+    );
+
+    if (!this.racks.containsKey(rack_name)) {
+      final RackDescriptorImpl rack = new RackDescriptorImpl(rack_name);
+      this.racks.put(rack_name, rack);
+    }
+    final RackDescriptorImpl rack = this.racks.get(rack_name);
+    final InetSocketAddress address = new InetSocketAddress(node.getHostName(), node.getPort());
+    final NodeDescriptorImpl nodeDescriptor = new NodeDescriptorImpl(node.getIdentifier(), address, rack, node.getMemorySize());
+    this.nodes.put(nodeDescriptor.getId(), nodeDescriptor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java
new file mode 100644
index 0000000..42dd3e8
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.client;
+
+import com.google.protobuf.ByteString;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.api.AbstractDriverRuntimeConfiguration;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Represents the communication channel to the client.
+ */
+@DriverSide
+public final class ClientConnection {
+
+  private static final Logger LOG = Logger.getLogger(ClientConnection.class.getName());
+
+  private final EventHandler<ReefServiceProtos.JobStatusProto> jobStatusHandler;
+  private final String jobIdentifier;
+
+  @Inject
+  public ClientConnection(
+      final RemoteManager remoteManager,
+      final @Parameter(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.class) String clientRID,
+      final @Parameter(AbstractDriverRuntimeConfiguration.JobIdentifier.class) String jobIdentifier) {
+    this.jobIdentifier = jobIdentifier;
+    if (clientRID.equals(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.NONE)) {
+      LOG.log(Level.FINE, "Instantiated 'ClientConnection' without an actual connection to the client.");
+      this.jobStatusHandler = new LoggingJobStatusHandler();
+    } else {
+      this.jobStatusHandler = remoteManager.getHandler(clientRID, ReefServiceProtos.JobStatusProto.class);
+      LOG.log(Level.FINE, "Instantiated 'ClientConnection'");
+    }
+  }
+
+  /**
+   * Send the given JobStatus to the client.
+   *
+   * @param status
+   */
+  public synchronized void send(final ReefServiceProtos.JobStatusProto status) {
+    LOG.log(Level.FINEST, "Sending:\n" + status);
+    this.jobStatusHandler.onNext(status);
+  }
+
+  /**
+   * Send the given byte[] as a message to the client.
+   *
+   * @param message
+   */
+  public synchronized void sendMessage(final byte[] message) {
+    this.send(ReefServiceProtos.JobStatusProto.newBuilder()
+        .setIdentifier(this.jobIdentifier)
+        .setState(ReefServiceProtos.State.RUNNING)
+        .setMessage(ByteString.copyFrom(message))
+        .build());
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientManager.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientManager.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientManager.java
new file mode 100644
index 0000000..9b40f09
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientManager.java
@@ -0,0 +1,145 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.client;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.parameters.ClientCloseHandlers;
+import org.apache.reef.driver.parameters.ClientCloseWithMessageHandlers;
+import org.apache.reef.driver.parameters.ClientMessageHandlers;
+import org.apache.reef.proto.ClientRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.DriverStatusManager;
+import org.apache.reef.runtime.common.driver.api.AbstractDriverRuntimeConfiguration;
+import org.apache.reef.runtime.common.utils.BroadCastEventHandler;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Represents the Client in the Driver.
+ */
+@Private
+@DriverSide
+public final class ClientManager implements EventHandler<ClientRuntimeProtocol.JobControlProto> {
+
+  private final static Logger LOG = Logger.getLogger(ClientManager.class.getName());
+
+
+  private final InjectionFuture<Set<EventHandler<Void>>> clientCloseHandlers;
+
+  private final InjectionFuture<Set<EventHandler<byte[]>>> clientCloseWithMessageHandlers;
+
+  private final InjectionFuture<Set<EventHandler<byte[]>>> clientMessageHandlers;
+
+  private final DriverStatusManager driverStatusManager;
+
+  private volatile EventHandler<Void> clientCloseDispatcher;
+
+  private volatile EventHandler<byte[]> clientCloseWithMessageDispatcher;
+
+  private volatile EventHandler<byte[]> clientMessageDispatcher;
+
+
+  @Inject
+  ClientManager(final @Parameter(ClientCloseHandlers.class) InjectionFuture<Set<EventHandler<Void>>> clientCloseHandlers,
+                final @Parameter(ClientCloseWithMessageHandlers.class) InjectionFuture<Set<EventHandler<byte[]>>> clientCloseWithMessageHandlers,
+                final @Parameter(ClientMessageHandlers.class) InjectionFuture<Set<EventHandler<byte[]>>> clientMessageHandlers,
+                final @Parameter(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.class) String clientRID,
+                final RemoteManager remoteManager,
+                final DriverStatusManager driverStatusManager) {
+    this.driverStatusManager = driverStatusManager;
+    this.clientCloseHandlers = clientCloseHandlers;
+    this.clientCloseWithMessageHandlers = clientCloseWithMessageHandlers;
+    this.clientMessageHandlers = clientMessageHandlers;
+
+    if (!clientRID.equals(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.NONE)) {
+      remoteManager.registerHandler(clientRID, ClientRuntimeProtocol.JobControlProto.class, this);
+    } else {
+      LOG.log(Level.FINE, "Not registering a handler for JobControlProto, as there is no client.");
+    }
+  }
+
+  /**
+   * This method reacts to control messages passed by the client to the driver. It will forward
+   * messages related to the ClientObserver interface to the Driver. It will also initiate a shutdown
+   * if the client indicates a close message.
+   *
+   * @param jobControlProto contains the client initiated control message
+   */
+  @Override
+  public synchronized void onNext(final ClientRuntimeProtocol.JobControlProto jobControlProto) {
+    if (jobControlProto.hasSignal()) {
+      if (jobControlProto.getSignal() == ClientRuntimeProtocol.Signal.SIG_TERMINATE) {
+        try {
+          if (jobControlProto.hasMessage()) {
+            getClientCloseWithMessageDispatcher().onNext(jobControlProto.getMessage().toByteArray());
+          } else {
+            getClientCloseDispatcher().onNext(null);
+          }
+        } finally {
+          this.driverStatusManager.onComplete();
+        }
+      } else {
+        LOG.log(Level.FINEST, "Unsupported signal: " + jobControlProto.getSignal());
+      }
+    } else if (jobControlProto.hasMessage()) {
+      getClientMessageDispatcher().onNext(jobControlProto.getMessage().toByteArray());
+    }
+  }
+
+  private synchronized EventHandler<Void> getClientCloseDispatcher() {
+    if (clientCloseDispatcher != null) {
+      return clientCloseDispatcher;
+    } else {
+      synchronized (this) {
+        if (clientCloseDispatcher == null)
+          clientCloseDispatcher = new BroadCastEventHandler<>(clientCloseHandlers.get());
+      }
+      return clientCloseDispatcher;
+    }
+  }
+
+  private EventHandler<byte[]> getClientCloseWithMessageDispatcher() {
+    if (clientCloseWithMessageDispatcher != null) {
+      return clientCloseWithMessageDispatcher;
+    } else {
+      synchronized (this) {
+        if (clientCloseWithMessageDispatcher == null)
+          clientCloseWithMessageDispatcher = new BroadCastEventHandler<>(clientCloseWithMessageHandlers.get());
+      }
+      return clientCloseWithMessageDispatcher;
+    }
+  }
+
+  private EventHandler<byte[]> getClientMessageDispatcher() {
+    if (clientMessageDispatcher != null) {
+      return clientMessageDispatcher;
+    } else {
+      synchronized (this) {
+        if (clientMessageDispatcher == null)
+          clientMessageDispatcher = new BroadCastEventHandler<>(clientMessageHandlers.get());
+      }
+      return clientMessageDispatcher;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/JobMessageObserverImpl.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/JobMessageObserverImpl.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/JobMessageObserverImpl.java
new file mode 100644
index 0000000..31f6f39
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/JobMessageObserverImpl.java
@@ -0,0 +1,41 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.client;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.driver.client.JobMessageObserver;
+
+import javax.inject.Inject;
+
+/**
+ * An implementation of JobMessageObserver.
+ */
+@DriverSide
+public final class JobMessageObserverImpl implements JobMessageObserver {
+
+  private final ClientConnection clientConnection;
+
+  @Inject
+  public JobMessageObserverImpl(final ClientConnection clientConnection) {
+    this.clientConnection = clientConnection;
+  }
+
+  @Override
+  public void sendMessageToClient(final byte[] message) {
+    this.clientConnection.sendMessage(message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/LoggingJobStatusHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/LoggingJobStatusHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/LoggingJobStatusHandler.java
new file mode 100644
index 0000000..028b8cd
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/LoggingJobStatusHandler.java
@@ -0,0 +1,39 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.client;
+
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A handler for job status messages that just logs them.
+ */
+final class LoggingJobStatusHandler implements EventHandler<ReefServiceProtos.JobStatusProto> {
+  private static final Logger LOG = Logger.getLogger(LoggingJobStatusHandler.class.getName());
+
+  @Inject
+  LoggingJobStatusHandler() {
+  }
+
+  @Override
+  public void onNext(final ReefServiceProtos.JobStatusProto jobStatusProto) {
+    LOG.log(Level.INFO, "Received a JobStatus message that can't be sent:\n" + jobStatusProto.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/package-info.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/package-info.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/package-info.java
new file mode 100644
index 0000000..c6b0d6f
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Objects representing the Client on the Driver.
+ */
+
+@DriverSide package org.apache.reef.runtime.common.driver.client;
+
+import org.apache.reef.annotations.audience.DriverSide;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ClosedContextImpl.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ClosedContextImpl.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ClosedContextImpl.java
new file mode 100644
index 0000000..6464558
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ClosedContextImpl.java
@@ -0,0 +1,85 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.context;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.util.Optional;
+
+/**
+ * Driver side representation of a closed context.
+ */
+@DriverSide
+@Private
+public final class ClosedContextImpl implements ClosedContext {
+
+
+  private final ActiveContext parentContext;
+  private final String contextID;
+  private final String evaluatorId;
+  private final EvaluatorDescriptor evaluatorDescriptor;
+
+  /**
+   * @param parentContext       the parent context.
+   * @param contextID           the id of the closed context
+   * @param evaluatorId         the id of the evaluator on which the context was closed
+   * @param evaluatorDescriptor the descriptor of the evaluator on which the context was closed.
+   */
+  public ClosedContextImpl(final ActiveContext parentContext,
+                           final String contextID,
+                           final String evaluatorId,
+                           final EvaluatorDescriptor evaluatorDescriptor) {
+    this.parentContext = parentContext;
+    this.contextID = contextID;
+    this.evaluatorId = evaluatorId;
+    this.evaluatorDescriptor = evaluatorDescriptor;
+  }
+
+  @Override
+  public ActiveContext getParentContext() {
+    return parentContext;
+  }
+
+  @Override
+  public String getId() {
+    return this.contextID;
+  }
+
+  @Override
+  public String getEvaluatorId() {
+    return this.evaluatorId;
+  }
+
+  @Override
+  public Optional<String> getParentId() {
+    return Optional.of(this.parentContext.getId());
+  }
+
+  @Override
+  public EvaluatorDescriptor getEvaluatorDescriptor() {
+    return this.evaluatorDescriptor;
+  }
+
+  @Override
+  public String toString() {
+    return "ClosedContext{" +
+        "contextID='" + contextID + '\'' +
+        '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextControlHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextControlHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextControlHandler.java
new file mode 100644
index 0000000..0c5ba15
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextControlHandler.java
@@ -0,0 +1,59 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.context;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.EvaluatorRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorControlHandler;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Handles context control messages.
+ */
+@DriverSide
+@Private
+public class ContextControlHandler {
+  private static final Logger LOG = Logger.getLogger(ContextControlHandler.class.getName());
+  private final EvaluatorControlHandler evaluatorControlHandler;
+  private final String evaluatorId;
+
+  /**
+   * @param evaluatorControlHandler used to send the actual evaluator control messages.
+   * @param evaluatorId             the ID of the evaluator this ContextControlHandler communicates with.
+   */
+  @Inject
+  ContextControlHandler(final EvaluatorControlHandler evaluatorControlHandler,
+                        final @Parameter(EvaluatorManager.EvaluatorIdentifier.class) String evaluatorId) {
+    this.evaluatorControlHandler = evaluatorControlHandler;
+    this.evaluatorId = evaluatorId;
+    LOG.log(Level.FINE, "Instantiated 'ContextControlHandler'");
+  }
+
+  public synchronized void send(final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto) {
+    final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto =
+        EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder()
+            .setTimestamp(System.currentTimeMillis())
+            .setIdentifier(evaluatorId)
+            .setContextControl(contextControlProto).build();
+    this.evaluatorControlHandler.send(evaluatorControlProto);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextFactory.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextFactory.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextFactory.java
new file mode 100644
index 0000000..1fb027a
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextFactory.java
@@ -0,0 +1,94 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.context;
+
+import net.jcip.annotations.GuardedBy;
+import net.jcip.annotations.ThreadSafe;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher;
+import org.apache.reef.runtime.common.utils.ExceptionCodec;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.Optional;
+
+import javax.inject.Inject;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Helper class to generate EvaluatorContext instances. Used in ContextRepresenters.
+ */
+@ThreadSafe
+final class ContextFactory {
+
+  private final String evaluatorId;
+  private final EvaluatorDescriptor evaluatorDescriptor;
+  private final ConfigurationSerializer configurationSerializer;
+  private final ExceptionCodec exceptionCodec;
+  private final EvaluatorMessageDispatcher messageDispatcher;
+  private final ContextControlHandler contextControlHandler;
+  private final InjectionFuture<ContextRepresenters> contextRepresenters;
+
+
+  @GuardedBy("this.priorIds")
+  private final Set<String> priorIds = new HashSet<>();
+
+
+  @Inject
+  ContextFactory(final @Parameter(EvaluatorManager.EvaluatorIdentifier.class) String evaluatorId,
+                 final @Parameter(EvaluatorManager.EvaluatorDescriptorName.class) EvaluatorDescriptor evaluatorDescriptor,
+                 final ConfigurationSerializer configurationSerializer,
+                 final ExceptionCodec exceptionCodec,
+                 final EvaluatorMessageDispatcher messageDispatcher,
+                 final ContextControlHandler contextControlHandler,
+                 final InjectionFuture<ContextRepresenters> contextRepresenters) {
+    this.evaluatorId = evaluatorId;
+    this.evaluatorDescriptor = evaluatorDescriptor;
+    this.configurationSerializer = configurationSerializer;
+    this.exceptionCodec = exceptionCodec;
+    this.messageDispatcher = messageDispatcher;
+    this.contextControlHandler = contextControlHandler;
+    this.contextRepresenters = contextRepresenters;
+  }
+
+  /**
+   * Instantiate a new Context representer with the given id and parent id.
+   *
+   * @param contextId
+   * @param parentID
+   * @return a new Context representer with the given id and parent id.
+   */
+  public final EvaluatorContext newContext(final String contextId, final Optional<String> parentID) {
+    synchronized (this.priorIds) {
+      if (this.priorIds.contains(contextId)) {
+        throw new IllegalStateException("Creating second EvaluatorContext instance for id " + contextId);
+      }
+      this.priorIds.add(contextId);
+    }
+    return new EvaluatorContext(contextId,
+        this.evaluatorId,
+        this.evaluatorDescriptor,
+        parentID,
+        this.configurationSerializer,
+        this.contextControlHandler,
+        this.messageDispatcher,
+        this.exceptionCodec,
+        this.contextRepresenters.get());
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextMessageImpl.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextMessageImpl.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextMessageImpl.java
new file mode 100644
index 0000000..37c7266
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextMessageImpl.java
@@ -0,0 +1,52 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.context;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ContextMessage;
+
+/**
+ * Driver-side representation of a context message.
+ */
+@Private
+@DriverSide
+public final class ContextMessageImpl implements ContextMessage {
+  private final byte[] theMessage;
+  private final String theContextID;
+  private final String theMessageSourceId;
+
+  public ContextMessageImpl(final byte[] theMessage, final String theContextID, final String theMessageSourceId) {
+    this.theMessage = theMessage;
+    this.theContextID = theContextID;
+    this.theMessageSourceId = theMessageSourceId;
+  }
+
+  @Override
+  public byte[] get() {
+    return this.theMessage;
+  }
+
+  @Override
+  public String getId() {
+    return this.theContextID;
+  }
+
+  @Override
+  public final String getMessageSourceID() {
+    return this.theMessageSourceId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
new file mode 100644
index 0000000..0ca7ff1
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
@@ -0,0 +1,244 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.context;
+
+import net.jcip.annotations.GuardedBy;
+import net.jcip.annotations.ThreadSafe;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher;
+import org.apache.reef.util.Optional;
+
+import javax.inject.Inject;
+import java.util.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver-Side representation of all contexts on an Evaluator.
+ */
+@ThreadSafe
+@DriverSide
+@Private
+public final class ContextRepresenters {
+  private static final Logger LOG = Logger.getLogger(ContextRepresenters.class.getName());
+
+  private final EvaluatorMessageDispatcher messageDispatcher;
+  private final ContextFactory contextFactory;
+
+  // Mutable fields
+  @GuardedBy("this")
+  private final List<EvaluatorContext> contextStack = new ArrayList<>();
+  @GuardedBy("this")
+  private final Set<String> contextIds = new HashSet<>();
+
+  @Inject
+  private ContextRepresenters(final EvaluatorMessageDispatcher messageDispatcher,
+                              final ContextFactory contextFactory) {
+    this.messageDispatcher = messageDispatcher;
+    this.contextFactory = contextFactory;
+  }
+
+  /**
+   * Fetch the context with the given ID.
+   *
+   * @param contextId
+   * @return
+   */
+  public synchronized EvaluatorContext getContext(final String contextId) {
+    for (final EvaluatorContext context : this.contextStack) {
+      if (context.getId().equals(contextId)) return context;
+    }
+    throw new RuntimeException("Unknown evaluator context " + contextId);
+  }
+
+  /**
+   * Create the failed contexts for a FailedEvaluator event.
+   *
+   * @return
+   */
+  public synchronized List<FailedContext> getFailedContextsForEvaluatorFailure() {
+    final List<FailedContext> failedContextList = new ArrayList<>();
+    final List<EvaluatorContext> activeContexts = new ArrayList<>(this.contextStack);
+    Collections.reverse(activeContexts);
+
+    for (final EvaluatorContext context : activeContexts) {
+      failedContextList.add(context.getFailedContextForEvaluatorFailure());
+    }
+    return failedContextList;
+  }
+
+  /**
+   * Process heartbeats from the contexts on an Evaluator.
+   *
+   * @param contextStatusProto
+   * @param notifyClientOnNewActiveContext
+   */
+  public synchronized void onContextStatusMessages(final Iterable<ReefServiceProtos.ContextStatusProto> contextStatusProtos,
+                                                   final boolean notifyClientOnNewActiveContext) {
+    for (final ReefServiceProtos.ContextStatusProto contextStatusProto : contextStatusProtos) {
+      this.onContextStatusMessage(contextStatusProto, notifyClientOnNewActiveContext);
+    }
+  }
+
+
+  /**
+   * Process a heartbeat from a context
+   *
+   * @param contextStatusProto
+   * @param notifyClientOnNewActiveContext
+   */
+  private synchronized void onContextStatusMessage(final ReefServiceProtos.ContextStatusProto contextStatusProto,
+                                                   final boolean notifyClientOnNewActiveContext) {
+
+    LOG.log(Level.FINER, "Processing context status message for context {0}", contextStatusProto.getContextId());
+    switch (contextStatusProto.getContextState()) {
+      case READY:
+        this.onContextReady(contextStatusProto, notifyClientOnNewActiveContext);
+        break;
+      case FAIL:
+        this.onContextFailed(contextStatusProto);
+        break;
+      case DONE:
+        this.onContextDone(contextStatusProto);
+        break;
+      default:
+        this.onUnknownContextStatus(contextStatusProto);
+        break;
+    }
+    LOG.log(Level.FINER, "Done processing context status message for context {0}", contextStatusProto.getContextId());
+
+  }
+
+
+  private synchronized void onUnknownContextStatus(final ReefServiceProtos.ContextStatusProto contextStatusProto) {
+    LOG.log(Level.WARNING, "Received unexpected context status: {0}", contextStatusProto);
+    throw new RuntimeException("Received unexpected context status: " + contextStatusProto.getContextState());
+  }
+
+  private synchronized void onContextFailed(final ReefServiceProtos.ContextStatusProto contextStatusProto) {
+    assert (ReefServiceProtos.ContextStatusProto.State.FAIL == contextStatusProto.getContextState());
+    final String contextID = contextStatusProto.getContextId();
+    LOG.log(Level.FINE, "Context {0} failed", contextID);
+    // It could have failed right away.
+    if (this.isUnknownContextId(contextID)) {
+      this.onNewContext(contextStatusProto, false);
+    }
+    final EvaluatorContext context = getContext(contextID);
+    this.removeContext(context);
+    this.messageDispatcher.onContextFailed(context.getFailedContext(contextStatusProto));
+  }
+
+  private synchronized void onContextDone(final ReefServiceProtos.ContextStatusProto contextStatusProto) {
+    assert (ReefServiceProtos.ContextStatusProto.State.DONE == contextStatusProto.getContextState());
+    final String contextID = contextStatusProto.getContextId();
+    if (isUnknownContextId(contextID)) {
+      throw new RuntimeException("Received DONE for context " + contextID + " which is unknown.");
+    } else {
+      LOG.log(Level.FINE, "Context {0} is DONE.", contextID);
+      final EvaluatorContext context = getContext(contextID);
+      removeContext(context);
+
+      if (context.isRootContext()) {
+        LOG.log(Level.FINE, "Root context {0} closed. Evaluator closed will trigger final shutdown.", contextID);
+      } else {
+        final EvaluatorContext parentContext = this.getContext(context.getParentId().get());
+        this.messageDispatcher.onContextClose(context.getClosedContext(parentContext));
+      }
+    }
+  }
+
+  /**
+   * Process a message with status READY from a context.
+   *
+   * @param contextStatusProto
+   * @param notifyClientOnNewActiveContext whether or not to inform the application when this in fact refers to a new
+   *                                       context.
+   */
+  private synchronized void onContextReady(final ReefServiceProtos.ContextStatusProto contextStatusProto,
+                                           final boolean notifyClientOnNewActiveContext) {
+    assert (ReefServiceProtos.ContextStatusProto.State.READY == contextStatusProto.getContextState());
+    final String contextID = contextStatusProto.getContextId();
+    // This could be the first message we get from that context
+    if (this.isUnknownContextId(contextID)) {
+      this.onNewContext(contextStatusProto, notifyClientOnNewActiveContext);
+    }
+
+    // Dispatch the messages to the application, if there are any.
+    for (final ReefServiceProtos.ContextStatusProto.ContextMessageProto contextMessageProto : contextStatusProto.getContextMessageList()) {
+      final byte[] theMessage = contextMessageProto.getMessage().toByteArray();
+      final String sourceID = contextMessageProto.getSourceId();
+      this.messageDispatcher.onContextMessage(new ContextMessageImpl(theMessage, contextID, sourceID));
+    }
+
+  }
+
+  /**
+   * Create and add a new context representer.
+   *
+   * @param contextStatusProto             the message to create the context from
+   * @param notifyClientOnNewActiveContext whether or not to fire an event to the user.
+   */
+  private synchronized void onNewContext(final ReefServiceProtos.ContextStatusProto contextStatusProto,
+                                         final boolean notifyClientOnNewActiveContext) {
+    final String contextID = contextStatusProto.getContextId();
+    LOG.log(Level.FINE, "Adding new context {0}.", contextID);
+
+    final Optional<String> parentID = contextStatusProto.hasParentId() ?
+        Optional.of(contextStatusProto.getParentId()) : Optional.<String>empty();
+    final EvaluatorContext context = contextFactory.newContext(contextID, parentID);
+    this.addContext(context);
+    if (contextStatusProto.getRecovery()) {
+      // when we get a recovered active context, always notify application
+      this.messageDispatcher.OnDriverRestartContextActive(context);
+    } else {
+      if (notifyClientOnNewActiveContext) {
+        this.messageDispatcher.onContextActive(context);
+      }
+    }
+  }
+
+  /**
+   * Add the given context to the data structures.
+   *
+   * @param context
+   */
+  private synchronized void addContext(final EvaluatorContext context) {
+    this.contextStack.add(context);
+    this.contextIds.add(context.getId());
+  }
+
+  /**
+   * Remove the given context from the data structures.
+   *
+   * @param context
+   */
+  private synchronized void removeContext(final EvaluatorContext context) {
+    this.contextStack.remove(context);
+    this.contextIds.remove(context.getId());
+  }
+
+  /**
+   * @param contextId
+   * @return true if the given context id is unknown so far.
+   */
+  private synchronized boolean isUnknownContextId(final String contextId) {
+    return !this.contextIds.contains(contextId);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java
new file mode 100644
index 0000000..e033116
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java
@@ -0,0 +1,277 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.context;
+
+import com.google.protobuf.ByteString;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.proto.EvaluatorRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher;
+import org.apache.reef.runtime.common.utils.ExceptionCodec;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.Optional;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver-side representation of a Context on an Evaluator.
+ */
+@DriverSide
+@Private
+public final class EvaluatorContext implements ActiveContext {
+
+  private final static Logger LOG = Logger.getLogger(EvaluatorContext.class.getName());
+
+  private final String contextIdentifier;
+  private final String evaluatorIdentifier;
+  private final EvaluatorDescriptor evaluatorDescriptor;
+
+  private final Optional<String> parentID;
+  private final ConfigurationSerializer configurationSerializer;
+  private final ContextControlHandler contextControlHandler;
+  private final ExceptionCodec exceptionCodec;
+  private final ContextRepresenters contextRepresenters;
+
+  private boolean isClosed = false;
+
+  public EvaluatorContext(final String contextIdentifier,
+                          final String evaluatorIdentifier,
+                          final EvaluatorDescriptor evaluatorDescriptor,
+                          final Optional<String> parentID,
+                          final ConfigurationSerializer configurationSerializer,
+                          final ContextControlHandler contextControlHandler,
+                          final EvaluatorMessageDispatcher messageDispatcher,
+                          final ExceptionCodec exceptionCodec,
+                          final ContextRepresenters contextRepresenters) {
+
+    this.contextIdentifier = contextIdentifier;
+    this.evaluatorIdentifier = evaluatorIdentifier;
+    this.evaluatorDescriptor = evaluatorDescriptor;
+    this.parentID = parentID;
+    this.configurationSerializer = configurationSerializer;
+    this.contextControlHandler = contextControlHandler;
+    this.exceptionCodec = exceptionCodec;
+    this.contextRepresenters = contextRepresenters;
+
+    LOG.log(Level.FINE, "Instantiated 'EvaluatorContext'");
+  }
+
+  @Override
+  public synchronized void close() {
+
+    if (this.isClosed) {
+      throw new RuntimeException("Active context already closed");
+    }
+
+    LOG.log(Level.FINEST, "Submit close context: RunningEvaluator id[{0}] for context id[{1}]",
+        new Object[]{getEvaluatorId(), getId()});
+
+    final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto =
+        EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
+            .setRemoveContext(
+                EvaluatorRuntimeProtocol.RemoveContextProto.newBuilder()
+                    .setContextId(getId())
+                    .build())
+            .build();
+
+    this.contextControlHandler.send(contextControlProto);
+    this.isClosed = true;
+  }
+
+  @Override
+  public synchronized void sendMessage(final byte[] message) {
+
+    if (this.isClosed) {
+      throw new RuntimeException("Active context already closed");
+    }
+
+    LOG.log(Level.FINEST, "Send message: RunningEvaluator id[{0}] for context id[{1}]",
+        new Object[]{getEvaluatorId(), getId()});
+
+    final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto =
+        EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
+            .setContextMessage(EvaluatorRuntimeProtocol.ContextMessageProto.newBuilder()
+                .setContextId(this.contextIdentifier)
+                .setMessage(ByteString.copyFrom(message))
+                .build())
+            .build();
+
+    this.contextControlHandler.send(contextControlProto);
+  }
+
+  @Override
+  public synchronized void submitTask(final Configuration taskConf) {
+
+    if (this.isClosed) {
+      throw new RuntimeException("Active context already closed");
+    }
+
+    LOG.log(Level.FINEST, "Submit task: RunningEvaluator id[{0}] for context id[{1}]",
+        new Object[]{getEvaluatorId(), getId()});
+
+    final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto =
+        EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
+            .setStartTask(
+                EvaluatorRuntimeProtocol.StartTaskProto.newBuilder()
+                    .setContextId(this.contextIdentifier)
+                    .setConfiguration(this.configurationSerializer.toString(taskConf))
+                    .build())
+            .build();
+
+    this.contextControlHandler.send(contextControlProto);
+  }
+
+  @Override
+  public synchronized void submitContext(final Configuration contextConfiguration) {
+
+    if (this.isClosed) {
+      throw new RuntimeException("Active context already closed");
+    }
+
+    LOG.log(Level.FINEST, "Submit new context: RunningEvaluator id[{0}] for context id[{1}]",
+        new Object[]{getEvaluatorId(), getId()});
+
+    final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto =
+        EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
+            .setAddContext(
+                EvaluatorRuntimeProtocol.AddContextProto.newBuilder()
+                    .setParentContextId(getId())
+                    .setContextConfiguration(this.configurationSerializer.toString(contextConfiguration))
+                    .build())
+            .build();
+
+    this.contextControlHandler.send(contextControlProto);
+  }
+
+  @Override
+  public synchronized void submitContextAndService(
+      final Configuration contextConfiguration, final Configuration serviceConfiguration) {
+
+    if (this.isClosed) {
+      throw new RuntimeException("Active context already closed");
+    }
+
+    LOG.log(Level.FINEST, "Submit new context: RunningEvaluator id[{0}] for context id[{1}]",
+        new Object[]{getEvaluatorId(), getId()});
+
+    final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto =
+        EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
+            .setAddContext(
+                EvaluatorRuntimeProtocol.AddContextProto.newBuilder()
+                    .setParentContextId(getId())
+                    .setContextConfiguration(this.configurationSerializer.toString(contextConfiguration))
+                    .setServiceConfiguration(this.configurationSerializer.toString(serviceConfiguration))
+                    .build())
+            .build();
+
+    this.contextControlHandler.send(contextControlProto);
+  }
+
+  @Override
+  public String getEvaluatorId() {
+    return this.evaluatorIdentifier;
+  }
+
+  @Override
+  public Optional<String> getParentId() {
+    return this.parentID;
+  }
+
+  @Override
+  public EvaluatorDescriptor getEvaluatorDescriptor() {
+    return this.evaluatorDescriptor;
+  }
+
+  @Override
+  public String getId() {
+    return this.contextIdentifier;
+  }
+
+  @Override
+  public String toString() {
+    return "EvaluatorContext{" +
+        "contextIdentifier='" + this.contextIdentifier + '\'' +
+        ", evaluatorIdentifier='" + this.evaluatorIdentifier + '\'' +
+        ", parentID=" + this.parentID + '}';
+  }
+
+  public synchronized final ClosedContext getClosedContext(final ActiveContext parentContext) {
+    return new ClosedContextImpl(
+        parentContext, this.getId(), this.getEvaluatorId(), this.getEvaluatorDescriptor());
+  }
+
+  /**
+   * @return a FailedContext for the case of an EvaluatorFailure.
+   */
+  public synchronized FailedContext getFailedContextForEvaluatorFailure() {
+
+    final String id = this.getId();
+    final Optional<String> description = Optional.empty();
+    final Optional<byte[]> data = Optional.empty();
+    final Optional<Throwable> cause = Optional.empty();
+    final String message = "Evaluator Failure";
+
+    final Optional<ActiveContext> parentContext = getParentId().isPresent() ?
+        Optional.<ActiveContext>of(this.contextRepresenters.getContext(getParentId().get())) :
+        Optional.<ActiveContext>empty();
+
+    final EvaluatorDescriptor evaluatorDescriptor = getEvaluatorDescriptor();
+    final String evaluatorID = getEvaluatorId();
+
+    return new FailedContextImpl(
+        id, message, description, cause, data, parentContext, evaluatorDescriptor, evaluatorID);
+  }
+
+  public synchronized FailedContext getFailedContext(
+      final ReefServiceProtos.ContextStatusProto contextStatusProto) {
+
+    assert (ReefServiceProtos.ContextStatusProto.State.FAIL == contextStatusProto.getContextState());
+
+    final String id = this.getId();
+    final Optional<String> description = Optional.empty();
+
+    final Optional<byte[]> data = contextStatusProto.hasError() ?
+        Optional.of(contextStatusProto.getError().toByteArray()) :
+        Optional.<byte[]>empty();
+
+    final Optional<Throwable> cause = data.isPresent() ?
+        this.exceptionCodec.fromBytes(data) :
+        Optional.<Throwable>empty();
+
+    final String message = cause.isPresent() ? cause.get().getMessage() : "No message given";
+
+    final Optional<ActiveContext> parentContext = getParentId().isPresent() ?
+        Optional.<ActiveContext>of(this.contextRepresenters.getContext(getParentId().get())) :
+        Optional.<ActiveContext>empty();
+
+    final EvaluatorDescriptor evaluatorDescriptor = getEvaluatorDescriptor();
+    final String evaluatorID = getEvaluatorId();
+
+    return new FailedContextImpl(
+        id, message, description, cause, data, parentContext, evaluatorDescriptor, evaluatorID);
+  }
+
+  public synchronized boolean isRootContext() {
+    return !this.parentID.isPresent();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/FailedContextImpl.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/FailedContextImpl.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/FailedContextImpl.java
new file mode 100644
index 0000000..98e3db4
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/FailedContextImpl.java
@@ -0,0 +1,92 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.context;
+
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.common.AbstractFailure;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.util.Optional;
+
+/**
+ * Driver-Side representation of a failed context.
+ */
+@Private
+@DriverSide
+public final class FailedContextImpl extends AbstractFailure implements FailedContext {
+
+  private final Optional<ActiveContext> parentContext;
+  private final EvaluatorDescriptor evaluatorDescriptor;
+  private final String evaluatorID;
+
+  /**
+   * @param id                  Identifier of the entity that produced the error.
+   * @param message             One-line error message.
+   * @param description         Long error description.
+   * @param cause               Java Exception that caused the error.
+   * @param data                byte array that contains serialized version of the error.
+   * @param parentContext       the parent context, if there is one.
+   * @param evaluatorDescriptor the descriptor of the Evaluator this context failed on.
+   * @param evaluatorID         the id of the Evaluator this context failed on.
+   */
+  public FailedContextImpl(final String id,
+                           final String message,
+                           final Optional<String> description,
+                           final Optional<Throwable> cause,
+                           final Optional<byte[]> data,
+                           final Optional<ActiveContext> parentContext,
+                           final EvaluatorDescriptor evaluatorDescriptor,
+                           final String evaluatorID) {
+    super(id, message, description, cause, data);
+    this.parentContext = parentContext;
+    this.evaluatorDescriptor = evaluatorDescriptor;
+    this.evaluatorID = evaluatorID;
+  }
+
+
+  @Override
+  public Optional<ActiveContext> getParentContext() {
+    return this.parentContext;
+  }
+
+  @Override
+  public String getEvaluatorId() {
+    return this.evaluatorID;
+  }
+
+  @Override
+  public Optional<String> getParentId() {
+    if (this.getParentContext().isPresent()) {
+      return Optional.of(this.getParentContext().get().getId());
+    } else {
+      return Optional.empty();
+    }
+  }
+
+  @Override
+  public EvaluatorDescriptor getEvaluatorDescriptor() {
+    return this.evaluatorDescriptor;
+  }
+
+
+  @Override
+  public String toString() {
+    return "FailedContext{" + "evaluatorID='" + evaluatorID + "', contextID='" + getId() + "'}";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/package-info.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/package-info.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/package-info.java
new file mode 100644
index 0000000..24704b4
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Implementations of Driver-Side representations of Contexts running on an Evaluator
+ */
+@DriverSide
+@Private package org.apache.reef.runtime.common.driver.context;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientCloseHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientCloseHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientCloseHandler.java
new file mode 100644
index 0000000..76733e6
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientCloseHandler.java
@@ -0,0 +1,43 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.util.ThreadLogger;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default handler for close messages from the client: Throw an Exception.
+ */
+public final class DefaultClientCloseHandler implements EventHandler<Void> {
+
+  private static final Logger LOG = Logger.getLogger(DefaultClientCloseHandler.class.getName());
+
+  @Inject
+  DefaultClientCloseHandler() {
+  }
+
+  @Override
+  public void onNext(final Void aVoid) {
+    final String message = ThreadLogger.getFormattedThreadList(
+        "Received a close message from the client, but no handler was bound for it. Active threads: ");
+    LOG.log(Level.WARNING, message);
+    throw new RuntimeException(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientCloseWithMessageHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientCloseWithMessageHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientCloseWithMessageHandler.java
new file mode 100644
index 0000000..c3d7c71
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientCloseWithMessageHandler.java
@@ -0,0 +1,36 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Default handler for close messages from the client: Throw an Exception.
+ */
+public final class DefaultClientCloseWithMessageHandler implements EventHandler<byte[]> {
+
+  @Inject
+  public DefaultClientCloseWithMessageHandler() {
+  }
+
+  @Override
+  public void onNext(final byte[] bytes) {
+    throw new RuntimeException(
+        "No handler bound for client Close With Message event: " + new String(bytes));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientMessageHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientMessageHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientMessageHandler.java
new file mode 100644
index 0000000..2cdf000
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientMessageHandler.java
@@ -0,0 +1,39 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default event handler for Client messages: Logging it.
+ */
+public final class DefaultClientMessageHandler implements EventHandler<byte[]> {
+
+  private static final Logger LOG = Logger.getLogger(DefaultClientMessageHandler.class.getName());
+
+  @Inject
+  public DefaultClientMessageHandler() {
+  }
+
+  @Override
+  public void onNext(final byte[] bytes) {
+    LOG.log(Level.INFO, "Received ClientMessage: {0}", new String(bytes));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextActiveHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextActiveHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextActiveHandler.java
new file mode 100644
index 0000000..002935c
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextActiveHandler.java
@@ -0,0 +1,41 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default handler for ActiveContext: Close it.
+ */
+public final class DefaultContextActiveHandler implements EventHandler<ActiveContext> {
+
+  private static final Logger LOG = Logger.getLogger(DefaultContextActiveHandler.class.getName());
+
+  @Inject
+  private DefaultContextActiveHandler() {
+  }
+
+  @Override
+  public void onNext(final ActiveContext activeContext) {
+    LOG.log(Level.INFO, "Received ActiveContext: {0} :: CLOSING", activeContext);
+    activeContext.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextClosureHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextClosureHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextClosureHandler.java
new file mode 100644
index 0000000..7d23169
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextClosureHandler.java
@@ -0,0 +1,44 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default event handler for ClosedContext: Logging it.
+ */
+public final class DefaultContextClosureHandler implements EventHandler<ClosedContext> {
+
+  private static final Logger LOG = Logger.getLogger(DefaultContextClosureHandler.class.getName());
+
+  @Inject
+  public DefaultContextClosureHandler() {
+  }
+
+  @Override
+  public void onNext(final ClosedContext closedContext) {
+    LOG.log(Level.INFO, "Received ClosedContext: {0}", closedContext);
+    if (closedContext.getParentContext() != null) {
+      LOG.log(Level.INFO, "Closing parent context: {0}", closedContext.getParentContext().getId());
+      closedContext.getParentContext().close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextFailureHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextFailureHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextFailureHandler.java
new file mode 100644
index 0000000..884a865
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextFailureHandler.java
@@ -0,0 +1,41 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Default event handler used for FailedContext: It crashes the driver.
+ */
+public final class DefaultContextFailureHandler implements EventHandler<FailedContext> {
+
+  @Inject
+  public DefaultContextFailureHandler() {
+  }
+
+  @Override
+  public void onNext(final FailedContext failedContext) {
+    if (failedContext.getReason().isPresent()) {
+      throw new RuntimeException(
+          "No handler bound for FailedContext:" + failedContext, failedContext.getReason().get());
+    } else {
+      throw new RuntimeException("No handler bound for FailedContext:" + failedContext);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextMessageHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextMessageHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextMessageHandler.java
new file mode 100644
index 0000000..7b5e4dd
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextMessageHandler.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.driver.context.ContextMessage;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default event handler for ContextMessage: Logging it.
+ */
+public final class DefaultContextMessageHandler implements EventHandler<ContextMessage> {
+
+  private static final Logger LOG = Logger.getLogger(DefaultContextMessageHandler.class.getName());
+
+  @Inject
+  public DefaultContextMessageHandler() {
+  }
+
+  @Override
+  public void onNext(final ContextMessage contextMessage) {
+    LOG.log(Level.INFO, "Received ContextMessage: {0}", contextMessage);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultDriverRestartCompletedHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultDriverRestartCompletedHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultDriverRestartCompletedHandler.java
new file mode 100644
index 0000000..3825d99
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultDriverRestartCompletedHandler.java
@@ -0,0 +1,41 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.runtime.common.DriverRestartCompleted;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default handler for driver restart completed event: log it.
+ */
+public final class DefaultDriverRestartCompletedHandler implements EventHandler<DriverRestartCompleted> {
+
+  private static final Logger LOG = Logger.getLogger(DefaultDriverRestartCompletedHandler.class.getName());
+
+  @Inject
+  private DefaultDriverRestartCompletedHandler() {
+  }
+
+  @Override
+  public void onNext(final DriverRestartCompleted restartCompleted) {
+    LOG.log(Level.INFO, "Driver restart completed at time [{0}].", restartCompleted.getTimeStamp());
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultDriverRestartContextActiveHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultDriverRestartContextActiveHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultDriverRestartContextActiveHandler.java
new file mode 100644
index 0000000..477ee01
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultDriverRestartContextActiveHandler.java
@@ -0,0 +1,41 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default handler for ActiveContext from previous evaluator during driver restart: Close it.
+ */
+public final class DefaultDriverRestartContextActiveHandler implements EventHandler<ActiveContext> {
+
+  private static final Logger LOG = Logger.getLogger(DefaultDriverRestartContextActiveHandler.class.getName());
+
+  @Inject
+  private DefaultDriverRestartContextActiveHandler() {
+  }
+
+  @Override
+  public void onNext(final ActiveContext activeContext) {
+    LOG.log(Level.INFO, "Received ActiveContext running on previous Evaluator during driver restart: {0} :: CLOSING", activeContext);
+    activeContext.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultDriverRestartTaskRunningHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultDriverRestartTaskRunningHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultDriverRestartTaskRunningHandler.java
new file mode 100644
index 0000000..176a20a
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultDriverRestartTaskRunningHandler.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Logger;
+
+/**
+ * Default event handler for Task Restart TaskRuntime: Logging it.
+ */
+public final class DefaultDriverRestartTaskRunningHandler implements EventHandler<RunningTask> {
+
+  private static final Logger LOG = Logger.getLogger(DefaultDriverRestartTaskRunningHandler.class.getName());
+
+  @Inject
+  public DefaultDriverRestartTaskRunningHandler() {
+  }
+
+  @Override
+  public void onNext(final RunningTask runningTask) {
+    throw new RuntimeException(
+        "RunningTask [" + runningTask.toString() + "] received during driver restart, but no DriverRestartTaskRunningHandler is bound");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultEvaluatorAllocationHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultEvaluatorAllocationHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultEvaluatorAllocationHandler.java
new file mode 100644
index 0000000..8e5bb74
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultEvaluatorAllocationHandler.java
@@ -0,0 +1,41 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default handler for AllocatedEvaluator: close it.
+ */
+public final class DefaultEvaluatorAllocationHandler implements EventHandler<AllocatedEvaluator> {
+
+  private static final Logger LOG = Logger.getLogger(DefaultContextMessageHandler.class.getName());
+
+  @Inject
+  public DefaultEvaluatorAllocationHandler() {
+  }
+
+  @Override
+  public void onNext(final AllocatedEvaluator allocatedEvaluator) {
+    LOG.log(Level.INFO, "Received AllocatedEvaluator: {0} :: CLOSING", allocatedEvaluator);
+    allocatedEvaluator.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultEvaluatorCompletionHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultEvaluatorCompletionHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultEvaluatorCompletionHandler.java
new file mode 100644
index 0000000..5a387c9
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultEvaluatorCompletionHandler.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default event handler for CompletedEvaluator: Logging it.
+ */
+public final class DefaultEvaluatorCompletionHandler implements EventHandler<CompletedEvaluator> {
+
+  private static final Logger LOG = Logger.getLogger(DefaultContextMessageHandler.class.getName());
+
+  @Inject
+  public DefaultEvaluatorCompletionHandler() {
+  }
+
+  @Override
+  public void onNext(final CompletedEvaluator completedEvaluator) {
+    LOG.log(Level.INFO, "Received CompletedEvaluator: {0}", completedEvaluator);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultEvaluatorFailureHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultEvaluatorFailureHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultEvaluatorFailureHandler.java
new file mode 100644
index 0000000..34ff7b1
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultEvaluatorFailureHandler.java
@@ -0,0 +1,38 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Default event handler used for FailedEvaluator: It crashes the driver.
+ */
+public final class DefaultEvaluatorFailureHandler implements EventHandler<FailedEvaluator> {
+
+  @Inject
+  public DefaultEvaluatorFailureHandler() {
+  }
+
+  @Override
+  public void onNext(final FailedEvaluator failedEvaluator) {
+    throw new RuntimeException(
+        "No handler bound for FailedEvaluator: " + failedEvaluator,
+        failedEvaluator.getEvaluatorException());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultTaskCompletionHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultTaskCompletionHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultTaskCompletionHandler.java
new file mode 100644
index 0000000..4dcc462
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultTaskCompletionHandler.java
@@ -0,0 +1,44 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.CompletedTask;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default event handler for CompletedTask: Log it and close the context.
+ */
+public final class DefaultTaskCompletionHandler implements EventHandler<CompletedTask> {
+
+  private static final Logger LOG = Logger.getLogger(DefaultTaskCompletionHandler.class.getName());
+
+  @Inject
+  public DefaultTaskCompletionHandler() {
+  }
+
+  @Override
+  public void onNext(final CompletedTask completedTask) {
+    final ActiveContext context = completedTask.getActiveContext();
+    LOG.log(Level.INFO, "Received CompletedTask: {0} :: CLOSING context: {1}",
+        new Object[]{completedTask, context});
+    context.close();
+  }
+}