You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:47:13 UTC

[40/51] [partial] incubator-reef git commit: [REEF-93] Move java sources to lang/java

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverExceptionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverExceptionHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverExceptionHandler.java
new file mode 100644
index 0000000..d145246
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverExceptionHandler.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Exception handler for exceptions thrown by client code in the Driver.
+ * It delegates every Throwable it receives to DriverStatusManager.onError().
+ */
+@Private
+@DriverSide
+public final class DriverExceptionHandler implements EventHandler<Throwable> {
+  private static final Logger LOG = Logger.getLogger(DriverExceptionHandler.class.getName());
+  /**
+   * We delegate the failures to this object.
+   */
+  private final DriverStatusManager driverStatusManager;
+
+  @Inject
+  public DriverExceptionHandler(final DriverStatusManager driverStatusManager) {
+    this.driverStatusManager = driverStatusManager;
+    LOG.log(Level.FINE, "Instantiated 'DriverExceptionHandler'");
+  }
+
+
+  @Override
+  public void onNext(final Throwable throwable) {
+    this.driverStatusManager.onError(throwable);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeConfiguration.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeConfiguration.java
new file mode 100644
index 0000000..a2e4078
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeConfiguration.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.catalog.ResourceCatalog;
+import org.apache.reef.driver.client.JobMessageObserver;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.parameters.DriverIdleSources;
+import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
+import org.apache.reef.runtime.common.driver.catalog.ResourceCatalogImpl;
+import org.apache.reef.runtime.common.driver.client.ClientManager;
+import org.apache.reef.runtime.common.driver.client.JobMessageObserverImpl;
+import org.apache.reef.runtime.common.driver.idle.ClockIdlenessSource;
+import org.apache.reef.runtime.common.driver.idle.EventHandlerIdlenessSource;
+import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorHandler;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationHandler;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceManagerStatus;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusHandler;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.wake.time.Clock;
+
+@Private
+@ClientSide
+public final class DriverRuntimeConfiguration extends ConfigurationModuleBuilder {
+
+  public static final ConfigurationModule CONF = new DriverRuntimeConfiguration()
+      // Resource Catalog
+      .bindImplementation(ResourceCatalog.class, ResourceCatalogImpl.class)
+
+          // EvaluatorRequestor and JobMessageObserver
+      .bindImplementation(EvaluatorRequestor.class, EvaluatorRequestorImpl.class) // requesting evaluators
+      .bindImplementation(JobMessageObserver.class, JobMessageObserverImpl.class) // sending message to job client
+
+          // Client manager: handles the job control messages
+      .bindNamedParameter(DriverRuntimeConfigurationOptions.JobControlHandler.class, ClientManager.class)
+
+          // Bind the resource manager event handlers
+      .bindNamedParameter(RuntimeParameters.NodeDescriptorHandler.class, NodeDescriptorHandler.class)
+      .bindNamedParameter(RuntimeParameters.ResourceAllocationHandler.class, ResourceAllocationHandler.class)
+      .bindNamedParameter(RuntimeParameters.ResourceStatusHandler.class, ResourceStatusHandler.class)
+      .bindNamedParameter(RuntimeParameters.RuntimeStatusHandler.class, ResourceManagerStatus.class)
+
+          // Bind the runtime start and stop handlers to the Clock
+      .bindSetEntry(Clock.RuntimeStartHandler.class, DriverRuntimeStartHandler.class)
+      .bindSetEntry(Clock.RuntimeStopHandler.class, DriverRuntimeStopHandler.class)
+
+          // Bind the driver idleness sources and the Clock's idle handler
+      .bindSetEntry(DriverIdleSources.class, ClockIdlenessSource.class)
+      .bindSetEntry(DriverIdleSources.class, EventHandlerIdlenessSource.class)
+      .bindSetEntry(DriverIdleSources.class, ResourceManagerStatus.class)
+      .bindSetEntry(Clock.IdleHandler.class, ClockIdlenessSource.class)
+
+      .build();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeConfigurationOptions.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeConfigurationOptions.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeConfigurationOptions.java
new file mode 100644
index 0000000..ae26f8e
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeConfigurationOptions.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver;
+
+import org.apache.reef.proto.ClientRuntimeProtocol;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ * Houses the NamedParameters of the Driver runtime configuration.
+ */
+public class DriverRuntimeConfigurationOptions {
+  @NamedParameter(doc = "Called when a job control message is received by the client.")
+  public final static class JobControlHandler implements Name<EventHandler<ClientRuntimeProtocol.JobControlProto>> {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStartHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStartHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStartHandler.java
new file mode 100644
index 0000000..114981b
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStartHandler.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver;
+
+import org.apache.reef.proto.EvaluatorRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorHeartbeatHandler;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorResourceManagerErrorHandler;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceManagerStatus;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.runtime.event.RuntimeStart;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The RuntimeStart handler of the Driver.
+ * <p>
+ * Injecting this handler forces the instantiation of the DriverSingletons. Upon onNext(), it registers the evaluator
+ * heartbeat and runtime error handlers with the RemoteManager, then calls setRunning() / onRunning() on the status objects.
+ */
+final class DriverRuntimeStartHandler implements EventHandler<RuntimeStart> {
+  private static final Logger LOG = Logger.getLogger(DriverRuntimeStartHandler.class.getName());
+  private final RemoteManager remoteManager;
+  private final EvaluatorResourceManagerErrorHandler evaluatorResourceManagerErrorHandler;
+  private final EvaluatorHeartbeatHandler evaluatorHeartbeatHandler;
+  private final ResourceManagerStatus resourceManagerStatus;
+  private final DriverStatusManager driverStatusManager;
+
+  /**
+   * @param singletons                           the objects we want to be Singletons in the Driver; not stored here
+   * @param remoteManager                        the remoteManager in the Driver.
+   * @param evaluatorResourceManagerErrorHandler registered with the remoteManager for RuntimeErrorProto in onNext()
+   * @param evaluatorHeartbeatHandler            registered with the remoteManager for EvaluatorHeartbeatProto in onNext()
+   * @param resourceManagerStatus                setRunning() is called on it in onNext()
+   * @param driverStatusManager                  onRunning() is called on it in onNext()
+   */
+  @Inject
+  DriverRuntimeStartHandler(final DriverSingletons singletons,
+                            final RemoteManager remoteManager,
+                            final EvaluatorResourceManagerErrorHandler evaluatorResourceManagerErrorHandler,
+                            final EvaluatorHeartbeatHandler evaluatorHeartbeatHandler,
+                            final ResourceManagerStatus resourceManagerStatus,
+                            final DriverStatusManager driverStatusManager) {
+    this.remoteManager = remoteManager;
+    this.evaluatorResourceManagerErrorHandler = evaluatorResourceManagerErrorHandler;
+    this.evaluatorHeartbeatHandler = evaluatorHeartbeatHandler;
+    this.resourceManagerStatus = resourceManagerStatus;
+    this.driverStatusManager = driverStatusManager;
+  }
+
+  @Override
+  public synchronized void onNext(final RuntimeStart runtimeStart) {
+    LOG.log(Level.FINEST, "RuntimeStart: {0}", runtimeStart);
+
+    this.remoteManager.registerHandler(EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto.class, evaluatorHeartbeatHandler);
+    this.remoteManager.registerHandler(ReefServiceProtos.RuntimeErrorProto.class, evaluatorResourceManagerErrorHandler);
+    this.resourceManagerStatus.setRunning();
+    this.driverStatusManager.onRunning();
+    LOG.log(Level.FINEST, "DriverRuntimeStartHandler complete.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
new file mode 100644
index 0000000..70b1f97
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.driver.evaluator.Evaluators;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.runtime.event.RuntimeStop;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Handler for the RuntimeStop event in the Driver. It shuts down the evaluators, informs the Client and
+ * finally closes the RemoteManager.
+ */
+@Private
+@DriverSide
+final class DriverRuntimeStopHandler implements EventHandler<RuntimeStop> {
+  private static final Logger LOG = Logger.getLogger(DriverRuntimeStopHandler.class.getName());
+
+  private final DriverStatusManager driverStatusManager;
+  private final RemoteManager remoteManager;
+  private final Evaluators evaluators;
+
+  @Inject
+  DriverRuntimeStopHandler(final DriverStatusManager driverStatusManager,
+                           final RemoteManager remoteManager,
+                           final Evaluators evaluators) {
+    this.driverStatusManager = driverStatusManager;
+    this.remoteManager = remoteManager;
+    this.evaluators = evaluators;
+  }
+
+  @Override
+  public synchronized void onNext(final RuntimeStop runtimeStop) {
+    LOG.log(Level.FINEST, "RuntimeStop: {0}", runtimeStop);
+    // Shut down the Evaluators.
+    this.evaluators.close();
+    // Inform the client of the shutdown, passing along the exception carried by the RuntimeStop event, if any.
+    final Optional<Throwable> exception = Optional.<Throwable>ofNullable(runtimeStop.getException());
+    this.driverStatusManager.sendJobEndingMessageToClient(exception);
+    // Close the remoteManager; a failure to do so is rethrown as a RuntimeException.
+    try {
+      this.remoteManager.close();
+      LOG.log(Level.INFO, "Driver shutdown complete");
+    } catch (final Exception e) {
+      throw new RuntimeException("Unable to close the RemoteManager.", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverSingleton.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverSingleton.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverSingleton.java
new file mode 100644
index 0000000..51da6c0
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverSingleton.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverSingletons.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverSingletons.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverSingletons.java
new file mode 100644
index 0000000..6ab42e9
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverSingletons.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextMessage;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.parameters.*;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.proto.ClientRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.Set;
+
+/**
+ * A class that depends on all objects we want to enforce to be singletons in the Driver.
+ * The DriverRuntimeStartHandler depends on an instance of this class, which instantiates its dependencies.
+ */
+@DriverSide
+@Private
+final class DriverSingletons {
+  @Inject
+  DriverSingletons(
+      // Application event handlers
+      final @Parameter(ContextActiveHandlers.class) Set<EventHandler<ActiveContext>> contextActiveEventHandlers,
+      final @Parameter(ContextClosedHandlers.class) Set<EventHandler<ClosedContext>> contextClosedEventHandlers,
+      final @Parameter(ContextFailedHandlers.class) Set<EventHandler<FailedContext>> contextFailedEventHandlers,
+      final @Parameter(ContextMessageHandlers.class) Set<EventHandler<ContextMessage>> contextMessageHandlers,
+      final @Parameter(TaskRunningHandlers.class) Set<EventHandler<RunningTask>> taskRunningEventHandlers,
+      final @Parameter(TaskCompletedHandlers.class) Set<EventHandler<CompletedTask>> taskCompletedEventHandlers,
+      final @Parameter(TaskSuspendedHandlers.class) Set<EventHandler<SuspendedTask>> taskSuspendedEventHandlers,
+      final @Parameter(TaskMessageHandlers.class) Set<EventHandler<TaskMessage>> taskMessageEventHandlers,
+      final @Parameter(TaskFailedHandlers.class) Set<EventHandler<FailedTask>> taskExceptionEventHandlers,
+      final @Parameter(EvaluatorAllocatedHandlers.class) Set<EventHandler<AllocatedEvaluator>> evaluatorAllocatedEventHandlers,
+      final @Parameter(EvaluatorFailedHandlers.class) Set<EventHandler<FailedEvaluator>> evaluatorFailedHandlers,
+      final @Parameter(EvaluatorCompletedHandlers.class) Set<EventHandler<CompletedEvaluator>> evaluatorCompletedHandlers,
+
+      // Service event handlers
+      final @Parameter(ServiceContextActiveHandlers.class) Set<EventHandler<ActiveContext>> serviceContextActiveEventHandlers,
+      final @Parameter(ServiceContextClosedHandlers.class) Set<EventHandler<ClosedContext>> serviceContextClosedEventHandlers,
+      final @Parameter(ServiceContextFailedHandlers.class) Set<EventHandler<FailedContext>> serviceContextFailedEventHandlers,
+      final @Parameter(ServiceContextMessageHandlers.class) Set<EventHandler<ContextMessage>> serviceContextMessageHandlers,
+      final @Parameter(ServiceTaskRunningHandlers.class) Set<EventHandler<RunningTask>> serviceTaskRunningEventHandlers,
+      final @Parameter(ServiceTaskCompletedHandlers.class) Set<EventHandler<CompletedTask>> serviceTaskCompletedEventHandlers,
+      final @Parameter(ServiceTaskSuspendedHandlers.class) Set<EventHandler<SuspendedTask>> serviceTaskSuspendedEventHandlers,
+      final @Parameter(ServiceTaskMessageHandlers.class) Set<EventHandler<TaskMessage>> serviceTaskMessageEventHandlers,
+      final @Parameter(ServiceTaskFailedHandlers.class) Set<EventHandler<FailedTask>> serviceTaskExceptionEventHandlers,
+      final @Parameter(ServiceEvaluatorAllocatedHandlers.class) Set<EventHandler<AllocatedEvaluator>> serviceEvaluatorAllocatedEventHandlers,
+      final @Parameter(ServiceEvaluatorFailedHandlers.class) Set<EventHandler<FailedEvaluator>> serviceEvaluatorFailedHandlers,
+      final @Parameter(ServiceEvaluatorCompletedHandlers.class) Set<EventHandler<CompletedEvaluator>> serviceEvaluatorCompletedHandlers,
+
+      // Client event handler
+      final @Parameter(DriverRuntimeConfigurationOptions.JobControlHandler.class) EventHandler<ClientRuntimeProtocol.JobControlProto> jobControlHandler,
+
+      // Resource*Handlers - Should be instantiated only once.
+      // The YarnResourceLaunchHandler creates and uploads
+      // the global jar file. If these handlers are
+      // instantiated for each container allocation,
+      // we get container failures due to modifications
+      // to the already submitted global jar file.
+      final ResourceLaunchHandler resourceLaunchHandler,
+      final ResourceReleaseHandler resourceReleaseHandler) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
new file mode 100644
index 0000000..a2f8f6f
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver;
+
+import org.apache.reef.driver.parameters.DriverRestartHandler;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.inject.Inject;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * This is bound to the start event of the clock and dispatches it to the restart handler on a restart, else to all start handlers.
+ */
+public final class DriverStartHandler implements EventHandler<StartTime> {
+  private static final Logger LOG = Logger.getLogger(DriverStartHandler.class.getName());
+
+  private final Set<EventHandler<StartTime>> startHandlers;
+  private final Optional<EventHandler<StartTime>> restartHandler;
+  private final DriverStatusManager driverStatusManager;
+
+  @Inject
+  DriverStartHandler(final @Parameter(org.apache.reef.driver.parameters.DriverStartHandler.class) Set<EventHandler<StartTime>> startHandler,
+                     final @Parameter(DriverRestartHandler.class) EventHandler<StartTime> restartHandler,
+                     final DriverStatusManager driverStatusManager) {
+    this.startHandlers = startHandler;
+    this.restartHandler = Optional.of(restartHandler);
+    this.driverStatusManager = driverStatusManager;
+    LOG.log(Level.FINE, "Instantiated `DriverStartHandler with StartHandler [{0}] and RestartHandler [{1}]",
+        new String[]{this.startHandlers.toString(), this.restartHandler.toString()});
+  }
+
+  @Inject
+  DriverStartHandler(final @Parameter(org.apache.reef.driver.parameters.DriverStartHandler.class) Set<EventHandler<StartTime>> startHandler,
+                     final DriverStatusManager driverStatusManager) {
+    this.startHandlers = startHandler;
+    this.restartHandler = Optional.empty();
+    this.driverStatusManager = driverStatusManager;
+    LOG.log(Level.FINE, "Instantiated `DriverStartHandler with StartHandler [{0}] and no RestartHandler",
+        this.startHandlers.toString());
+  }
+
+  @Override
+  public void onNext(final StartTime startTime) {
+    if (isRestart()) {
+      this.onRestart(startTime);
+    } else {
+      this.onStart(startTime);
+    }
+  }
+
+  private void onRestart(final StartTime startTime) {
+    if (restartHandler.isPresent()) {
+      this.restartHandler.get().onNext(startTime);
+    } else {
+      // TODO: We might have to indicate this to YARN somehow such that it doesn't try another time.
+      throw new RuntimeException("Driver restart happened, but no ON_DRIVER_RESTART handler is bound.");
+    }
+  }
+
+  private void onStart(final StartTime startTime) {
+    for (final EventHandler<StartTime> startHandler : this.startHandlers) {
+      startHandler.onNext(startTime);
+    }
+  }
+
+  /**
+   * @return true, if the Driver is in fact being restarted, i.e. the DriverStatusManager reports previous containers.
+   */
+  private boolean isRestart() {
+    return this.driverStatusManager.getNumPreviousContainers() > 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatus.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatus.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatus.java
new file mode 100644
index 0000000..600092d
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatus.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver;
+
+/**
+ * The status of the Driver. DriverStatusManager holds the current value, which starts out as PRE_INIT.
+ */
+public enum DriverStatus {
+  PRE_INIT,
+  INIT,
+  RUNNING,
+  SHUTTING_DOWN,
+  FAILING
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
new file mode 100644
index 0000000..e5e544c
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver;
+
+import com.google.protobuf.ByteString;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.api.AbstractDriverRuntimeConfiguration;
+import org.apache.reef.runtime.common.driver.client.ClientConnection;
+import org.apache.reef.runtime.common.utils.ExceptionCodec;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.time.Clock;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Manages the Driver's status.
+ * <p>
+ * The status starts as PRE_INIT and may only move along
+ * PRE_INIT -> INIT -> RUNNING -> (SHUTTING_DOWN | FAILING). The INIT and RUNNING transitions as well as the
+ * final outcome of the job are reported to the client through the ClientConnection. The class also keeps the
+ * container counters used while a restarted Driver re-connects to Evaluators of a previous run.
+ * <p>
+ * All instance methods are synchronized on this object.
+ */
+public final class DriverStatusManager {
+  private static final Logger LOG = Logger.getLogger(DriverStatusManager.class.getName());
+  private static final String CLASS_NAME = DriverStatusManager.class.getCanonicalName();
+
+  private final Clock clock;
+  private final ClientConnection clientConnection;
+  private final String jobIdentifier;
+  private final ExceptionCodec exceptionCodec;
+
+  private DriverStatus driverStatus = DriverStatus.PRE_INIT;
+  // The exception handed to onError(), if the Driver was ended that way.
+  private Optional<Throwable> shutdownCause = Optional.empty();
+  // Guards against sending the job-ending message to the client more than once.
+  private boolean driverTerminationHasBeenCommunicatedToClient = false;
+  private boolean restartCompleted = false;
+  // Negative until setNumPreviousContainers() has been called.
+  private int numPreviousContainers = -1;
+  private int numRecoveredContainers = 0;
+
+  /**
+   * @param clock            the clock that is stopped on error and closed on clean shutdown.
+   * @param clientConnection the connection used to send job status messages to the client.
+   * @param jobIdentifier    the identifier put into every job status message.
+   * @param exceptionCodec   used to serialize the exception sent with a FAILED job status.
+   */
+  @Inject
+  DriverStatusManager(final Clock clock,
+                      final ClientConnection clientConnection,
+                      final @Parameter(AbstractDriverRuntimeConfiguration.JobIdentifier.class) String jobIdentifier,
+                      final ExceptionCodec exceptionCodec) {
+    LOG.entering(CLASS_NAME, "<init>");
+    this.clock = clock;
+    this.clientConnection = clientConnection;
+    this.jobIdentifier = jobIdentifier;
+    this.exceptionCodec = exceptionCodec;
+    LOG.log(Level.FINE, "Instantiated 'DriverStatusManager'");
+    LOG.exiting(CLASS_NAME, "<init>");
+  }
+
+  /**
+   * Check whether a state transition 'from->to' is legal.
+   *
+   * @param from the current status.
+   * @param to   the status to move to.
+   * @return true if the transition is allowed, false otherwise.
+   * @throws IllegalStateException if 'from' is a status this method does not know about.
+   */
+  private static boolean isLegalTransition(final DriverStatus from, final DriverStatus to) {
+    switch (from) {
+      case PRE_INIT:
+        return to == DriverStatus.INIT;
+      case INIT:
+        return to == DriverStatus.RUNNING;
+      case RUNNING:
+        return to == DriverStatus.SHUTTING_DOWN || to == DriverStatus.FAILING;
+      case FAILING:
+      case SHUTTING_DOWN:
+        // Terminal states: nothing may follow them.
+        return false;
+      default:
+        throw new IllegalStateException("Unknown input state: " + from);
+    }
+  }
+
+  /**
+   * Changes the driver status to INIT and sends message to the client about the transition.
+   * The message is sent before the transition is checked for legality.
+   */
+  public synchronized void onInit() {
+    LOG.entering(CLASS_NAME, "onInit");
+    this.clientConnection.send(this.getInitMessage());
+    this.setStatus(DriverStatus.INIT);
+    LOG.exiting(CLASS_NAME, "onInit");
+  }
+
+  /**
+   * Changes the driver status to RUNNING and sends message to the client about the transition.
+   * If the driver is in status 'PRE_INIT', this first calls onInit().
+   * The message is sent before the transition is checked for legality.
+   */
+  public synchronized void onRunning() {
+    LOG.entering(CLASS_NAME, "onRunning");
+    if (this.driverStatus.equals(DriverStatus.PRE_INIT)) {
+      this.onInit();
+    }
+    this.clientConnection.send(this.getRunningMessage());
+    this.setStatus(DriverStatus.RUNNING);
+    LOG.exiting(CLASS_NAME, "onRunning");
+  }
+
+  /**
+   * End the Driver with an exception. The exception is remembered as the shutdown cause, the clock is stopped
+   * and the status is set to FAILING. Calls made while already shutting down or failing are only logged.
+   *
+   * @param exception the exception that ends the Driver.
+   */
+  public synchronized void onError(final Throwable exception) {
+    LOG.entering(CLASS_NAME, "onError", new Object[]{exception});
+    if (this.isShuttingDownOrFailing()) {
+      LOG.log(Level.WARNING, "Received an exception while already in shutdown.", exception);
+    } else {
+      LOG.log(Level.WARNING, "Shutting down the Driver with an exception: ", exception);
+      this.shutdownCause = Optional.of(exception);
+      this.clock.stop();
+      this.setStatus(DriverStatus.FAILING);
+    }
+    LOG.exiting(CLASS_NAME, "onError", new Object[]{exception});
+  }
+
+  /**
+   * Perform a clean shutdown of the Driver: closes the clock and sets the status to SHUTTING_DOWN.
+   * Calls made while already shutting down or failing are only logged.
+   */
+  public synchronized void onComplete() {
+    LOG.entering(CLASS_NAME, "onComplete");
+    if (this.isShuttingDownOrFailing()) {
+      LOG.log(Level.WARNING, "Ignoring second call to onComplete()");
+    } else {
+      LOG.log(Level.INFO, "Clean shutdown of the Driver.");
+      if (LOG.isLoggable(Level.FINEST)) {
+        // The exception is created only to record the call stack in the log.
+        LOG.log(Level.FINEST, "Callstack: ", new Exception());
+      }
+      this.clock.close();
+      this.setStatus(DriverStatus.SHUTTING_DOWN);
+    }
+    LOG.exiting(CLASS_NAME, "onComplete");
+  }
+
+  /**
+   * Sends the final message to the Client. This is used by DriverRuntimeStopHandler.onNext().
+   * The message is sent at most once; further calls are logged and ignored.
+   *
+   * @param exception an exception that occurred during clock.close(), if any. It is only sent to the client
+   *                  when no exception was recorded earlier through onError().
+   */
+  public synchronized void sendJobEndingMessageToClient(final Optional<Throwable> exception) {
+    if (!this.isShuttingDownOrFailing()) {
+      LOG.log(Level.SEVERE, "Sending message in a state different that SHUTTING_DOWN or FAILING. This is likely a illegal call to clock.close() at play. Current state: " + this.driverStatus);
+    }
+    if (this.driverTerminationHasBeenCommunicatedToClient) {
+      LOG.log(Level.SEVERE, ".sendJobEndingMessageToClient() called twice. Ignoring the second call");
+      return;
+    }
+
+    // Log the shutdown situation
+    final boolean hasShutdownCause = this.shutdownCause.isPresent();
+    if (hasShutdownCause) {
+      LOG.log(Level.WARNING, "Sending message about an unclean driver shutdown.", this.shutdownCause.get());
+    }
+    if (exception.isPresent()) {
+      LOG.log(Level.WARNING, "There was an exception during clock.close().", exception.get());
+    }
+    if (hasShutdownCause && exception.isPresent()) {
+      LOG.log(Level.WARNING, "The driver is shutdown because of an exception (see above) and there was an exception during clock.close(). Only the first exception will be sent to the client");
+    }
+
+    // The exception recorded by onError() takes precedence over the one passed in.
+    this.clientConnection.send(getJobEndingMessage(hasShutdownCause ? this.shutdownCause : exception));
+    this.driverTerminationHasBeenCommunicatedToClient = true;
+  }
+
+  /**
+   * Indicate that the Driver restart is complete. It is meant to be called exactly once during a restart and never
+   * during the initial launch of a Driver.
+   *
+   * @throws IllegalStateException if this Driver is not a restarted Driver.
+   */
+  public synchronized void setRestartCompleted() {
+    if (!this.isDriverRestart()) {
+      throw new IllegalStateException("setRestartCompleted() called in a Driver that is not, in fact, restarted.");
+    }
+    if (this.restartCompleted) {
+      LOG.log(Level.WARNING, "Calling setRestartCompleted more than once.");
+    } else {
+      this.restartCompleted = true;
+    }
+  }
+
+  /**
+   * @return the number of Evaluators expected to check in from a previous run.
+   * Negative if that number has not been set.
+   */
+  public synchronized int getNumPreviousContainers() {
+    return this.numPreviousContainers;
+  }
+
+  /**
+   * Set the number of containers to expect still active from a previous execution of the Driver in a restart situation.
+   * To be called exactly once during a driver restart.
+   *
+   * @param num the number of containers expected from the previous execution.
+   * @throws IllegalStateException if a non-negative number has been set before.
+   */
+  public synchronized void setNumPreviousContainers(final int num) {
+    if (this.numPreviousContainers >= 0) {
+      throw new IllegalStateException("Attempting to set the number of expected containers left from a previous container more than once.");
+    }
+    this.numPreviousContainers = num;
+  }
+
+  /**
+   * @return the number of Evaluators from a previous Driver that have checked in with the Driver in a restart situation.
+   */
+  public synchronized int getNumRecoveredContainers() {
+    return this.numRecoveredContainers;
+  }
+
+  /**
+   * Indicate that this Driver has re-established the connection with one more Evaluator of a previous run.
+   *
+   * @throws IllegalStateException if more Evaluators have reconnected than were expected. The counter has
+   *                               already been incremented when this is thrown.
+   */
+  public synchronized void oneContainerRecovered() {
+    this.numRecoveredContainers += 1;
+    if (this.numRecoveredContainers > this.numPreviousContainers) {
+      throw new IllegalStateException("Reconnected to " +
+          this.numRecoveredContainers +
+          " Evaluators while only expecting " +
+          this.numPreviousContainers);
+    }
+  }
+
+  /**
+   * @return true if the Driver is a restarted driver of an earlier attempt, which is the case when a positive
+   * number of previous containers has been set.
+   */
+  private synchronized boolean isDriverRestart() {
+    return this.getNumPreviousContainers() > 0;
+  }
+
+  /**
+   * @return true if the current status is SHUTTING_DOWN or FAILING.
+   */
+  public synchronized boolean isShuttingDownOrFailing() {
+    return DriverStatus.SHUTTING_DOWN.equals(this.driverStatus)
+        || DriverStatus.FAILING.equals(this.driverStatus);
+  }
+
+  /**
+   * Helper method to set the status. This also checks whether the transition from the current status to the new one is
+   * legal. An illegal transition is logged and leaves the status unchanged.
+   *
+   * @param newStatus the status to move to.
+   */
+  private synchronized void setStatus(final DriverStatus newStatus) {
+    if (isLegalTransition(this.driverStatus, newStatus)) {
+      this.driverStatus = newStatus;
+    } else {
+      LOG.log(Level.WARNING, "Illegal state transiton: '" + this.driverStatus + "'->'" + newStatus + "'");
+    }
+  }
+
+  /**
+   * @param exception the exception that ended the Driver, if any.
+   * @return message to be sent to the client at the end of the job: state FAILED with the encoded exception
+   * if one is present, state DONE otherwise.
+   */
+  private synchronized ReefServiceProtos.JobStatusProto getJobEndingMessage(final Optional<Throwable> exception) {
+    final ReefServiceProtos.JobStatusProto.Builder message = ReefServiceProtos.JobStatusProto.newBuilder()
+        .setIdentifier(this.jobIdentifier);
+    if (exception.isPresent()) {
+      message
+          .setState(ReefServiceProtos.State.FAILED)
+          .setException(ByteString.copyFrom(this.exceptionCodec.toBytes(exception.get())));
+    } else {
+      message.setState(ReefServiceProtos.State.DONE);
+    }
+    return message.build();
+  }
+
+  /**
+   * @return The message to be sent through the ClientConnection when in state INIT.
+   */
+  private synchronized ReefServiceProtos.JobStatusProto getInitMessage() {
+    return ReefServiceProtos.JobStatusProto.newBuilder()
+        .setIdentifier(this.jobIdentifier)
+        .setState(ReefServiceProtos.State.INIT)
+        .build();
+  }
+
+  /**
+   * @return The message to be sent through the ClientConnection when in state RUNNING.
+   */
+  private synchronized ReefServiceProtos.JobStatusProto getRunningMessage() {
+    return ReefServiceProtos.JobStatusProto.newBuilder()
+        .setIdentifier(this.jobIdentifier)
+        .setState(ReefServiceProtos.State.RUNNING)
+        .build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java
new file mode 100644
index 0000000..940de3c
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver;
+
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.driver.catalog.RackDescriptor;
+import org.apache.reef.driver.catalog.ResourceCatalog;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
+import org.apache.reef.util.logging.LoggingScope;
+import org.apache.reef.util.logging.LoggingScopeFactory;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+
+/**
+ * EvaluatorRequestor implementation: turns an EvaluatorRequest into a ResourceRequestProto and passes it on
+ * to the ResourceRequestHandler.
+ */
+public final class EvaluatorRequestorImpl implements EvaluatorRequestor {
+
+  private static final Logger LOG = Logger.getLogger(EvaluatorRequestorImpl.class.getName());
+
+  private final ResourceCatalog resourceCatalog;
+  private final ResourceRequestHandler resourceRequestHandler;
+  private final LoggingScopeFactory loggingScopeFactory;
+
+  /**
+   * @param resourceCatalog        kept in a field; not read by the code of this class.
+   * @param resourceRequestHandler receives the translated resource requests.
+   * @param loggingScopeFactory    provides the logging scope wrapped around each submission.
+   */
+  @Inject
+  public EvaluatorRequestorImpl(final ResourceCatalog resourceCatalog,
+                                final ResourceRequestHandler resourceRequestHandler,
+                                final LoggingScopeFactory loggingScopeFactory) {
+    this.resourceCatalog = resourceCatalog;
+    this.resourceRequestHandler = resourceRequestHandler;
+    this.loggingScopeFactory = loggingScopeFactory;
+  }
+
+  /**
+   * Rejects requests whose memory size, core count or evaluator count is not positive.
+   * The checks run in that order, so the first offending value determines the message.
+   *
+   * @param req the request to check.
+   * @throws IllegalArgumentException if one of the values is zero or negative.
+   */
+  private static void checkRequest(final EvaluatorRequest req) {
+    if (req.getMegaBytes() <= 0) {
+      throw new IllegalArgumentException("Given an unsupported memory size: " + req.getMegaBytes());
+    }
+    if (req.getNumberOfCores() <= 0) {
+      throw new IllegalArgumentException("Given an unsupported core number: " + req.getNumberOfCores());
+    }
+    if (req.getNumber() <= 0) {
+      throw new IllegalArgumentException("Given an unsupported number of evaluators: " + req.getNumber());
+    }
+  }
+
+  /**
+   * Adds the rack or node name of the descriptor to the request. A null descriptor adds nothing.
+   *
+   * @param requestBuilder the request under construction.
+   * @param descriptor     the descriptor of the request, may be null.
+   * @throws IllegalArgumentException if the descriptor is neither a RackDescriptor nor a NodeDescriptor.
+   */
+  private static void addDescriptor(final DriverRuntimeProtocol.ResourceRequestProto.Builder requestBuilder,
+                                    final ResourceCatalog.Descriptor descriptor) {
+    if (descriptor instanceof RackDescriptor) {
+      requestBuilder.addRackName(descriptor.getName());
+    } else if (descriptor instanceof NodeDescriptor) {
+      requestBuilder.addNodeName(descriptor.getName());
+    } else if (descriptor != null) {
+      throw new IllegalArgumentException("Unable to operate on descriptors of type " + descriptor.getClass().getName());
+    }
+  }
+
+  @Override
+  public synchronized void submit(final EvaluatorRequest req) {
+    LOG.log(Level.FINEST, "Got an EvaluatorRequest: number: {0}, memory = {1}, cores = {2}.", new Object[]{req.getNumber(), req.getMegaBytes(), req.getNumberOfCores()});
+
+    checkRequest(req);
+
+    try (LoggingScope scope = loggingScopeFactory.evaluatorSubmit(req.getNumber())) {
+      final DriverRuntimeProtocol.ResourceRequestProto.Builder requestBuilder =
+          DriverRuntimeProtocol.ResourceRequestProto.newBuilder();
+      requestBuilder.setResourceCount(req.getNumber());
+      requestBuilder.setVirtualCores(req.getNumberOfCores());
+      requestBuilder.setMemorySize(req.getMegaBytes());
+
+      addDescriptor(requestBuilder, req.getDescriptor());
+
+      this.resourceRequestHandler.onNext(requestBuilder.build());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/AbstractDriverRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/AbstractDriverRuntimeConfiguration.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/AbstractDriverRuntimeConfiguration.java
new file mode 100644
index 0000000..b832d22
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/AbstractDriverRuntimeConfiguration.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.api;
+
+import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.util.Builder;
+
+/**
+ * @deprecated Runtimes are advised to create their own ConfigurationModules instead of subclassing this class.
+ */
+@Deprecated
+public abstract class AbstractDriverRuntimeConfiguration implements Builder<Configuration> {
+
+  protected JavaConfigurationBuilder builder = Tang.Factory.getTang().newConfigurationBuilder();
+
+  protected AbstractDriverRuntimeConfiguration(
+      final Class<? extends ResourceLaunchHandler> resourceLaunchHandlerClass,
+      final Class<? extends ResourceReleaseHandler> resourceReleaseHandlerClass,
+      final Class<? extends ResourceRequestHandler> resourceRequestHandlerClass) {
+    try {
+      this.builder.bind(ResourceLaunchHandler.class, resourceLaunchHandlerClass);
+      this.builder.bind(ResourceReleaseHandler.class, resourceReleaseHandlerClass);
+      this.builder.bind(ResourceRequestHandler.class, resourceRequestHandlerClass);
+
+    } catch (final BindException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public final Configuration build() {
+    return this.builder.build();
+  }
+
+  public final AbstractDriverRuntimeConfiguration addClientConfiguration(final Configuration conf) {
+    try {
+      this.builder.addConfiguration(conf);
+      return this;
+    } catch (final BindException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public final AbstractDriverRuntimeConfiguration setJobIdentifier(final String id) {
+    try {
+      this.builder.bindNamedParameter(JobIdentifier.class, id.toString());
+      return this;
+    } catch (final BindException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public final AbstractDriverRuntimeConfiguration setClientRemoteIdentifier(final String rid) {
+    try {
+      this.builder.bindNamedParameter(ClientRemoteIdentifier.class, rid.toString());
+      return this;
+    } catch (final BindException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public final AbstractDriverRuntimeConfiguration setDriverProcessMemory(final int memory) {
+    try {
+      this.builder.bindNamedParameter(DriverProcessMemory.class, Integer.toString(memory));
+      return this;
+    } catch (final BindException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public final AbstractDriverRuntimeConfiguration setEvaluatorTimeout(final long value) {
+    try {
+      this.builder.bindNamedParameter(EvaluatorTimeout.class, Long.toString(value));
+      return this;
+    } catch (final BindException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @NamedParameter(doc = "The job identifier.")
+  public final static class JobIdentifier implements Name<String> {
+  }
+
+  @NamedParameter(doc = "The client remote identifier.", default_value = ClientRemoteIdentifier.NONE)
+  public final static class ClientRemoteIdentifier implements Name<String> {
+    /**
+     * Indicates that there is no Client.
+     */
+    public static final String NONE = ErrorHandlerRID.NONE;
+  }
+
+  @NamedParameter(doc = "The evaluator timeout (how long to wait before deciding an evaluator is dead.", default_value = "60000")
+  public final static class EvaluatorTimeout implements Name<Long> {
+  }
+
+  /**
+   * This parameter denotes that the driver process should actually be
+   * started in a separate process with the given amount of JVM memory.
+   */
+  @NamedParameter(doc = "The driver process memory.", default_value = "512")
+  public final static class DriverProcessMemory implements Name<Integer> {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/DefaultResourceManagerLifeCycle.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/DefaultResourceManagerLifeCycle.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/DefaultResourceManagerLifeCycle.java
new file mode 100644
index 0000000..51da6c0
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/DefaultResourceManagerLifeCycle.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceLaunchHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceLaunchHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceLaunchHandler.java
new file mode 100644
index 0000000..65c3830
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceLaunchHandler.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.api;
+
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ * Event handler for DriverRuntimeProtocol.ResourceLaunchProto messages.
+ * The implementation is supplied per runtime and bound through the configuration;
+ * presumably it launches an allocated resource - confirm against the implementing runtime.
+ */
+@RuntimeAuthor
+public interface ResourceLaunchHandler extends EventHandler<DriverRuntimeProtocol.ResourceLaunchProto> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceManagerLifeCycle.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceManagerLifeCycle.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceManagerLifeCycle.java
new file mode 100644
index 0000000..51da6c0
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceManagerLifeCycle.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceReleaseHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceReleaseHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceReleaseHandler.java
new file mode 100644
index 0000000..581eb63
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceReleaseHandler.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.api;
+
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ * Event handler for DriverRuntimeProtocol.ResourceReleaseProto messages.
+ * The implementation is supplied per runtime and bound through the configuration;
+ * presumably it releases a previously allocated resource - confirm against the implementing runtime.
+ */
+@RuntimeAuthor
+public interface ResourceReleaseHandler extends EventHandler<DriverRuntimeProtocol.ResourceReleaseProto> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestHandler.java
new file mode 100644
index 0000000..3d30ec8
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestHandler.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.api;
+
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ * The evaluator request handler.
+ * Receives the DriverRuntimeProtocol.ResourceRequestProto that EvaluatorRequestorImpl.submit()
+ * builds from an EvaluatorRequest. The implementation is supplied per runtime.
+ */
+@RuntimeAuthor
+public interface ResourceRequestHandler extends EventHandler<DriverRuntimeProtocol.ResourceRequestProto> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/RuntimeParameters.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/RuntimeParameters.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/RuntimeParameters.java
new file mode 100644
index 0000000..e847249
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/RuntimeParameters.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.api;
+
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ * Named parameters that resourcemanager implementations use to communicate with the Driver.
+ * Each one names an {@code EventHandler} for one {@code DriverRuntimeProtocol} message type.
+ */
+@RuntimeAuthor
+public final class RuntimeParameters {
+
+  /** Names the {@code EventHandler} for {@code ResourceAllocationProto} messages. */
+  @NamedParameter(doc = "The resource allocation handler that stub runtimes send along allocated resources e.g., containers.")
+  public static final class ResourceAllocationHandler implements Name<EventHandler<DriverRuntimeProtocol.ResourceAllocationProto>> {}
+
+  /** Names the {@code EventHandler} for {@code NodeDescriptorProto} messages. */
+  @NamedParameter(doc = "The node descriptor handler that stub runtimes send along node information.")
+  public static final class NodeDescriptorHandler implements Name<EventHandler<DriverRuntimeProtocol.NodeDescriptorProto>> {}
+
+  /** Names the {@code EventHandler} for {@code ResourceStatusProto} messages. */
+  @NamedParameter(doc = "The resource status handler.")
+  public static final class ResourceStatusHandler implements Name<EventHandler<DriverRuntimeProtocol.ResourceStatusProto>> {}
+
+  /** Names the {@code EventHandler} for {@code RuntimeStatusProto} messages. */
+  @NamedParameter(doc = "The resourcemanager status handler.")
+  public static final class RuntimeStatusHandler implements Name<EventHandler<DriverRuntimeProtocol.RuntimeStatusProto>> {}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/package-info.java
new file mode 100644
index 0000000..50f52f6
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Driver-Side Event Handlers to be implemented by a specific resource manager.
+ */
+package org.apache.reef.runtime.common.driver.api;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/NodeDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/NodeDescriptorImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/NodeDescriptorImpl.java
new file mode 100644
index 0000000..3cdf0d3
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/NodeDescriptorImpl.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.catalog;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.driver.catalog.RackDescriptor;
+
+import java.net.InetSocketAddress;
+
+@Private
+public class NodeDescriptorImpl implements NodeDescriptor {
+  private final String id;
+  private final InetSocketAddress address;
+  private final RackDescriptorImpl rack;
+  private final int ram;
+
+  /**
+   * Creates the descriptor and adds it to the given rack.
+   * @param id      the identifier returned by {@link #getId()}.
+   * @param address the socket address of the node.
+   * @param rack    the rack of the node; this descriptor is added to it.
+   * @param ram     the RAM available to the machine, in MegaBytes.
+   */
+  NodeDescriptorImpl(final String id, final InetSocketAddress address, final RackDescriptorImpl rack, final int ram) {
+    this.id = id;
+    this.address = address;
+    this.ram = ram;
+    this.rack = rack;
+    rack.addNodeDescriptor(this);
+  }
+
+  /** @return a summary of the form {@code Node [address]: RACK rackName, RAM ram}. */
+  @Override
+  public String toString() {
+    return new StringBuilder("Node [").append(address).append("]: RACK ").append(rack.getName()).append(", RAM ").append(ram).toString();
+  }
+
+  /** @return the identifier passed to the constructor. */
+  @Override
+  public final String getId() {
+    return id;
+  }
+
+  /** @return the socket address passed to the constructor. */
+  @Override
+  public InetSocketAddress getInetSocketAddress() {
+    return address;
+  }
+
+  /** @return the rack passed to the constructor. */
+  @Override
+  public RackDescriptor getRackDescriptor() {
+    return rack;
+  }
+
+  /** @return the host name of this node's socket address. */
+  @Override
+  public String getName() {
+    return address.getHostName();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/RackDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/RackDescriptorImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/RackDescriptorImpl.java
new file mode 100644
index 0000000..4256ee5
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/RackDescriptorImpl.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.catalog;
+
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.driver.catalog.RackDescriptor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A rack, identified by its name, together with the nodes added to it.
+ * Equality and hash code are based on the name only.
+ */
+public final class RackDescriptorImpl implements RackDescriptor {
+  private final String name;
+  private final List<NodeDescriptorImpl> nodes = new ArrayList<>();
+
+  RackDescriptorImpl(final String name) {
+    this.name = name;
+  }
+
+  @Override
+  public final String toString() {
+    final StringBuilder sb = new StringBuilder("Rack ").append(this.name);
+    for (final NodeDescriptorImpl node : this.nodes) {
+      sb.append("\n\t").append(node);
+    }
+    return sb.toString();
+  }
+
+  @Override
+  public final int hashCode() {
+    return this.name.hashCode();
+  }
+
+  /** @return true iff obj is a RackDescriptorImpl with the same name, consistent with hashCode(). */
+  @Override
+  public final boolean equals(final Object obj) {
+    // Compare names directly: toString() yields "Rack <name>..." and can never equal a bare name.
+    return obj instanceof RackDescriptorImpl && this.name.equals(((RackDescriptorImpl) obj).name);
+  }
+
+  public String getName() {
+    return this.name;
+  }
+
+  /** @return an unmodifiable copy of the list of nodes added so far. */
+  @Override
+  public List<NodeDescriptor> getNodes() {
+    return Collections.unmodifiableList(new ArrayList<NodeDescriptor>(this.nodes));
+  }
+
+  /**
+   * Should only be used from the NodeDescriptorImpl constructor.
+   * @param node to add.
+   */
+  void addNodeDescriptor(final NodeDescriptorImpl node) {
+    this.nodes.add(node);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/ResourceCatalogImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/ResourceCatalogImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/ResourceCatalogImpl.java
new file mode 100644
index 0000000..6193af4
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/ResourceCatalogImpl.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.catalog;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.driver.catalog.RackDescriptor;
+import org.apache.reef.driver.catalog.ResourceCatalog;
+import org.apache.reef.proto.DriverRuntimeProtocol.NodeDescriptorProto;
+
+import javax.inject.Inject;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@Private
+public final class ResourceCatalogImpl implements ResourceCatalog {
+
+  public final static String DEFAULT_RACK = "/default-rack";
+  private static final Logger LOG = Logger.getLogger(ResourceCatalog.class.getName());
+  private final Map<String, RackDescriptorImpl> racks = new HashMap<>();
+
+  private final Map<String, NodeDescriptorImpl> nodes = new HashMap<>();
+
+  @Inject
+  ResourceCatalogImpl() {
+    LOG.log(Level.FINE, "Instantiated 'ResourceCatalogImpl'");
+  }
+
+  @Override
+  public synchronized String toString() {
+    final StringBuilder sb = new StringBuilder();
+    sb.append("=== Resource Catalog ===");
+    for (final RackDescriptor rack : racks.values()) {
+      sb.append("\n" + rack);
+    }
+    return sb.toString();
+  }
+
+  @Override
+  public synchronized Collection<NodeDescriptor> getNodes() {
+    return Collections.unmodifiableCollection(new ArrayList<NodeDescriptor>(this.nodes.values()));
+  }
+
+  @Override
+  public synchronized Collection<RackDescriptor> getRacks() {
+    return Collections.unmodifiableCollection(new ArrayList<RackDescriptor>(this.racks.values()));
+  }
+
+  public synchronized final NodeDescriptor getNode(final String id) {
+    return this.nodes.get(id);
+  }
+
+  public synchronized final void handle(final NodeDescriptorProto node) {
+    final String rack_name = (node.hasRackName() ? node.getRackName() : DEFAULT_RACK);
+
+    LOG.log(Level.FINEST, "Catalog new node: id[{0}], rack[{1}], host[{2}], port[{3}], memory[{4}]",
+        new Object[]{node.getIdentifier(), rack_name, node.getHostName(), node.getPort(),
+            node.getMemorySize()}
+    );
+
+    if (!this.racks.containsKey(rack_name)) {
+      final RackDescriptorImpl rack = new RackDescriptorImpl(rack_name);
+      this.racks.put(rack_name, rack);
+    }
+    final RackDescriptorImpl rack = this.racks.get(rack_name);
+    final InetSocketAddress address = new InetSocketAddress(node.getHostName(), node.getPort());
+    final NodeDescriptorImpl nodeDescriptor = new NodeDescriptorImpl(node.getIdentifier(), address, rack, node.getMemorySize());
+    this.nodes.put(nodeDescriptor.getId(), nodeDescriptor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java
new file mode 100644
index 0000000..f7640b9
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.client;
+
+import com.google.protobuf.ByteString;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.api.AbstractDriverRuntimeConfiguration;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Represents the communication channel to the client.
+ */
+@DriverSide
+public final class ClientConnection {
+
+  private static final Logger LOG = Logger.getLogger(ClientConnection.class.getName());
+  private final EventHandler<ReefServiceProtos.JobStatusProto> jobStatusHandler;
+  private final String jobIdentifier;
+
+  /** Uses a LoggingJobStatusHandler if clientRID is NONE, else a handler obtained from remoteManager. */
+  @Inject
+  public ClientConnection(
+      final RemoteManager remoteManager,
+      final @Parameter(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.class) String clientRID,
+      final @Parameter(AbstractDriverRuntimeConfiguration.JobIdentifier.class) String jobIdentifier) {
+    this.jobIdentifier = jobIdentifier;
+    if (clientRID.equals(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.NONE)) {
+      LOG.log(Level.FINE, "Instantiated 'ClientConnection' without an actual connection to the client.");
+      this.jobStatusHandler = new LoggingJobStatusHandler();
+    } else {
+      this.jobStatusHandler = remoteManager.getHandler(clientRID, ReefServiceProtos.JobStatusProto.class);
+      LOG.log(Level.FINE, "Instantiated 'ClientConnection'");
+    }
+  }
+
+  /**
+   * Send the given JobStatus to the client.
+   *
+   * @param status the job status that is passed to the job status handler.
+   */
+  public synchronized void send(final ReefServiceProtos.JobStatusProto status) {
+    // Parameterized so that the status is only rendered to a String if FINEST is actually logged.
+    LOG.log(Level.FINEST, "Sending:\n{0}", status);
+    this.jobStatusHandler.onNext(status);
+  }
+
+  /**
+   * Send the given byte[] as a message to the client, wrapped in a JobStatusProto in state RUNNING.
+   * @param message the payload, copied into the message field of the status.
+   */
+  public synchronized void sendMessage(final byte[] message) {
+    this.send(ReefServiceProtos.JobStatusProto.newBuilder()
+        .setIdentifier(this.jobIdentifier)
+        .setState(ReefServiceProtos.State.RUNNING)
+        .setMessage(ByteString.copyFrom(message))
+        .build());
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientManager.java
new file mode 100644
index 0000000..2a686af
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientManager.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.client;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.parameters.ClientCloseHandlers;
+import org.apache.reef.driver.parameters.ClientCloseWithMessageHandlers;
+import org.apache.reef.driver.parameters.ClientMessageHandlers;
+import org.apache.reef.proto.ClientRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.DriverStatusManager;
+import org.apache.reef.runtime.common.driver.api.AbstractDriverRuntimeConfiguration;
+import org.apache.reef.runtime.common.utils.BroadCastEventHandler;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Represents the Client in the Driver.
+ */
+@Private
+@DriverSide
+public final class ClientManager implements EventHandler<ClientRuntimeProtocol.JobControlProto> {
+
+  private static final Logger LOG = Logger.getLogger(ClientManager.class.getName());
+
+  // Futures of the handler sets; get() is only called when the matching dispatcher is first needed.
+  private final InjectionFuture<Set<EventHandler<Void>>> clientCloseHandlers;
+  private final InjectionFuture<Set<EventHandler<byte[]>>> clientCloseWithMessageHandlers;
+  private final InjectionFuture<Set<EventHandler<byte[]>>> clientMessageHandlers;
+
+  private final DriverStatusManager driverStatusManager;
+
+  // Lazily created dispatchers; volatile because the getters read them outside the lock.
+  private volatile EventHandler<Void> clientCloseDispatcher;
+  private volatile EventHandler<byte[]> clientCloseWithMessageDispatcher;
+  private volatile EventHandler<byte[]> clientMessageDispatcher;
+
+  /**
+   * Registers this object with the remoteManager as the handler for JobControlProto messages,
+   * unless {@code clientRID} equals {@code ClientRemoteIdentifier.NONE} (no client).
+   */
+  @Inject
+  ClientManager(final @Parameter(ClientCloseHandlers.class) InjectionFuture<Set<EventHandler<Void>>> clientCloseHandlers,
+                final @Parameter(ClientCloseWithMessageHandlers.class) InjectionFuture<Set<EventHandler<byte[]>>> clientCloseWithMessageHandlers,
+                final @Parameter(ClientMessageHandlers.class) InjectionFuture<Set<EventHandler<byte[]>>> clientMessageHandlers,
+                final @Parameter(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.class) String clientRID,
+                final RemoteManager remoteManager,
+                final DriverStatusManager driverStatusManager) {
+    this.driverStatusManager = driverStatusManager;
+    this.clientCloseHandlers = clientCloseHandlers;
+    this.clientCloseWithMessageHandlers = clientCloseWithMessageHandlers;
+    this.clientMessageHandlers = clientMessageHandlers;
+
+    if (!clientRID.equals(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.NONE)) {
+      remoteManager.registerHandler(clientRID, ClientRuntimeProtocol.JobControlProto.class, this);
+    } else {
+      LOG.log(Level.FINE, "Not registering a handler for JobControlProto, as there is no client.");
+    }
+  }
+
+  /**
+   * This method reacts to control messages passed by the client to the driver. It will forward
+   * messages related to the ClientObserver interface to the Driver. It will also initiate a shutdown
+   * if the client indicates a close message.
+   *
+   * @param jobControlProto contains the client initiated control message
+   */
+  @Override
+  public synchronized void onNext(final ClientRuntimeProtocol.JobControlProto jobControlProto) {
+    if (jobControlProto.hasSignal()) {
+      if (jobControlProto.getSignal() == ClientRuntimeProtocol.Signal.SIG_TERMINATE) {
+        try {
+          if (jobControlProto.hasMessage()) {
+            getClientCloseWithMessageDispatcher().onNext(jobControlProto.getMessage().toByteArray());
+          } else {
+            getClientCloseDispatcher().onNext(null);
+          }
+        } finally {
+          // onComplete() is invoked even if one of the close handlers throws.
+          this.driverStatusManager.onComplete();
+        }
+      } else {
+        LOG.log(Level.FINEST, "Unsupported signal: {0}", jobControlProto.getSignal());
+      }
+    } else if (jobControlProto.hasMessage()) {
+      getClientMessageDispatcher().onNext(jobControlProto.getMessage().toByteArray());
+    }
+  }
+
+  /** @return the dispatcher over the client close handlers, created on first use. */
+  private EventHandler<Void> getClientCloseDispatcher() {
+    if (clientCloseDispatcher == null) {
+      synchronized (this) {
+        if (clientCloseDispatcher == null) {
+          clientCloseDispatcher = new BroadCastEventHandler<>(clientCloseHandlers.get());
+        }
+      }
+    }
+    return clientCloseDispatcher;
+  }
+
+  /** @return the dispatcher over the close-with-message handlers, created on first use. */
+  private EventHandler<byte[]> getClientCloseWithMessageDispatcher() {
+    if (clientCloseWithMessageDispatcher == null) {
+      synchronized (this) {
+        if (clientCloseWithMessageDispatcher == null) {
+          clientCloseWithMessageDispatcher = new BroadCastEventHandler<>(clientCloseWithMessageHandlers.get());
+        }
+      }
+    }
+    return clientCloseWithMessageDispatcher;
+  }
+
+  /** @return the dispatcher over the client message handlers, created on first use. */
+  private EventHandler<byte[]> getClientMessageDispatcher() {
+    if (clientMessageDispatcher == null) {
+      synchronized (this) {
+        if (clientMessageDispatcher == null) {
+          clientMessageDispatcher = new BroadCastEventHandler<>(clientMessageHandlers.get());
+        }
+      }
+    }
+    return clientMessageDispatcher;
+  }
+}