You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:46:54 UTC

[21/51] [partial] incubator-reef git commit: [REEF-93] Move java sources to lang/java

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java
new file mode 100644
index 0000000..cdd8b90
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.driver;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.annotations.audience.TaskSide;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.local.process.ReefRunnableProcessObserver;
+import org.apache.reef.runtime.local.process.RunnableProcess;
+import org.apache.reef.runtime.local.process.RunnableProcessObserver;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A Container that runs an Evaluator in a Process
+ */
+@Private
+@TaskSide
+final class ProcessContainer implements Container {
+
+  private static final Logger LOG = Logger.getLogger(ProcessContainer.class.getName());
+
+  private final String errorHandlerRID;
+  private final String nodeID;
+  private final File folder;
+  private final String containedID;
+  private final int megaBytes;
+  private final int numberOfCores;
+  private final REEFFileNames fileNames;
+  private final File reefFolder;
+  private final File localFolder;
+  private final File globalFolder;
+  private final RunnableProcessObserver processObserver;
+  private Thread theThread;
+  private RunnableProcess process;
+
+  /**
+   * @param errorHandlerRID the remoteID of the error handler.
+   * @param nodeID          the ID of the (fake) node this Container is instantiated on
+   * @param containedID     the  ID used to identify this container uniquely
+   * @param folder          the folder in which logs etc. will be deposited
+   */
+  ProcessContainer(final String errorHandlerRID,
+                   final String nodeID,
+                   final String containedID,
+                   final File folder,
+                   final int megaBytes,
+                   final int numberOfCores,
+                   final REEFFileNames fileNames,
+                   final ReefRunnableProcessObserver processObserver) {
+    this.errorHandlerRID = errorHandlerRID;
+    this.nodeID = nodeID;
+    this.containedID = containedID;
+    this.folder = folder;
+    this.megaBytes = megaBytes;
+    this.numberOfCores = numberOfCores;
+    this.fileNames = fileNames;
+    this.processObserver = processObserver;
+    this.reefFolder = new File(folder, fileNames.getREEFFolderName());
+    this.localFolder = new File(reefFolder, fileNames.getLocalFolderName());
+    this.localFolder.mkdirs();
+    this.globalFolder = new File(reefFolder, fileNames.getGlobalFolderName());
+    this.globalFolder.mkdirs();
+  }
+
+  private static void copy(final Iterable<File> files, final File folder) throws IOException {
+    for (final File sourceFile : files) {
+      final File destinationFile = new File(folder, sourceFile.getName());
+      if (Files.isSymbolicLink(sourceFile.toPath())) {
+        final Path linkTargetPath = Files.readSymbolicLink(sourceFile.toPath());
+        Files.createSymbolicLink(destinationFile.toPath(), linkTargetPath);
+      } else {
+        Files.copy(sourceFile.toPath(), destinationFile.toPath());
+      }
+    }
+  }
+
+  @Override
+  public void addLocalFiles(final Iterable<File> files) {
+    try {
+      copy(files, this.localFolder);
+    } catch (final IOException e) {
+      throw new RuntimeException("Unable to copy files to the evaluator folder.", e);
+    }
+  }
+
+  @Override
+  public void addGlobalFiles(final File globalFolder) {
+    try {
+      copy(Arrays.asList(globalFolder.listFiles()), this.globalFolder);
+    } catch (final IOException e) {
+      throw new RuntimeException("Unable to copy files to the evaluator folder.", e);
+    }
+  }
+
+  @Override
+  public void run(final List<String> commandLine) {
+    this.process = new RunnableProcess(commandLine,
+        this.containedID,
+        this.folder,
+        this.processObserver,
+        this.fileNames.getEvaluatorStdoutFileName(),
+        this.fileNames.getEvaluatorStderrFileName());
+    this.theThread = new Thread(this.process);
+    this.theThread.start();
+  }
+
+  @Override
+  public final boolean isRunning() {
+    return null != this.theThread && this.theThread.isAlive();
+  }
+
+  @Override
+  public final int getMemory() {
+    return this.megaBytes;
+  }
+
+  @Override
+  public final int getNumberOfCores() {
+    return this.numberOfCores;
+  }
+
+  @Override
+  public File getFolder() {
+    return this.folder;
+  }
+
+  @Override
+  public String getNodeID() {
+    return this.nodeID;
+  }
+
+  @Override
+  public String getContainerID() {
+    return this.containedID;
+  }
+
+  @Override
+  public void close() {
+    if (isRunning()) {
+      LOG.log(Level.WARNING, "Force-closing a container that is still running: {0}", this);
+      this.process.cancel();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "ProcessContainer{" +
+        "containedID='" + containedID + '\'' +
+        ", nodeID='" + nodeID + '\'' +
+        ", errorHandlerRID='" + errorHandlerRID + '\'' +
+        ", folder=" + folder +
+        '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
new file mode 100644
index 0000000..9708e77
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
+import org.apache.reef.runtime.common.files.ClasspathProvider;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.launch.CLRLaunchCommandBuilder;
+import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
+import org.apache.reef.runtime.common.launch.LaunchCommandBuilder;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.runtime.local.client.parameters.DefaultMemorySize;
+import org.apache.reef.runtime.local.client.parameters.DefaultNumberOfCores;
+import org.apache.reef.runtime.local.driver.parameters.GlobalFiles;
+import org.apache.reef.runtime.local.driver.parameters.GlobalLibraries;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.logging.LoggingScope;
+import org.apache.reef.util.logging.LoggingScopeFactory;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Resource manager of the local runtime; its containers are executed using threads.
+ */
+@Private
+@DriverSide
+public final class ResourceManager {
+
+  private static final Logger LOG = Logger.getLogger(ResourceManager.class.getName());
+
+  private final ResourceRequestQueue requestQueue = new ResourceRequestQueue();
+
+  private final EventHandler<DriverRuntimeProtocol.ResourceAllocationProto> allocationHandler;
+  private final ContainerManager theContainers; // also the monitor every request handler synchronizes on
+  private final EventHandler<DriverRuntimeProtocol.RuntimeStatusProto> runtimeStatusHandlerEventHandler;
+  private final int defaultMemorySize;
+  private final int defaultNumberOfCores;
+  private final ConfigurationSerializer configurationSerializer;
+  private final RemoteManager remoteManager;
+  private final REEFFileNames fileNames;
+  private final ClasspathProvider classpathProvider;
+  private final double jvmHeapFactor; // 1.0 - JVMHeapSlack; scales the memory passed to setMemory()
+  private final LoggingScopeFactory loggingScopeFactory;
+
+  @Inject
+  ResourceManager(
+      final ContainerManager containerManager,
+      final @Parameter(RuntimeParameters.ResourceAllocationHandler.class) EventHandler<DriverRuntimeProtocol.ResourceAllocationProto> allocationHandler,
+      final @Parameter(RuntimeParameters.RuntimeStatusHandler.class) EventHandler<DriverRuntimeProtocol.RuntimeStatusProto> runtimeStatusHandlerEventHandler,
+      final @Parameter(GlobalLibraries.class) Set<String> globalLibraries,
+      final @Parameter(GlobalFiles.class) Set<String> globalFiles,
+      final @Parameter(DefaultMemorySize.class) int defaultMemorySize,
+      final @Parameter(DefaultNumberOfCores.class) int defaultNumberOfCores,
+      final @Parameter(JVMHeapSlack.class) double jvmHeapSlack,
+      final ConfigurationSerializer configurationSerializer,
+      final RemoteManager remoteManager,
+      final REEFFileNames fileNames,
+      final ClasspathProvider classpathProvider,
+      final LoggingScopeFactory loggingScopeFactory) {
+
+    this.allocationHandler = allocationHandler;
+    this.runtimeStatusHandlerEventHandler = runtimeStatusHandlerEventHandler;
+    this.theContainers = containerManager;
+    this.defaultMemorySize = defaultMemorySize;
+    this.defaultNumberOfCores = defaultNumberOfCores;
+    this.jvmHeapFactor = 1.0 - jvmHeapSlack;
+    this.configurationSerializer = configurationSerializer;
+    this.remoteManager = remoteManager;
+    this.fileNames = fileNames;
+    this.classpathProvider = classpathProvider;
+    this.loggingScopeFactory = loggingScopeFactory;
+
+    LOG.log(Level.FINE, "Instantiated 'ResourceManager'");
+  }
+
+  /**
+   * Collects the files listed in the given launch request.
+   *
+   * @param launchRequest the ResourceLaunchProto whose file list is read
+   * @return the absolute form of every file path named in the ResourceLaunchProto
+   */
+  private static List<File> getLocalFiles(final DriverRuntimeProtocol.ResourceLaunchProto launchRequest) {
+    final List<File> localFiles = new ArrayList<>();
+    for (final ReefServiceProtos.FileResourceProto resource : launchRequest.getFileList()) {
+      localFiles.add(new File(resource.getPath()).getAbsoluteFile());
+    }
+    return localFiles;
+  }
+
+  /**
+   * Enqueues a resource request.
+   * <p>
+   * The queue is checked right away, so a request that can be met is satisfied immediately.
+   *
+   * @param resourceRequest the resource request to be handled.
+   */
+  final void onResourceRequest(final DriverRuntimeProtocol.ResourceRequestProto resourceRequest) {
+    synchronized (this.theContainers) {
+      this.requestQueue.add(new ResourceRequest(resourceRequest));
+      checkRequestQueue();
+    }
+  }
+
+  /**
+   * Releases the container named in the request, then re-checks the request queue.
+   *
+   * @param releaseRequest the release request to be processed
+   */
+  final void onResourceReleaseRequest(final DriverRuntimeProtocol.ResourceReleaseProto releaseRequest) {
+    synchronized (this.theContainers) {
+      LOG.log(Level.FINEST, "Release container: {0}", releaseRequest.getIdentifier());
+      this.theContainers.release(releaseRequest.getIdentifier());
+      checkRequestQueue();
+    }
+  }
+
+  /**
+   * Releases the container of an Evaluator that has exited, then re-checks the request queue.
+   *
+   * @param evaluatorId the ID of the Evaluator that exited.
+   */
+  public final void onEvaluatorExit(final String evaluatorId) {
+    synchronized (this.theContainers) {
+      this.theContainers.release(evaluatorId);
+      checkRequestQueue();
+    }
+  }
+
+  /**
+   * Prepares the requested container's files and configuration, then runs the launch command in it.
+   *
+   * @param launchRequest the launch request to be processed.
+   */
+  final void onResourceLaunchRequest(
+      final DriverRuntimeProtocol.ResourceLaunchProto launchRequest) {
+
+    synchronized (this.theContainers) {
+
+      final Container container = this.theContainers.get(launchRequest.getIdentifier());
+
+      try (final LoggingScope fileScope = this.loggingScopeFactory.getNewLoggingScope("ResourceManager.onResourceLaunchRequest:evaluatorConfigurationFile")) {
+        // Populate the container with the global folder's content and the request's own files.
+        container.addGlobalFiles(this.fileNames.getGlobalFolder());
+        container.addLocalFiles(getLocalFiles(launchRequest));
+
+        // Write the evaluator configuration into the container's folder.
+        final File configurationFile = new File(container.getFolder(), fileNames.getEvaluatorConfigurationPath());
+
+        try {
+          this.configurationSerializer.toFile(
+              this.configurationSerializer.fromString(launchRequest.getEvaluatorConf()), configurationFile);
+        } catch (final IOException | BindException e) {
+          throw new RuntimeException("Unable to write configuration.", e);
+        }
+      }
+
+      try (final LoggingScope launchScope = this.loggingScopeFactory.getNewLoggingScope("ResourceManager.onResourceLaunchRequest:runCommand")) {
+        // Pick the command builder that matches the requested process type.
+        final LaunchCommandBuilder builder;
+        switch (launchRequest.getType()) {
+          case JVM:
+            builder = new JavaLaunchCommandBuilder()
+                .setClassPath(this.classpathProvider.getEvaluatorClasspath());
+            break;
+          case CLR:
+            builder = new CLRLaunchCommandBuilder();
+            break;
+          default:
+            throw new IllegalArgumentException("Unsupported container type: " + launchRequest.getType());
+        }
+
+        // Assemble the command line; the memory setting is the container's memory scaled by jvmHeapFactor.
+        final List<String> commandLine = builder
+            .setErrorHandlerRID(this.remoteManager.getMyIdentifier())
+            .setLaunchID(container.getNodeID())
+            .setConfigurationFileName(this.fileNames.getEvaluatorConfigurationPath())
+            .setMemory((int) (this.jvmHeapFactor * container.getMemory()))
+            .build();
+
+        LOG.log(Level.FINEST, "Launching container: {0}", container);
+        container.run(commandLine);
+      }
+    }
+  }
+
+  /**
+   * Satisfies queued requests for as long as containers are available and
+   * reports the runtime status after each allocation and once at the end.
+   */
+  private void checkRequestQueue() {
+
+    // Loop instead of recursing: one pass per allocation, for as long as a container is
+    // available and a request is still waiting in the queue.
+    while (this.theContainers.hasContainerAvailable() && this.requestQueue.hasOutStandingRequests()) {
+
+      // Mark one unit of the front-most request as satisfied and fetch that request's details.
+      final DriverRuntimeProtocol.ResourceRequestProto request = this.requestQueue.satisfyOne();
+
+      // Allocate a Container; the defaults cover whatever the request leaves unspecified.
+      final Container allocated = this.theContainers.allocateOne(
+          request.hasMemorySize() ? request.getMemorySize() : this.defaultMemorySize,
+          request.hasVirtualCores() ? request.getVirtualCores() : this.defaultNumberOfCores);
+
+      // Describe the new container to the allocation handler.
+      final DriverRuntimeProtocol.ResourceAllocationProto allocation =
+          DriverRuntimeProtocol.ResourceAllocationProto.newBuilder()
+              .setIdentifier(allocated.getContainerID())
+              .setNodeId(allocated.getNodeID())
+              .setResourceMemory(allocated.getMemory())
+              .setVirtualCores(allocated.getNumberOfCores())
+              .build();
+
+      LOG.log(Level.FINEST, "Allocating container: {0}", allocated);
+      this.allocationHandler.onNext(allocation);
+
+      // Update REEF after every single allocation.
+      this.sendRuntimeStatus();
+    }
+
+    // No container is available or no request is waiting any more: send the concluding
+    // status update.
+    this.sendRuntimeStatus();
+  }
+
+  private void sendRuntimeStatus() {
+    // Snapshot of the request queue size and of the currently allocated container IDs.
+    final DriverRuntimeProtocol.RuntimeStatusProto status =
+        DriverRuntimeProtocol.RuntimeStatusProto.newBuilder()
+            .setName("LOCAL")
+            .setState(ReefServiceProtos.State.RUNNING)
+            .setOutstandingContainerRequests(this.requestQueue.getNumberOfOutstandingRequests())
+            .addAllContainerAllocation(this.theContainers.getAllocatedContainerIDs())
+            .build();
+
+    LOG.log(Level.INFO, "Allocated: {0}, Outstanding requests: {1}",
+        new Object[]{status.getContainerAllocationCount(), status.getOutstandingContainerRequests()});
+    this.runtimeStatusHandlerEventHandler.onNext(status);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequest.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequest.java
new file mode 100644
index 0000000..bbb20a3
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+
+/**
+ * Tracks a ResourceRequestProto and how many of its requested resources have been allocated.
+ */
+@Private
+@DriverSide
+final class ResourceRequest {
+
+  private final DriverRuntimeProtocol.ResourceRequestProto req; // the wrapped request, never null
+  private int satisfied = 0; // number of resources allocated to this request so far
+
+  ResourceRequest(final DriverRuntimeProtocol.ResourceRequestProto req) {
+    if (null == req) { // a request without its proto cannot be tracked
+      throw new IllegalArgumentException("Can't instantiate a ResourceRequest without a ResourceRequestProto");
+    }
+    this.req = req;
+  }
+
+  /**
+   * Records one more allocation; throws IllegalStateException (count untouched) if none is outstanding.
+   */
+  final void satisfyOne() {
+    if (this.satisfied >= req.getResourceCount()) { // validate first: a rejected call must not change the count
+      throw new IllegalStateException("This request has been oversatisfied.");
+    }
+    this.satisfied += 1;
+  }
+
+  /**
+   * @return true if exactly getResourceCount() resources have been recorded
+   * through satisfyOne(), false otherwise.
+   */
+  final boolean isSatisfied() {
+    return this.satisfied == req.getResourceCount();
+  }
+
+  final DriverRuntimeProtocol.ResourceRequestProto getRequestProto() { // the proto given at construction
+    return this.req;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java
new file mode 100644
index 0000000..c2a87ff
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * A FIFO queue of ResourceRequests that are satisfied one resource at a time.
+ */
+@Private
+@DriverSide
+final class ResourceRequestQueue {
+
+  private final BlockingQueue<ResourceRequest> pending = new LinkedBlockingQueue<>();
+
+  /**
+   * Appends a request behind all requests that are already queued.
+   *
+   * @param req the request to be added.
+   */
+  final void add(final ResourceRequest req) {
+    this.pending.add(req);
+  }
+
+  /**
+   * @return true while at least one request is still in the queue, false otherwise.
+   */
+  final boolean hasOutStandingRequests() {
+    return !this.pending.isEmpty();
+  }
+
+  /**
+   * Records one allocated resource on the request at the head of the queue and
+   * drops that request from the queue once it is fully satisfied.
+   */
+  final synchronized DriverRuntimeProtocol.ResourceRequestProto satisfyOne() {
+    final ResourceRequest head = this.pending.element();
+    head.satisfyOne();
+    if (head.isSatisfied()) {
+      this.pending.poll();
+    }
+    return head.getRequestProto();
+  }
+
+  final synchronized int getNumberOfOutstandingRequests() {
+    return this.pending.size();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/package-info.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/package-info.java
new file mode 100644
index 0000000..d79462d
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * The driver-side resource manager of the local runtime.
+ */
+package org.apache.reef.runtime.local.driver;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/GlobalFiles.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/GlobalFiles.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/GlobalFiles.java
new file mode 100644
index 0000000..489054f
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/GlobalFiles.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.driver.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+import java.util.Set;
+
+/**
+ * Named parameter holding the set of names of files that are to be copied to all evaluators.
+ */
+@NamedParameter(doc = "The names of files that are to be copied to all evaluators.")
+public final class GlobalFiles implements Name<Set<String>> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/GlobalLibraries.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/GlobalLibraries.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/GlobalLibraries.java
new file mode 100644
index 0000000..1912829
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/GlobalLibraries.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.driver.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+import java.util.Set;
+
+/**
+ * Named parameter holding the set of names of (library) files that are to be copied to all evaluators.
+ */
+@NamedParameter(doc = "The names of files that are to be copied to all evaluators.")
+public final class GlobalLibraries implements Name<Set<String>> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/LocalFiles.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/LocalFiles.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/LocalFiles.java
new file mode 100644
index 0000000..87226cf
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/LocalFiles.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.driver.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+import java.util.Set;
+
+/**
+ * Named parameter holding the set of names of files that are to be kept on the driver only.
+ */
+@NamedParameter(doc = "The names of files that are to be kept on the driver only.")
+public final class LocalFiles implements Name<Set<String>> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/LocalLibraries.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/LocalLibraries.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/LocalLibraries.java
new file mode 100644
index 0000000..0aefb2b
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/LocalLibraries.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.driver.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+import java.util.Set;
+
+/**
+ * The names of files that are to be kept on the driver only.
+ */
+@NamedParameter(doc = "The names of files that are to be kept on the driver only.")
+public final class LocalLibraries implements Name<Set<String>> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/package-info.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/package-info.java
new file mode 100644
index 0000000..fccf5c7
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * The named parameters for the Driver executed on the local runtime
+ */
+package org.apache.reef.runtime.local.driver.parameters;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/LoggingRunnableProcessObserver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/LoggingRunnableProcessObserver.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/LoggingRunnableProcessObserver.java
new file mode 100644
index 0000000..dd0d75e
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/LoggingRunnableProcessObserver.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.process;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A RunnableProcessObserver that logs the events.
+ */
+public final class LoggingRunnableProcessObserver implements RunnableProcessObserver {
+  private static final Logger LOG = Logger.getLogger(LoggingRunnableProcessObserver.class.getName());
+
+  @Override
+  public void onProcessStarted(final String processId) {
+    LOG.log(Level.FINE, "Process {0} started.", processId);
+
+  }
+
+  @Override
+  public void onProcessExit(final String processId, final int exitCode) {
+    if (exitCode == 0) {
+      LOG.log(Level.FINE, "Process {0} exited with return code 0.", processId);
+    } else {
+      LOG.log(Level.WARNING, "Process {0} exited with return code {1}.", new Object[]{processId, exitCode});
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java
new file mode 100644
index 0000000..36ee916
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.process;
+
+import net.jcip.annotations.ThreadSafe;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
+import org.apache.reef.runtime.local.driver.ResourceManager;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A RunnableProcessObserver that relays events to REEF's ResourceStatusHandler.
+ */
+@ThreadSafe
+public final class ReefRunnableProcessObserver implements RunnableProcessObserver {
+  private static final Logger LOG = Logger.getLogger(ReefRunnableProcessObserver.class.getName());
+
+  private final EventHandler<DriverRuntimeProtocol.ResourceStatusProto> resourceStatusHandler;
+  private final InjectionFuture<ResourceManager> resourceManager;
+
+  /**
+   * @param resourceStatusHandler the event handler to inform of resource changes.
+   */
+  @Inject
+  public ReefRunnableProcessObserver(final @Parameter(RuntimeParameters.ResourceStatusHandler.class)
+                                     EventHandler<DriverRuntimeProtocol.ResourceStatusProto> resourceStatusHandler,
+                                     final InjectionFuture<ResourceManager> resourceManager) {
+    this.resourceStatusHandler = resourceStatusHandler;
+    this.resourceManager = resourceManager;
+  }
+
+  @Override
+  public void onProcessStarted(final String processId) {
+    this.onResourceStatus(
+        DriverRuntimeProtocol.ResourceStatusProto.newBuilder()
+            .setIdentifier(processId)
+            .setState(ReefServiceProtos.State.RUNNING)
+            .build()
+    );
+  }
+
+  @Override
+  public void onProcessExit(final String processId, final int exitCode) {
+    // Note that the order here matters: We need to first inform the Driver's event handlers about the process exit
+    // and then release the resources. Otherwise, the Driver might be shutdown because of an idle condition before the
+    // message about the evaluator exit could have been sent and processed.
+    switch (exitCode) {
+      case 0:
+        this.onCleanExit(processId);
+        break;
+      default:
+        this.onUncleanExit(processId, exitCode);
+    }
+    this.resourceManager.get().onEvaluatorExit(processId);
+  }
+
+  /**
+   * Inform REEF of a cleanly exited process.
+   *
+   * @param processId
+   */
+  private void onCleanExit(final String processId) {
+    this.onResourceStatus(
+        DriverRuntimeProtocol.ResourceStatusProto.newBuilder()
+            .setIdentifier(processId)
+            .setState(ReefServiceProtos.State.DONE)
+            .setExitCode(0)
+            .build()
+    );
+  }
+
+  /**
+   * Inform REEF of an unclean process exit
+   *
+   * @param processId
+   * @param exitCode
+   */
+  private void onUncleanExit(final String processId, final int exitCode) {
+    this.onResourceStatus(
+        DriverRuntimeProtocol.ResourceStatusProto.newBuilder()
+            .setIdentifier(processId)
+            .setState(ReefServiceProtos.State.FAILED)
+            .setExitCode(exitCode)
+            .build()
+    );
+  }
+
+  private void onResourceStatus(final DriverRuntimeProtocol.ResourceStatusProto resourceStatus) {
+    LOG.log(Level.INFO, "Sending resource status: {0} ", resourceStatus);
+
+    // Here, we introduce an arbitrary wait. This is to make sure that at the exit of an Evaluator, the last
+    // heartbeat from that Evaluator arrives before this message. This makes sure that the local runtime behaves like
+    // a resource manager with regard to that timing.
+    try {
+      Thread.sleep(100);
+    } catch (final InterruptedException e) {
+      LOG.log(Level.FINEST, "Sleep interrupted. Event will be fired earlier than usual.");
+    }
+    this.resourceStatusHandler.onNext(resourceStatus);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcess.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcess.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcess.java
new file mode 100644
index 0000000..d55e138
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcess.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.process;
+
+import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler;
+import org.apache.reef.util.OSUtils;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+
+/**
+ * A runnable class that encapsulates a process.
+ */
+public final class RunnableProcess implements Runnable {
+
+  private static final Logger LOG = Logger.getLogger(RunnableProcess.class.getName());
+
+  private static final long DESTROY_WAIT_TIME = 100;
+
+  /**
+   * Name of the file used for STDERR redirection.
+   */
+  private final String standardErrorFileName;
+  /**
+   * Name of the file used for STDOUT redirection.
+   */
+  private final String standardOutFileName;
+
+  /**
+   * Command to execute.
+   */
+  private final List<String> command;
+  /**
+   * User supplied ID of this process.
+   */
+  private final String id;
+  /**
+   * The working folder in which the process runs. It is also where STDERR and STDOUT files will be deposited.
+   */
+  private final File folder;
+  /**
+   * The coarse-grained lock for state transition.
+   */
+  private final Lock stateLock = new ReentrantLock();
+  private final Condition doneCond = stateLock.newCondition();
+  /**
+   * This will be informed of process start and stop.
+   */
+  private final RunnableProcessObserver processObserver;
+  /**
+   * The process.
+   */
+  private Process process;
+  /**
+   * The state of the process.
+   */
+  private State state = State.INIT;   // synchronized on stateLock
+
+  /**
+   * @param command               the command to execute.
+   * @param id                    The ID of the process. This is used to name files and in the logs created by this process.
+   * @param folder                The folder in which this will store its stdout and stderr output
+   * @param processObserver       will be informed of process state changes.
+   * @param standardOutFileName   The name of the file used for redirecting STDOUT
+   * @param standardErrorFileName The name of the file used for redirecting STDERR
+   */
+  public RunnableProcess(final List<String> command,
+                         final String id,
+                         final File folder,
+                         final RunnableProcessObserver processObserver,
+                         final String standardOutFileName,
+                         final String standardErrorFileName) {
+    this.processObserver = processObserver;
+    this.command = new ArrayList<>(command);
+    this.id = id;
+    this.folder = folder;
+    assert (this.folder.isDirectory());
+    this.folder.mkdirs();
+    this.standardOutFileName = standardOutFileName;
+    this.standardErrorFileName = standardErrorFileName;
+    LOG.log(Level.FINEST, "RunnableProcess ready.");
+  }
+
+  /**
+   * Checks whether a transition from State 'from' to state 'to' is legal
+   *
+   * @param from
+   * @param to
+   * @return true, if the state transition is legal. False otherwise.
+   */
+  private static boolean isLegal(final State from, final State to) {
+    switch (from) {
+      case INIT:
+        switch (to) {
+          case INIT:
+          case RUNNING:
+          case ENDED:
+            return true;
+          default:
+            return false;
+        }
+      case RUNNING:
+        switch (to) {
+          case ENDED:
+            return true;
+          default:
+            return false;
+        }
+      case ENDED:
+        return false;
+      default:
+        return false;
+    }
+  }
+
+  /**
+   * Runs the configured process.
+   *
+   * @throws java.lang.IllegalStateException if the process is already running or has been running before.
+   */
+  @Override
+  public void run() {
+    this.stateLock.lock();
+    try {
+      if (this.getState() != State.INIT) {
+        throw new IllegalStateException("The RunnableProcess can't be reused");
+      }
+      // Setup the stdout and stderr destinations.
+      final File errFile = new File(folder, standardErrorFileName);
+      final File outFile = new File(folder, standardOutFileName);
+
+      // Launch the process
+      try {
+        LOG.log(Level.FINEST, "Launching process \"{0}\"\nSTDERR can be found in {1}\nSTDOUT can be found in {2}",
+            new Object[]{this.id, errFile.getAbsolutePath(), outFile.getAbsolutePath()});
+        this.process = new ProcessBuilder()
+            .command(this.command)
+            .directory(this.folder)
+            .redirectError(errFile)
+            .redirectOutput(outFile)
+            .start();
+        this.setState(State.RUNNING);
+        this.processObserver.onProcessStarted(this.id);
+      } catch (final IOException ex) {
+        LOG.log(Level.SEVERE, "Unable to spawn process \"{0}\" wth command {1}\n Exception:{2}",
+            new Object[]{this.id, this.command, ex});
+        return; // Nothing was launched: this.process is still null, so there is nothing to wait for.
+      }
+    } finally {
+      this.stateLock.unlock();
+    }
+
+    try {
+      // Wait for its completion
+      final int returnValue = process.waitFor();
+      this.processObserver.onProcessExit(this.id, returnValue);
+      this.stateLock.lock();
+      try {
+        this.setState(State.ENDED);
+        this.doneCond.signalAll();
+      } finally {
+        this.stateLock.unlock();
+      }
+      LOG.log(Level.FINEST, "Process \"{0}\" returned {1}", new Object[]{this.id, returnValue});
+    } catch (final InterruptedException ex) {
+      LOG.log(Level.SEVERE, "Interrupted while waiting for the process \"{0}\" to complete. Exception: {1}",
+          new Object[]{this.id, ex});
+    }
+  }
+
+
+  /**
+   * Cancels the running process if it is running.
+   */
+  public void cancel() {
+    this.stateLock.lock();
+    try {
+      if (this.processIsRunning()) {
+        this.process.destroy();
+        this.doneCond.await(DESTROY_WAIT_TIME, TimeUnit.MILLISECONDS);
+      }
+
+      if (this.processIsRunning()) {
+        LOG.log(Level.WARNING, "The child process survived Process.destroy()");
+        if (OSUtils.isLinux()) {
+          LOG.log(Level.WARNING, "Attempting to kill the process via the kill command line");
+          try {
+            final long pid = readPID();
+            OSUtils.kill(pid);
+          } catch (final IOException | InterruptedException e) {
+            LOG.log(Level.SEVERE, "Unable to kill the process.", e);
+          }
+        }
+      }
+
+    } catch (final InterruptedException ex) {
+      LOG.log(Level.SEVERE, "Interrupted while waiting for the process \"{0}\" to complete. Exception: {1}",
+          new Object[]{this.id, ex});
+    } finally {
+      this.stateLock.unlock();
+    }
+  }
+
+  /**
+   * @return the PID stored in the PID file.
+   * @throws IOException if the file can't be read.
+   */
+  private long readPID() throws IOException {
+    final File pidFile = new File(this.folder, PIDStoreStartHandler.PID_FILE_NAME);
+    try (final BufferedReader r = new BufferedReader(new FileReader(pidFile))) {
+      return Long.parseLong(r.readLine());
+    }
+  }
+
+  private boolean processIsRunning() {
+    return this.getState() == State.RUNNING;
+  }
+
+  /**
+   * @return the current State of the process.
+   */
+  private State getState() {
+    return this.state;
+  }
+
+  /**
+   * Sets a new state for the process.
+   *
+   * @param newState
+   * @throws java.lang.IllegalStateException if the new state is illegal.
+   */
+  private void setState(final State newState) {
+    if (!isLegal(this.state, newState)) {
+      throw new IllegalStateException("Transition from " + this.state + " to " + newState + " is illegal");
+    }
+    this.state = newState;
+  }
+
+  /**
+   * The possible states of a process: INIT, RUNNING, ENDED.
+   */
+  private enum State {
+    // After initialization
+    INIT,
+    // The process is running
+    RUNNING,
+    // The process ended
+    ENDED
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcessObserver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcessObserver.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcessObserver.java
new file mode 100644
index 0000000..fe58b74
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcessObserver.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.process;
+
+/**
+ * Observer interface for events fired by RunnableProcess.
+ */
+public interface RunnableProcessObserver {
+  /**
+   * This will be called right after the process is launched.
+   *
+   * @param processId the id of the process that started.
+   */
+  public void onProcessStarted(final String processId);
+
+  /**
+   * This will be called right after the process exited.
+   *
+   * @param exitCode  the return code of the process.
+   * @param processId the id of the process that exited.
+   */
+  public void onProcessExit(final String processId, final int exitCode);
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/package-info.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/package-info.java
new file mode 100644
index 0000000..964b6b9
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Infrastructure for managing processes.
+ */
+package org.apache.reef.runtime.local.process;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestQueueTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestQueueTest.java b/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestQueueTest.java
new file mode 100644
index 0000000..658747d
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestQueueTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.driver;
+
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ResourceRequestQueueTest {
+
+  @Test
+  public void testEmptyAfterConstruction() {
+    final ResourceRequestQueue q = new ResourceRequestQueue();
+    Assert.assertFalse("A freshly generated queue should be empty", q.hasOutStandingRequests());
+  }
+
+  @Test
+  public void testNotEmptyAfterInsert() {
+    final ResourceRequestQueue q = new ResourceRequestQueue();
+    q.add(getAlmostSatisfied());
+    Assert.assertTrue("A queue should not be empty after an insert.", q.hasOutStandingRequests());
+  }
+
+  @Test
+  public void testSatisfaction() {
+    final ResourceRequestQueue q = new ResourceRequestQueue();
+    for (int i = 0; i < 1; ++i) {
+      q.add(getAlmostSatisfied());
+      Assert.assertTrue("A queue should not be empty after an insert.", q.hasOutStandingRequests());
+      q.satisfyOne();
+      Assert.assertFalse("The queue should be empty after all requests have been satisfied", q.hasOutStandingRequests());
+    }
+
+    final int nInsert = 10;
+    for (int i = 0; i < nInsert; ++i) {
+      q.add(getAlmostSatisfied());
+    }
+    for (int i = 0; i < nInsert; ++i) {
+      q.satisfyOne();
+    }
+    Assert.assertFalse("The queue should be empty after all requests have been satisfied", q.hasOutStandingRequests());
+  }
+
+  private ResourceRequest getAlmostSatisfied() {
+    return new ResourceRequest(DriverRuntimeProtocol.ResourceRequestProto.newBuilder().setResourceCount(1).setMemorySize(128).build());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestTest.java b/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestTest.java
new file mode 100644
index 0000000..3156a02
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.driver;
+
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.junit.Assert;
+import org.junit.Test;
+
+public final class ResourceRequestTest {
+
+  @Test()
+  public void testInitializer() {
+    final ResourceRequest rr = get(1);
+    Assert.assertFalse("A fresh request should not be satisfied.", rr.isSatisfied());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testInitializationWithNull() {
+    final ResourceRequest rr2 = new ResourceRequest(null);
+    Assert.fail("Passing null to the ResourceRequest constructor should throw an IllegalArgumentException.");
+  }
+
+
+  @Test
+  public void testSatisfaction() {
+    final int n = 10;
+    final ResourceRequest rr = get(n);
+    for (int i = 0; i < n; ++i) {
+      rr.satisfyOne();
+    }
+    Assert.assertTrue("A satisfied request should tell so", rr.isSatisfied());
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testOverSatisfaction() {
+    final ResourceRequest rr = get(1);
+    rr.satisfyOne();
+    rr.satisfyOne();
+    Assert.fail("Satisfying more than the request should throw an IllegalStateException");
+  }
+
+  private ResourceRequest get(final int n) {
+    return new ResourceRequest(DriverRuntimeProtocol.ResourceRequestProto.newBuilder().setResourceCount(n).setMemorySize(128).build());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/pom.xml b/lang/java/reef-runtime-mesos/pom.xml
new file mode 100644
index 0000000..a5aa333
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/pom.xml
@@ -0,0 +1,95 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<!-- Maven module definition for the REEF runtime that targets Mesos. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.reef</groupId>
+        <artifactId>reef-project</artifactId>
+        <version>0.11.0-incubating-SNAPSHOT</version>
+    </parent>
+    <name>REEF Runtime for Mesos</name>
+    <artifactId>reef-runtime-mesos</artifactId>
+
+    <!--
+    Only reef-common pins its version (to this project's own version). The other
+    dependencies declare none here; presumably the parent POM manages them (verify there).
+    -->
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>reef-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.mesos</groupId>
+            <artifactId>mesos</artifactId>
+        </dependency>
+        <!-- Scope "provided": needed to compile, but not packaged with this module. -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!--
+            Runs the Avro "schema" goal in the generate-sources phase: reads the schemas
+            under src/main/avro and writes its output to target/generated-sources/avro.
+            -->
+            <plugin>
+                <groupId>org.apache.avro</groupId>
+                <artifactId>avro-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>schema</goal>
+                        </goals>
+                        <configuration>
+                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
+                            <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <!--
+            Adds the Avro output directory configured above as an additional source root,
+            so that the generated classes are compiled together with the module.
+            -->
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/avro</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/avro/EvaluatorControl.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/avro/EvaluatorControl.avsc b/lang/java/reef-runtime-mesos/src/main/avro/EvaluatorControl.avsc
new file mode 100644
index 0000000..ef4a152
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/avro/EvaluatorControl.avsc
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Three record schemas, all in the namespace org.apache.reef.runtime.mesos.util.
+ */
+[
+  /* Carries an identifier together with a command, both plain strings. */
+  {
+    "namespace": "org.apache.reef.runtime.mesos.util",
+    "type": "record",
+    "name": "EvaluatorLaunch",
+    "fields": [
+      { "name": "identifier", "type": "string" },
+      { "name": "command", "type": "string" }
+    ]
+  },
+  /* Carries only an identifier. */
+  {
+    "namespace": "org.apache.reef.runtime.mesos.util",
+    "type": "record",
+    "name": "EvaluatorRelease",
+    "fields": [
+      { "name": "identifier", "type": "string" }
+    ]
+  },
+  /*
+   * Envelope holding an EvaluatorLaunch and an EvaluatorRelease. Each field is a
+   * union with "null", so either may be absent.
+   * NOTE(review): presumably exactly one of the two is set per message - confirm
+   * against the code that writes these records.
+   */
+  {
+    "namespace": "org.apache.reef.runtime.mesos.util",
+    "type": "record",
+    "name": "EvaluatorControl",
+    "fields": [
+      { "name": "evaluator_launch", "type": ["EvaluatorLaunch", "null"] },
+      { "name": "evaluator_release", "type": ["EvaluatorRelease", "null"] }
+    ]
+  }
+]
+

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/MesosClasspathProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/MesosClasspathProvider.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/MesosClasspathProvider.java
new file mode 100644
index 0000000..cc051d5
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/MesosClasspathProvider.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos;
+
+import net.jcip.annotations.Immutable;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+
+import javax.inject.Inject;
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Logger;
+
+/**
+ * Access to the classpath according to the REEF file system standard.
+ * <p>
+ * All entries are derived from Hadoop-related environment variables, which are read once
+ * when this class is loaded. The very same list of entries is served as classpath prefix
+ * and as classpath suffix, for the Driver as well as for the Evaluators.
+ */
+@Immutable
+public final class MesosClasspathProvider implements RuntimeClasspathProvider {
+  // NOTE(review): System.getenv() returns null for an unset variable. In that case the
+  // entries below become null (HADOOP_CONF_DIR) or start with the text "null"
+  // (e.g. "null/lib/*"). This is left as-is to keep the produced classpath unchanged.
+  private static final String HADOOP_CONF_DIR = System.getenv("HADOOP_CONF_DIR");
+  private static final String HADOOP_HOME = System.getenv("HADOOP_HOME");
+  private static final String HADOOP_COMMON_HOME = System.getenv("HADOOP_COMMON_HOME");
+  private static final String HADOOP_YARN_HOME = System.getenv("HADOOP_YARN_HOME");
+  private static final String HADOOP_HDFS_HOME = System.getenv("HADOOP_HDFS_HOME");
+  private static final String HADOOP_MAPRED_HOME = System.getenv("HADOOP_MAPRED_HOME");
+
+  // Used when we can't get a classpath from Hadoop
+  private static final String[] LEGACY_CLASSPATH_LIST = new String[]{
+      HADOOP_CONF_DIR,
+      HADOOP_HOME + "/*",
+      HADOOP_HOME + "/lib/*",
+      HADOOP_COMMON_HOME + "/*",
+      HADOOP_COMMON_HOME + "/lib/*",
+      HADOOP_YARN_HOME + "/*",
+      HADOOP_YARN_HOME + "/lib/*",
+      HADOOP_HDFS_HOME + "/*",
+      HADOOP_HDFS_HOME + "/lib/*",
+      HADOOP_MAPRED_HOME + "/*",
+      HADOOP_MAPRED_HOME + "/lib/*",
+      HADOOP_HOME + "/etc/hadoop",
+      HADOOP_HOME + "/share/hadoop/common/*",
+      HADOOP_HOME + "/share/hadoop/common/lib/*",
+      HADOOP_HOME + "/share/hadoop/yarn/*",
+      HADOOP_HOME + "/share/hadoop/yarn/lib/*",
+      HADOOP_HOME + "/share/hadoop/hdfs/*",
+      HADOOP_HOME + "/share/hadoop/hdfs/lib/*",
+      HADOOP_HOME + "/share/hadoop/mapreduce/*",
+      HADOOP_HOME + "/share/hadoop/mapreduce/lib/*"
+  };
+  private final List<String> classPathPrefix;
+  private final List<String> classPathSuffix;
+
+  @Inject
+  MesosClasspathProvider() {
+    // Arrays.asList() on its own is a write-through view: a caller invoking set() on the
+    // returned list would modify the shared static array, and thereby the classpath seen
+    // by everybody else. Hand out a read-only view of a private copy instead, so that the
+    // @Immutable annotation on this class actually holds.
+    final List<String> entries =
+        java.util.Collections.unmodifiableList(Arrays.asList(LEGACY_CLASSPATH_LIST.clone()));
+    this.classPathPrefix = entries;
+    this.classPathSuffix = entries;
+  }
+
+  /**
+   * @return the read-only list of classpath entries to be put first on the Driver classpath.
+   */
+  @Override
+  public List<String> getDriverClasspathPrefix() {
+    return this.classPathPrefix;
+  }
+
+  /**
+   * @return the read-only list of classpath entries to be put last on the Driver classpath.
+   */
+  @Override
+  public List<String> getDriverClasspathSuffix() {
+    return this.classPathSuffix;
+  }
+
+  /**
+   * @return the read-only list of classpath entries to be put first on the Evaluator classpath.
+   */
+  @Override
+  public List<String> getEvaluatorClasspathPrefix() {
+    return this.classPathPrefix;
+  }
+
+  /**
+   * @return the read-only list of classpath entries to be put last on the Evaluator classpath.
+   */
+  @Override
+  public List<String> getEvaluatorClasspathSuffix() {
+    return this.classPathSuffix;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosClientConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosClientConfiguration.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosClientConfiguration.java
new file mode 100644
index 0000000..280f2a6
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosClientConfiguration.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.client.REEF;
+import org.apache.reef.client.RunningJob;
+import org.apache.reef.runtime.common.client.REEFImplementation;
+import org.apache.reef.runtime.common.client.RunningJobImpl;
+import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+import org.apache.reef.runtime.common.launch.REEFMessageCodec;
+import org.apache.reef.runtime.mesos.MesosClasspathProvider;
+import org.apache.reef.runtime.mesos.client.parameters.MasterIp;
+import org.apache.reef.runtime.mesos.client.parameters.RootFolder;
+import org.apache.reef.runtime.mesos.util.HDFSConfigurationConstructor;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalParameter;
+import org.apache.reef.tang.formats.RequiredParameter;
+import org.apache.reef.wake.remote.RemoteConfiguration;
+
+/**
+ * A ConfigurationModule for the Mesos resource manager.
+ * <p>
+ * Use {@link #CONF} and set {@link #MASTER_IP} (required) and, optionally,
+ * {@link #ROOT_FOLDER} on it.
+ */
+@Public
+@ClientSide
+public class MesosClientConfiguration extends ConfigurationModuleBuilder {
+  /**
+   * The folder in which the sub-folders for REEF drivers, one per job, will be created.
+   * If none is given, a folder "REEF_MESOS_RUNTIME" will be created in the local directory.
+   */
+  public static final OptionalParameter<String> ROOT_FOLDER = new OptionalParameter<>();
+
+  /**
+   * The IP address of the Mesos Master. This parameter is required.
+   */
+  public static final RequiredParameter<String> MASTER_IP = new RequiredParameter<>();
+
+  /**
+   * The module itself: binds the client-side implementations used with Mesos and maps
+   * {@link #ROOT_FOLDER} and {@link #MASTER_IP} onto their named parameters.
+   * It is declared after the two parameters above because static fields are initialized
+   * in textual order and both are referenced while this module is being built.
+   */
+  public static final ConfigurationModule CONF = new MesosClientConfiguration()
+      .bindImplementation(REEF.class, REEFImplementation.class)
+      .bindImplementation(RunningJob.class, RunningJobImpl.class)
+      .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class)
+      .bindImplementation(JobSubmissionHandler.class, MesosJobSubmissionHandler.class)
+      .bindNamedParameter(RootFolder.class, ROOT_FOLDER)
+      .bindNamedParameter(MasterIp.class, MASTER_IP)
+      .bindConstructor(Configuration.class, HDFSConfigurationConstructor.class)
+      .bindImplementation(RuntimeClasspathProvider.class, MesosClasspathProvider.class)
+      .build();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
new file mode 100644
index 0000000..d43a855
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.client;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.ClientRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos.FileResourceProto;
+import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
+import org.apache.reef.runtime.common.files.ClasspathProvider;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.mesos.client.parameters.MasterIp;
+import org.apache.reef.runtime.mesos.client.parameters.RootFolder;
+import org.apache.reef.runtime.mesos.driver.MesosDriverConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+
+/**
+ * The current implementation runs the driver as a local process, similar to reef-runtime-local.
+ * TODO: run the driver on a slave node in the cluster
+ */
+@Private
+@ClientSide
+final class MesosJobSubmissionHandler implements JobSubmissionHandler {
+  public static final String DRIVER_FOLDER_NAME = "driver";
+
+  private final ConfigurationSerializer configurationSerializer;
+  private final ClasspathProvider classpath;
+  private final REEFFileNames fileNames;
+  private final String rootFolderName;
+  private final String masterIp;
+  private final double jvmSlack;
+
+  /**
+   * @param rootFolderName          the folder below which one sub-folder per job is created;
+   *                                it is converted to an absolute path here.
+   * @param masterIp                the IP address of the Mesos Master, handed on to the driver configuration.
+   * @param configurationSerializer used to read the submitted configuration and to write the driver configuration.
+   * @param fileNames               provides the names of the folders and files created for the driver.
+   * @param classpath               provides the classpath used to launch the driver.
+   * @param jvmSlack                the JVM heap slack, handed on to the driver configuration.
+   */
+  @Inject
+  MesosJobSubmissionHandler(final @Parameter(RootFolder.class) String rootFolderName,
+                            final @Parameter(MasterIp.class) String masterIp,
+                            final ConfigurationSerializer configurationSerializer,
+                            final REEFFileNames fileNames,
+                            final ClasspathProvider classpath,
+                            final @Parameter(JVMHeapSlack.class) double jvmSlack) {
+    this.rootFolderName = new File(rootFolderName).getAbsolutePath();
+    this.masterIp = masterIp;
+    this.configurationSerializer = configurationSerializer;
+    this.fileNames = fileNames;
+    this.classpath = classpath;
+    this.jvmSlack = jvmSlack;
+  }
+
+  /**
+   * Does nothing.
+   */
+  @Override
+  public void close() {
+  }
+
+  /**
+   * Prepares a fresh job folder below the root folder, copies the local and global files
+   * of the submission into it, writes the driver configuration and finally launches the
+   * driver as a local process whose stdout and stderr are redirected into files in the
+   * driver folder.
+   *
+   * @param jobSubmissionProto the job to be submitted.
+   * @throws RuntimeException wrapping the IOException if a folder cannot be created, a file
+   *                          cannot be copied, the configuration cannot be written or the
+   *                          process cannot be started.
+   */
+  @Override
+  public void onNext(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
+    try {
+      // The timestamp keeps the folders of repeated submissions of the same job apart.
+      final File jobFolder = new File(new File(this.rootFolderName),
+          "/" + jobSubmissionProto.getIdentifier() + "-" + System.currentTimeMillis() + "/");
+
+      // Files.createDirectories() reports a failure as an IOException, whereas the return
+      // value of File.mkdirs() is easily ignored and the failure then only surfaces later
+      // as a less helpful error while copying files into the missing folder.
+      final File driverFolder = new File(jobFolder, DRIVER_FOLDER_NAME);
+      Files.createDirectories(driverFolder.toPath());
+
+      final File reefFolder = new File(driverFolder, this.fileNames.getREEFFolderName());
+      Files.createDirectories(reefFolder.toPath());
+
+      final File localFolder = new File(reefFolder, this.fileNames.getLocalFolderName());
+      Files.createDirectories(localFolder.toPath());
+      copyFiles(jobSubmissionProto.getLocalFileList(), driverFolder, this.fileNames.getLocalFolderPath());
+
+      final File globalFolder = new File(reefFolder, this.fileNames.getGlobalFolderName());
+      Files.createDirectories(globalFolder.toPath());
+      copyFiles(jobSubmissionProto.getGlobalFileList(), driverFolder, this.fileNames.getGlobalFolderPath());
+
+      final Configuration driverConfiguration =
+          Configurations.merge(MesosDriverConfiguration.CONF
+              .set(MesosDriverConfiguration.MESOS_MASTER_IP, this.masterIp)
+              .set(MesosDriverConfiguration.JOB_IDENTIFIER, jobSubmissionProto.getIdentifier())
+              .set(MesosDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, jobSubmissionProto.getRemoteId())
+              .set(MesosDriverConfiguration.JVM_HEAP_SLACK, this.jvmSlack)
+              .set(MesosDriverConfiguration.SCHEDULER_DRIVER_CAPACITY, 1) // must be 1 as there is 1 scheduler at the same time
+              .build(),
+          this.configurationSerializer.fromString(jobSubmissionProto.getConfiguration()));
+      final File runtimeConfigurationFile = new File(driverFolder, this.fileNames.getDriverConfigurationPath());
+      this.configurationSerializer.toFile(driverConfiguration, runtimeConfigurationFile);
+
+      // The configuration file name is relative: the process is started with the driver
+      // folder as its working directory (see below).
+      final List<String> launchCommand = new JavaLaunchCommandBuilder()
+          .setErrorHandlerRID(jobSubmissionProto.getRemoteId())
+          .setLaunchID(jobSubmissionProto.getIdentifier())
+          .setConfigurationFileName(this.fileNames.getDriverConfigurationPath())
+          .setClassPath(this.classpath.getDriverClasspath())
+          .setMemory(jobSubmissionProto.getDriverMemory())
+          .build();
+
+      final File errFile = new File(driverFolder, this.fileNames.getDriverStderrFileName());
+      final File outFile = new File(driverFolder, this.fileNames.getDriverStdoutFileName());
+
+      // The handle of the started process is not retained.
+      new ProcessBuilder()
+          .command(launchCommand)
+          .directory(driverFolder)
+          .redirectError(errFile)
+          .redirectOutput(outFile)
+          .start();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Copies each of the given file resources to {@code <driverFolder>/<targetPath>/<name of the resource>}.
+   *
+   * @param files        the file resources to copy.
+   * @param driverFolder the folder the target path is relative to.
+   * @param targetPath   the path of the target folder, relative to the driver folder.
+   * @throws IOException if one of the files cannot be copied.
+   */
+  private static void copyFiles(final Iterable<FileResourceProto> files,
+                                final File driverFolder,
+                                final String targetPath) throws IOException {
+    for (final FileResourceProto file : files) {
+      final Path src = new File(file.getPath()).toPath();
+      final Path dst = new File(driverFolder, targetPath + "/" + file.getName()).toPath();
+      Files.copy(src, dst);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/MasterIp.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/MasterIp.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/MasterIp.java
new file mode 100644
index 0000000..09c2882
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/MasterIp.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.client.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Named parameter for the IP address of the Mesos Master.
+ * It has the short name "master_ip" and declares no default value.
+ */
+@NamedParameter(doc = "The ip address of Mesos Master", short_name = "master_ip")
+public final class MasterIp implements Name<String> {
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/RootFolder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/RootFolder.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/RootFolder.java
new file mode 100644
index 0000000..8cdc116
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/RootFolder.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.client.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Named parameter for the root folder where logs etc. will be stored.
+ * Defaults to "REEF_MESOS_RUNTIME".
+ */
+@NamedParameter(doc = "The root folder where logs etc. will be stored.", default_value = "REEF_MESOS_RUNTIME")
+public final class RootFolder implements Name<String> {
+}
\ No newline at end of file