You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/22 22:06:16 UTC

[1/2] incubator-reef git commit: [REEF-30] Adding reef-runtime-mesos

Repository: incubator-reef
Updated Branches:
  refs/heads/master 4bc3282d9 -> c908a526d


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java
new file mode 100644
index 0000000..9c2c6d9
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java
@@ -0,0 +1,506 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import com.google.protobuf.ByteString;
+import org.apache.mesos.MesosSchedulerDriver;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.proto.DriverRuntimeProtocol.NodeDescriptorProto;
+import org.apache.reef.proto.DriverRuntimeProtocol.ResourceAllocationProto;
+import org.apache.reef.proto.DriverRuntimeProtocol.ResourceReleaseProto;
+import org.apache.reef.proto.DriverRuntimeProtocol.ResourceRequestProto;
+import org.apache.reef.proto.DriverRuntimeProtocol.RuntimeStatusProto;
+import org.apache.reef.proto.DriverRuntimeProtocol.RuntimeStatusProto.Builder;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.proto.ReefServiceProtos.State;
+import org.apache.reef.runtime.common.driver.api.AbstractDriverRuntimeConfiguration;
+import org.apache.reef.runtime.common.files.ClasspathProvider;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.mesos.driver.parameters.MesosMasterIp;
+import org.apache.reef.runtime.mesos.evaluator.REEFExecutor;
+import org.apache.reef.runtime.mesos.util.EvaluatorControl;
+import org.apache.reef.runtime.mesos.util.EvaluatorLaunch;
+import org.apache.reef.runtime.mesos.util.EvaluatorRelease;
+import org.apache.reef.runtime.mesos.util.MesosRemoteManager;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.Encoder;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.CommandInfo;
+import org.apache.mesos.Protos.CommandInfo.URI;
+import org.apache.mesos.Protos.ExecutorID;
+import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.Filters;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.Resource;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.Value;
+import org.apache.mesos.Protos.Value.Type;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import javax.inject.Inject;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * MesosScheduler that interacts with MesosMaster and MesosExecutors.
+ */
+final class REEFScheduler implements Scheduler {
+  private static final Logger LOG = Logger.getLogger(REEFScheduler.class.getName());
+  private static final String REEF_TAR = "reef.tar.gz";
+  private static final String RUNTIME_NAME = "MESOS";
+  private static final int MESOS_SLAVE_PORT = 5051; //  Assumes for now that all slaves use port 5051(default) TODO: make it configurable.
+  private static final String REEF_JOB_NAME_PREFIX = "reef-job-";
+
+  private final String reefTarUri;
+  private final REEFFileNames fileNames;
+  private final ClasspathProvider classpath;
+
+  private final REEFEventHandlers reefEventHandlers;
+  private final MesosRemoteManager mesosRemoteManager;
+
+  private final SchedulerDriver mesosMaster;
+  private final EStage<SchedulerDriver> schedulerDriverEStage;
+  private final Map<String, Offer> offers = new ConcurrentHashMap<>();
+
+  private int outstandingRequestCounter = 0;
+  private final ConcurrentLinkedQueue<ResourceRequestProto> outstandingRequests = new ConcurrentLinkedQueue<>();
+  private final Map<String, ResourceRequestProto> executorIdToLaunchedRequests = new ConcurrentHashMap<>();
+  private final REEFExecutors executors;
+
+  @Inject
+  REEFScheduler(final REEFEventHandlers reefEventHandlers,
+                final MesosRemoteManager mesosRemoteManager,
+                final REEFExecutors executors,
+                final REEFFileNames fileNames,
+                final EStage<SchedulerDriver> schedulerDriverEStage,
+                final ClasspathProvider classpath,
+                final @Parameter(AbstractDriverRuntimeConfiguration.JobIdentifier.class) String jobIdentifier,
+                final @Parameter(MesosMasterIp.class) String masterIp) {
+    this.mesosRemoteManager = mesosRemoteManager;
+    this.reefEventHandlers = reefEventHandlers;
+    this.executors = executors;
+    this.fileNames = fileNames;
+    this.reefTarUri = getReefTarUri(jobIdentifier);
+    this.classpath = classpath;
+    this.schedulerDriverEStage = schedulerDriverEStage;
+
+    final Protos.FrameworkInfo frameworkInfo = Protos.FrameworkInfo.newBuilder()
+        .setUser("") // TODO: make it configurable.
+        .setName(REEF_JOB_NAME_PREFIX + jobIdentifier)
+        .build();
+    this.mesosMaster = new MesosSchedulerDriver(this, frameworkInfo, masterIp);
+  }
+
+  @Override
+  public void registered(final SchedulerDriver driver,
+                         final Protos.FrameworkID frameworkId,
+                         final Protos.MasterInfo masterInfo) {
+    LOG.log(Level.INFO, "Framework ID={0} registration succeeded", frameworkId);
+  }
+
+  @Override
+  public void reregistered(final SchedulerDriver driver, final Protos.MasterInfo masterInfo) {
+    LOG.log(Level.INFO, "Framework reregistered, MasterInfo: {0}", masterInfo);
+  }
+
+  /**
+   * All offers in each batch of offers will be either be launched or declined
+   */
+  @Override
+  public void resourceOffers(final SchedulerDriver driver, final List<Protos.Offer> offers) {
+    final Map<String, NodeDescriptorProto.Builder> nodeDescriptorProtos = new HashMap<>();
+
+    for (final Offer offer : offers) {
+      if (nodeDescriptorProtos.get(offer.getSlaveId().getValue()) == null) {
+        nodeDescriptorProtos.put(offer.getSlaveId().getValue(), NodeDescriptorProto.newBuilder()
+            .setIdentifier(offer.getSlaveId().getValue())
+            .setHostName(offer.getHostname())
+            .setPort(MESOS_SLAVE_PORT)
+            .setMemorySize(getMemory(offer)));
+      } else {
+        final NodeDescriptorProto.Builder builder = nodeDescriptorProtos.get(offer.getSlaveId().getValue());
+        builder.setMemorySize(builder.getMemorySize() + getMemory(offer));
+      }
+
+      this.offers.put(offer.getId().getValue(), offer);
+    }
+
+    for (final NodeDescriptorProto.Builder ndpBuilder : nodeDescriptorProtos.values()) {
+      this.reefEventHandlers.onNodeDescriptor(ndpBuilder.build());
+    }
+
+    if (outstandingRequests.size() > 0) {
+      doResourceRequest(outstandingRequests.remove());
+    }
+  }
+
+  @Override
+  public void offerRescinded(final SchedulerDriver driver, final Protos.OfferID offerId) {
+    for (final String executorId : this.executorIdToLaunchedRequests.keySet()) {
+      if (executorId.startsWith(offerId.getValue())) {
+        this.outstandingRequests.add(this.executorIdToLaunchedRequests.remove(executorId));
+      }
+    }
+  }
+
+  @Override
+  public void statusUpdate(final SchedulerDriver driver, final Protos.TaskStatus taskStatus) {
+    LOG.log(Level.SEVERE, "Task Status Update:", taskStatus.toString());
+
+    final DriverRuntimeProtocol.ResourceStatusProto.Builder resourceStatus =
+        DriverRuntimeProtocol.ResourceStatusProto.newBuilder().setIdentifier(taskStatus.getTaskId().getValue());
+
+    switch(taskStatus.getState()) {
+      case TASK_STARTING:
+        handleNewExecutor(taskStatus); // As there is only one Mesos Task per Mesos Executor, this is a new executor.
+        return;
+      case TASK_RUNNING:
+        resourceStatus.setState(State.RUNNING);
+        break;
+      case TASK_FINISHED:
+        if (taskStatus.getData().toStringUtf8().equals("eval_not_run")) { // TODO: a hack to pass closeEvaluator test, replace this with a better interface
+          return;
+        }
+        resourceStatus.setState(State.DONE);
+        break;
+      case TASK_KILLED:
+        resourceStatus.setState(State.KILLED);
+        break;
+      case TASK_LOST:
+      case TASK_FAILED:
+        resourceStatus.setState(State.FAILED);
+        break;
+      case TASK_STAGING:
+        throw new RuntimeException("TASK_STAGING should not be used for status update");
+      default:
+        throw new RuntimeException("Unknown TaskStatus");
+    }
+
+    if (taskStatus.getMessage() != null) {
+      resourceStatus.setDiagnostics(taskStatus.getMessage());
+    }
+
+    this.reefEventHandlers.onResourceStatus(resourceStatus.build());
+  }
+
+  @Override
+  public void frameworkMessage(final SchedulerDriver driver,
+                               final Protos.ExecutorID executorId,
+                               final Protos.SlaveID slaveId,
+                               final byte[] data) {
+    LOG.log(Level.INFO, "Framework Message. driver: {0} executorId: {1} slaveId: {2} data: {3}",
+        new Object[]{driver, executorId, slaveId, data});
+  }
+
+  @Override
+  public void disconnected(final SchedulerDriver driver) {
+    this.onRuntimeError(new RuntimeException("Scheduler disconnected from MesosMaster"));
+  }
+
+  @Override
+  public void slaveLost(final SchedulerDriver driver, final Protos.SlaveID slaveId) {
+    LOG.log(Level.SEVERE, "Slave Lost. {0}", slaveId.getValue());
+  }
+
+  @Override
+  public void executorLost(final SchedulerDriver driver,
+                           final Protos.ExecutorID executorId,
+                           final Protos.SlaveID slaveId,
+                           final int status) {
+    final String diagnostics = "Executor Lost. executorid: "+executorId.getValue()+" slaveid: "+slaveId.getValue();
+    final DriverRuntimeProtocol.ResourceStatusProto resourceStatus =
+        DriverRuntimeProtocol.ResourceStatusProto.newBuilder()
+            .setIdentifier(executorId.getValue())
+            .setState(State.FAILED)
+            .setExitCode(status)
+            .setDiagnostics(diagnostics)
+            .build();
+
+    this.reefEventHandlers.onResourceStatus(resourceStatus);
+  }
+
+  @Override
+  public void error(final SchedulerDriver driver, final String message) {
+    this.onRuntimeError(new RuntimeException(message));
+  }
+
+  /////////////////////////////////////////////////////////////////
+  // HELPER METHODS
+
+  public void onStart() {
+    this.schedulerDriverEStage.onNext(this.mesosMaster);
+  }
+
+  public void onStop() {
+    this.mesosMaster.stop();
+    try {
+      this.schedulerDriverEStage.close();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public void onResourceRequest(final ResourceRequestProto resourceRequestProto) {
+    this.outstandingRequestCounter += resourceRequestProto.getResourceCount();
+    updateRuntimeStatus();
+    doResourceRequest(resourceRequestProto);
+  }
+
+  public void onResourceRelease(final ResourceReleaseProto resourceReleaseProto) {
+    this.executors.releaseEvaluator(new EvaluatorRelease(resourceReleaseProto.getIdentifier()));
+    this.executors.remove(resourceReleaseProto.getIdentifier());
+    updateRuntimeStatus();
+  }
+
+  /**
+   * Greedily acquire resources by launching a Mesos Task(w/ our custom MesosExecutor) on REEF Evaluator request.
+   * Either called from onResourceRequest(for a new request) or resourceOffers(for an outstanding request).
+   * TODO: reflect priority and rack/node locality specified in resourceRequestProto.
+   */
+  private synchronized void doResourceRequest(final ResourceRequestProto resourceRequestProto) {
+    int tasksToLaunchCounter = resourceRequestProto.getResourceCount();
+
+    for (final Offer offer : this.offers.values()) {
+      final int cpuSlots = getCpu(offer) / resourceRequestProto.getVirtualCores();
+      final int memSlots = getMemory(offer) / resourceRequestProto.getMemorySize();
+      final int taskNum = Math.min(Math.min(cpuSlots, memSlots), tasksToLaunchCounter);
+
+      if (taskNum > 0 && satisfySlaveConstraint(resourceRequestProto, offer)) {
+        final List<TaskInfo> tasksToLaunch = new ArrayList<>();
+        tasksToLaunchCounter -= taskNum;
+
+        // Launch as many MesosTasks on the same node(offer) as possible to exploit locality.
+        for (int j = 0; j < taskNum; j++) {
+          final String id = offer.getId().getValue() + "-" + String.valueOf(j);
+          final String executorLaunchCommand = getExecutorLaunchCommand(id, resourceRequestProto.getMemorySize());
+
+          final ExecutorInfo executorInfo = ExecutorInfo.newBuilder()
+              .setExecutorId(ExecutorID.newBuilder()
+                  .setValue(id)
+                  .build())
+              .setCommand(CommandInfo.newBuilder()
+                  .setValue(executorLaunchCommand)
+                  .addUris(URI.newBuilder().setValue(reefTarUri).build())
+                  .build())
+              .build();
+
+          final TaskInfo taskInfo = TaskInfo.newBuilder()
+              .setTaskId(TaskID.newBuilder()
+                  .setValue(id)
+                  .build())
+              .setName(id)
+              .setSlaveId(offer.getSlaveId())
+              .addResources(Resource.newBuilder()
+                  .setName("mem")
+                  .setType(Type.SCALAR)
+                  .setScalar(Value.Scalar.newBuilder()
+                      .setValue(resourceRequestProto.getMemorySize())
+                      .build())
+                  .build())
+              .addResources(Resource.newBuilder()
+                  .setName("cpus")
+                  .setType(Type.SCALAR)
+                  .setScalar(Value.Scalar.newBuilder()
+                      .setValue(resourceRequestProto.getVirtualCores())
+                      .build())
+                  .build())
+              .setExecutor(executorInfo)
+              .build();
+
+          tasksToLaunch.add(taskInfo);
+          this.executorIdToLaunchedRequests.put(id, resourceRequestProto);
+        }
+
+        final Filters filters = Filters.newBuilder().setRefuseSeconds(0).build();
+        mesosMaster.launchTasks(Collections.singleton(offer.getId()), tasksToLaunch, filters);
+      } else {
+        mesosMaster.declineOffer(offer.getId());
+      }
+    }
+
+    // the offers are no longer valid(all launched or declined)
+    this.offers.clear();
+
+    // Save leftovers that couldn't be launched
+    outstandingRequests.add(ResourceRequestProto.newBuilder()
+        .mergeFrom(resourceRequestProto)
+        .setResourceCount(tasksToLaunchCounter)
+        .build());
+  }
+
+  private void handleNewExecutor(final Protos.TaskStatus taskStatus) {
+    final ResourceRequestProto resourceRequestProto =
+        this.executorIdToLaunchedRequests.remove(taskStatus.getTaskId().getValue());
+
+    final EventHandler<EvaluatorControl> evaluatorControlHandler =
+        this.mesosRemoteManager.getHandler(taskStatus.getMessage(), EvaluatorControl.class);
+    this.executors.add(taskStatus.getTaskId().getValue(), resourceRequestProto.getMemorySize(), evaluatorControlHandler);
+
+    final ResourceAllocationProto alloc = DriverRuntimeProtocol.ResourceAllocationProto.newBuilder()
+        .setIdentifier(taskStatus.getTaskId().getValue())
+        .setNodeId(taskStatus.getSlaveId().getValue())
+        .setResourceMemory(resourceRequestProto.getMemorySize())
+        .build();
+    reefEventHandlers.onResourceAllocation(alloc);
+
+    this.outstandingRequestCounter--;
+    this.updateRuntimeStatus();
+  }
+
+  private synchronized void updateRuntimeStatus() {
+    final Builder builder = DriverRuntimeProtocol.RuntimeStatusProto.newBuilder()
+        .setName(RUNTIME_NAME)
+        .setState(State.RUNNING)
+        .setOutstandingContainerRequests(this.outstandingRequestCounter);
+
+    for (final String executorId : this.executors.getExecutorIds()) {
+      builder.addContainerAllocation(executorId);
+    }
+
+    this.reefEventHandlers.onRuntimeStatus(builder.build());
+  }
+
+  private void onRuntimeError(final Throwable throwable) {
+    this.mesosMaster.stop();
+    try {
+      this.schedulerDriverEStage.close();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    final Builder runtimeStatusBuilder = RuntimeStatusProto.newBuilder()
+        .setState(State.FAILED)
+        .setName(RUNTIME_NAME);
+
+    final Encoder<Throwable> codec = new ObjectSerializableCodec<>();
+    runtimeStatusBuilder.setError(ReefServiceProtos.RuntimeErrorProto.newBuilder()
+        .setName(RUNTIME_NAME)
+        .setMessage(throwable.getMessage())
+        .setException(ByteString.copyFrom(codec.encode(throwable)))
+        .build());
+
+    this.reefEventHandlers.onRuntimeStatus(runtimeStatusBuilder.build());
+  }
+
+  private boolean satisfySlaveConstraint(final ResourceRequestProto resourceRequestProto, final Offer offer) {
+    return resourceRequestProto.getNodeNameCount() == 0 ||
+        resourceRequestProto.getNodeNameList().contains(offer.getSlaveId().getValue());
+  }
+
+  private int getMemory(final Offer offer) {
+    for (final Resource resource : offer.getResourcesList()) {
+      switch (resource.getName()) {
+        case "mem":
+          return (int)resource.getScalar().getValue();
+      }
+    }
+    return 0;
+  }
+
+  private int getCpu(final Offer offer) {
+    for (final Resource resource : offer.getResourcesList()) {
+      switch (resource.getName()) {
+        case "cpus":
+          return (int)resource.getScalar().getValue();
+      }
+    }
+    return 0;
+  }
+
+  private String getExecutorLaunchCommand(final String executorID, final int memorySize) {
+    final String DEFAULT_JAVA_PATH = System.getenv("JAVA_HOME") + "/bin/" +  "java";
+    final String classPath = "-classpath " + StringUtils.join(this.classpath.getEvaluatorClasspath(), ":");
+    final String logging = "-Djava.util.logging.config.class=org.apache.reef.util.logging.Config";
+    final String mesosExecutorId = "-mesos_executor_id " + executorID;
+
+    return (new StringBuilder()
+        .append(DEFAULT_JAVA_PATH + " ")
+        .append("-XX:PermSize=128m" + " ")
+        .append("-XX:MaxPermSize=128m" + " ")
+        .append("-Xmx" + String.valueOf(memorySize) + "m" + " ")
+        .append(classPath + " ")
+        .append(logging + " ")
+        .append(REEFExecutor.class.getName() + " ")
+        .append(mesosExecutorId + " ")
+        .toString());
+  }
+
+  private String getReefTarUri(final String jobIdentifier) {
+    try {
+      // Create REEF_TAR
+      final FileOutputStream fileOutputStream = new FileOutputStream(REEF_TAR);
+      final TarArchiveOutputStream tarArchiveOutputStream =
+          new TarArchiveOutputStream(new GZIPOutputStream(fileOutputStream));
+      final File globalFolder = new File(this.fileNames.getGlobalFolderPath());
+      final DirectoryStream<Path> directoryStream = Files.newDirectoryStream(globalFolder.toPath());
+
+      for (final Path path : directoryStream) {
+        tarArchiveOutputStream.putArchiveEntry(new TarArchiveEntry(path.toFile(),
+            globalFolder + "/" + path.getFileName()));
+
+        final BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream(path.toFile()));
+        IOUtils.copy(bufferedInputStream, tarArchiveOutputStream);
+        bufferedInputStream.close();
+
+        tarArchiveOutputStream.closeArchiveEntry();
+      }
+      directoryStream.close();
+      tarArchiveOutputStream.close();
+      fileOutputStream.close();
+
+      // Upload REEF_TAR to HDFS
+      final FileSystem fileSystem = FileSystem.get(new Configuration());
+      final org.apache.hadoop.fs.Path src = new org.apache.hadoop.fs.Path(REEF_TAR);
+      final String reefTarUri = fileSystem.getUri().toString() + "/" + jobIdentifier + "/" + REEF_TAR;
+      final org.apache.hadoop.fs.Path dst = new org.apache.hadoop.fs.Path(reefTarUri);
+      fileSystem.copyFromLocalFile(src, dst);
+
+      return reefTarUri;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/parameters/MesosMasterIp.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/parameters/MesosMasterIp.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/parameters/MesosMasterIp.java
new file mode 100644
index 0000000..863f5b3
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/parameters/MesosMasterIp.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter(doc = "The ip address of Mesos Master")
+public final class MesosMasterIp implements Name<String> {
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/evaluator/EvaluatorControlHandler.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/evaluator/EvaluatorControlHandler.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/evaluator/EvaluatorControlHandler.java
new file mode 100644
index 0000000..20287e1
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/evaluator/EvaluatorControlHandler.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.evaluator;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.mesos.util.EvaluatorControl;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.RemoteMessage;
+
+import javax.inject.Inject;
+
+/**
+ * Handles evaluator launch requests via MesosRemoteManager from MesosResourceLaunchHandler
+ */
+@EvaluatorSide
+@Private
+final class EvaluatorControlHandler implements EventHandler<RemoteMessage<EvaluatorControl>> {
+  // EvaluatorLaunchHandler is registered in MesosExecutor. Hence, we need an InjectionFuture here.
+  private final InjectionFuture<REEFExecutor> mesosExecutor;
+
+  @Inject
+  EvaluatorControlHandler(final InjectionFuture<REEFExecutor> mesosExecutor) {
+    this.mesosExecutor = mesosExecutor;
+  }
+
+  @Override
+  public void onNext(final RemoteMessage<EvaluatorControl> remoteMessage) {
+    final EvaluatorControl evaluatorControl = remoteMessage.getMessage();
+    if (evaluatorControl.getEvaluatorLaunch() != null) {
+      this.mesosExecutor.get().onEvaluatorLaunch(evaluatorControl.getEvaluatorLaunch());
+    } else if (evaluatorControl.getEvaluatorRelease() != null) {
+      this.mesosExecutor.get().onEvaluatorRelease(evaluatorControl.getEvaluatorRelease());
+    } else {
+      throw new IllegalArgumentException();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/evaluator/REEFExecutor.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/evaluator/REEFExecutor.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/evaluator/REEFExecutor.java
new file mode 100644
index 0000000..7225f92
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/evaluator/REEFExecutor.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.evaluator;
+
+import com.google.protobuf.ByteString;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.mesos.evaluator.parameters.MesosExecutorId;
+import org.apache.reef.runtime.mesos.util.EvaluatorControl;
+import org.apache.reef.runtime.mesos.util.EvaluatorLaunch;
+import org.apache.reef.runtime.mesos.util.EvaluatorRelease;
+import org.apache.reef.runtime.mesos.util.MesosRemoteManager;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.CommandLine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.mesos.Executor;
+import org.apache.mesos.ExecutorDriver;
+import org.apache.mesos.MesosExecutorDriver;
+import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.Protos.SlaveInfo;
+import org.apache.mesos.Protos.Status;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.TaskState;
+import org.apache.mesos.Protos.TaskStatus;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public final class REEFExecutor implements Executor {
+  private final static Logger LOG = Logger.getLogger(REEFExecutor.class.getName());
+
+  private final MesosExecutorDriver mesosExecutorDriver;
+  private final MesosRemoteManager mesosRemoteManager;
+  private final ExecutorService executorService;
+  private final REEFFileNames fileNames;
+  private final String mesosExecutorId;
+
+  private Process evaluatorProcess;
+  private Integer evaluatorProcessExitValue;
+
+  @Inject
+  REEFExecutor(final EvaluatorControlHandler evaluatorControlHandler,
+               final MesosRemoteManager mesosRemoteManager,
+               final REEFFileNames fileNames,
+               final @Parameter(MesosExecutorId.class) String mesosExecutorId) {
+    this.mesosRemoteManager = mesosRemoteManager;
+    this.mesosRemoteManager.registerHandler(EvaluatorControl.class, evaluatorControlHandler);
+    this.mesosExecutorDriver = new MesosExecutorDriver(this);
+    this.executorService = Executors.newCachedThreadPool();
+    this.fileNames = fileNames;
+    this.mesosExecutorId = mesosExecutorId;
+  }
+
+  @Override
+  public final void registered(final ExecutorDriver driver,
+                         final ExecutorInfo executorInfo,
+                         final FrameworkInfo frameworkInfo,
+                         final SlaveInfo slaveInfo) {
+    LOG.log(Level.FINEST, "Executor registered. driver: {0} executorInfo: {1} frameworkInfo: {2} slaveInfo {3}",
+        new Object[]{driver, executorInfo, frameworkInfo, slaveInfo});
+  }
+
+  @Override
+  public final void reregistered(final ExecutorDriver driver, final SlaveInfo slaveInfo) {
+    LOG.log(Level.FINEST, "Executor reregistered. driver: {0}", driver);
+  }
+
+  @Override
+  public final void disconnected(final ExecutorDriver driver) {
+    this.onRuntimeError();
+  }
+
+  /**
+   * We assume a long-running Mesos Task that manages a REEF Evaluator process, leveraging Mesos Executor's interface.
+   */
+  @Override
+  public final void launchTask(final ExecutorDriver driver, final TaskInfo task) {
+    driver.sendStatusUpdate(TaskStatus.newBuilder()
+        .setTaskId(TaskID.newBuilder().setValue(this.mesosExecutorId).build())
+        .setState(TaskState.TASK_STARTING)
+        .setSlaveId(task.getSlaveId())
+        .setMessage(this.mesosRemoteManager.getMyIdentifier())
+        .build());
+  }
+
+  @Override
+  public final void killTask(final ExecutorDriver driver, final TaskID taskId) {
+    this.onStop();
+  }
+
+  @Override
+  public final void frameworkMessage(final ExecutorDriver driver, final byte[] data) {
+    LOG.log(Level.FINEST, "Framework Messge. ExecutorDriver: {0}, data: {1}.",
+        new Object[]{driver, data});
+  }
+
+  @Override
+  public final void shutdown(final ExecutorDriver driver) {
+    this.onStop();
+  }
+
+  @Override
+  public final void error(final ExecutorDriver driver, final String message) {
+    this.onRuntimeError();
+  }
+
+  /////////////////////////////////////////////////////////////////
+  // HELPER METHODS
+
+  private void onStart() {
+    this.executorService.submit(new Thread() { public void run() {
+      final Status status;
+      status = mesosExecutorDriver.run();
+      LOG.log(Level.INFO, "MesosExecutorDriver ended with status {0}", status);
+    }});
+  }
+
+  private void onStop() {
+    // Shutdown REEF Evaluator
+    if (this.evaluatorProcess != null) {
+      this.evaluatorProcess.destroy();
+      mesosExecutorDriver.sendStatusUpdate(TaskStatus.newBuilder()
+          .setTaskId(TaskID.newBuilder()
+              .setValue(mesosExecutorId)
+              .build())
+          .setState(TaskState.TASK_FINISHED)
+          .setMessage("Evaluator Process exited with status " + String.valueOf(evaluatorProcessExitValue))
+          .build());
+    } else {
+      mesosExecutorDriver.sendStatusUpdate(TaskStatus.newBuilder()
+          .setTaskId(TaskID.newBuilder()
+              .setValue(mesosExecutorId)
+              .build())
+          .setState(TaskState.TASK_FINISHED)
+          .setData(ByteString.copyFromUtf8("eval_not_run")) // TODO: a hack to pass closeEvaluator test, replace this with a better interface
+          .setMessage("Evaluator Process exited with status " + String.valueOf(evaluatorProcessExitValue))
+          .build());
+    }
+
+    // Shutdown Mesos Executor
+    this.executorService.shutdown();
+    this.mesosExecutorDriver.stop();
+  }
+
+  private void onRuntimeError() {
+    // Shutdown REEF Evaluator
+    if (this.evaluatorProcess != null) {
+      this.evaluatorProcess.destroy();
+    }
+    mesosExecutorDriver.sendStatusUpdate(TaskStatus.newBuilder()
+        .setTaskId(TaskID.newBuilder()
+            .setValue(mesosExecutorId)
+            .build())
+        .setState(TaskState.TASK_FAILED)
+        .setMessage("Evaluator Process exited with status " + String.valueOf(evaluatorProcessExitValue))
+        .build());
+
+    // Shutdown Mesos Executor
+    this.executorService.shutdown();
+    this.mesosExecutorDriver.stop();
+  }
+
+  public final void onEvaluatorRelease(final EvaluatorRelease evaluatorRelease) {
+    LOG.log(Level.INFO, "Release!!!! {0}", evaluatorRelease.toString());
+    assert(evaluatorRelease.getIdentifier().toString().equals(this.mesosExecutorId));
+    this.onStop();
+  }
+
+  public final void onEvaluatorLaunch(final EvaluatorLaunch evaluatorLaunch) {
+    LOG.log(Level.INFO, "Launch!!!! {0}", evaluatorLaunch.toString());
+    assert(evaluatorLaunch.getIdentifier().toString().equals(this.mesosExecutorId));
+    final ExecutorService evaluatorLaunchExecutorService = Executors.newSingleThreadExecutor();
+    evaluatorLaunchExecutorService.submit(new Thread() {
+      public void run() {
+        try {
+          final List<String> command = Arrays.asList(evaluatorLaunch.getCommand().toString().split(" "));
+          LOG.log(Level.INFO, "Command!!!! {0}", command);
+          final FileSystem fileSystem = FileSystem.get(new Configuration());
+          final Path hdfsFolder = new Path(fileSystem.getUri() + "/" + mesosExecutorId);
+          final File localFolder = new File(fileNames.getREEFFolderName(), fileNames.getLocalFolderName());
+
+          FileUtil.copy(fileSystem, hdfsFolder, localFolder, true, new Configuration());
+
+          evaluatorProcess = new ProcessBuilder()
+              .command(command)
+              .redirectError(new File(fileNames.getEvaluatorStderrFileName()))
+              .redirectOutput(new File(fileNames.getEvaluatorStdoutFileName()))
+              .start();
+
+          evaluatorProcessExitValue = evaluatorProcess.waitFor();
+
+          fileSystem.close();
+        } catch (IOException | InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    });
+    evaluatorLaunchExecutorService.shutdown();
+  }
+
+  public static org.apache.reef.tang.Configuration parseCommandLine(final String[] args) throws IOException {
+    final JavaConfigurationBuilder confBuilder = Tang.Factory.getTang().newConfigurationBuilder();
+
+    new CommandLine(confBuilder)
+        .registerShortNameOfClass(MesosExecutorId.class)
+        .processCommandLine(args);
+
+    return confBuilder.build();
+  }
+
+  /**
+   * The starting point of the executor.
+   */
+  public static void main(final String[] args) throws Exception {
+    final Injector injector = Tang.Factory.getTang().newInjector(parseCommandLine(args));
+    final REEFExecutor REEFExecutor = injector.getInstance(REEFExecutor.class);
+    REEFExecutor.onStart();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/evaluator/parameters/MesosExecutorId.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/evaluator/parameters/MesosExecutorId.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/evaluator/parameters/MesosExecutorId.java
new file mode 100644
index 0000000..0f2044c
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/evaluator/parameters/MesosExecutorId.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.evaluator.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter(doc = "The Executor's id", short_name = "mesos_executor_id")
+public final class MesosExecutorId implements Name<String> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/HDFSConfigurationConstructor.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/HDFSConfigurationConstructor.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/HDFSConfigurationConstructor.java
new file mode 100644
index 0000000..f028274
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/HDFSConfigurationConstructor.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.reef.tang.ExternalConstructor;
+
+import javax.inject.Inject;
+
+public final class HDFSConfigurationConstructor implements ExternalConstructor<Configuration> {
+  @Inject
+  HDFSConfigurationConstructor() {
+  }
+
+  @Override
+  public Configuration newInstance() {
+    return new Configuration();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosErrorHandler.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosErrorHandler.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosErrorHandler.java
new file mode 100644
index 0000000..19ec56a
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosErrorHandler.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.util;
+
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * ErrorHandler for MesosRemoteManager.
+ * TODO: Replace this class once Tang's namespace feature is enabled
+ */
+public final class MesosErrorHandler implements EventHandler<Throwable> {
+
+  private static final Logger LOG = Logger.getLogger(MesosErrorHandler.class.getName());
+
+  @Inject
+  MesosErrorHandler() {
+  }
+
+  @Override
+  public void onNext(final Throwable e) {
+    LOG.log(Level.SEVERE, "MesosRemoteManager Error", e);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java
new file mode 100644
index 0000000..b32475d
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.util;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.RemoteIdentifierFactory;
+import org.apache.reef.wake.remote.RemoteManager;
+import org.apache.reef.wake.remote.RemoteMessage;
+import org.apache.reef.wake.remote.impl.DefaultRemoteManagerImplementation;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.apache.reef.wake.remote.impl.StringCodec;
+
+import javax.inject.Inject;
+
+/**
+ * Since the existing RemoteManager cannot use an additional codec,
+ * we need this additional RemoteManager to use MesosMessageCodec.
+ * TODO: Replace this class once Tang's namespace feature is enabled
+ */
+public final class MesosRemoteManager {
+  private final RemoteManager raw;
+  private final RemoteIdentifierFactory factory;
+
+  @Inject
+  MesosRemoteManager(final RemoteIdentifierFactory factory,
+                     final MesosErrorHandler mesosErrorHandler,
+                     final MesosRemoteManagerCodec codec) {
+    this.factory = factory;
+    this.raw = new DefaultRemoteManagerImplementation("MESOS_EXECUTOR", "##UNKNOWN##", 0,
+        codec, mesosErrorHandler, false, 3, 10000);
+  }
+
+  public <T> EventHandler<T> getHandler(
+      final String destinationIdentifier, final Class<? extends T> messageType) {
+    return this.raw.getHandler(factory.getNewInstance(destinationIdentifier), messageType);
+  }
+
+  public <T, U extends T> AutoCloseable registerHandler(
+      final Class<U> messageType, final EventHandler<RemoteMessage<T>> theHandler) {
+    return this.raw.registerHandler(messageType, theHandler);
+  }
+
+  public String getMyIdentifier() {
+    return this.raw.getMyIdentifier().toString();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManagerCodec.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManagerCodec.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManagerCodec.java
new file mode 100644
index 0000000..aa4c853
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManagerCodec.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.util;
+
+import org.apache.avro.io.*;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.exception.RemoteRuntimeException;
+
+import javax.inject.Inject;
+import java.io.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class MesosRemoteManagerCodec implements Codec<EvaluatorControl> {
+  private static final Logger LOG = Logger.getLogger(MesosRemoteManagerCodec.class.getName());
+
+  @Inject
+  public MesosRemoteManagerCodec() {
+  }
+
+ @Override
+  public byte[] encode(final EvaluatorControl evaluatorControl) {
+    try {
+      LOG.log(Level.INFO, "Before Eecoding: {0}", evaluatorControl.toString());
+      final ByteArrayOutputStream out = new ByteArrayOutputStream();
+      final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+      final DatumWriter<EvaluatorControl> writer = new SpecificDatumWriter<>(EvaluatorControl.getClassSchema());
+      writer.write(evaluatorControl, encoder);
+      encoder.flush();
+      out.close();
+      LOG.log(Level.INFO, "After Encoding");
+      return out.toByteArray();
+    } catch (final IOException ex) {
+      throw new RemoteRuntimeException(ex);
+    }
+  }
+
+  @Override
+  public EvaluatorControl decode(final byte[] buf) {
+    try {
+      LOG.log(Level.INFO, "Before Decoding: {0}", buf);
+      final SpecificDatumReader<EvaluatorControl> reader = new SpecificDatumReader<>(EvaluatorControl.getClassSchema());
+      final Decoder decoder = DecoderFactory.get().binaryDecoder(buf, null);
+      LOG.log(Level.INFO, "After Decoding");
+      return reader.read(null, decoder);
+    } catch (final IOException ex) {
+      throw new RemoteRuntimeException(ex);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-tests/pom.xml
----------------------------------------------------------------------
diff --git a/reef-tests/pom.xml b/reef-tests/pom.xml
index 5481772..ee21633 100644
--- a/reef-tests/pom.xml
+++ b/reef-tests/pom.xml
@@ -50,6 +50,11 @@ under the License.
         </dependency>
         <dependency>
             <groupId>${project.groupId}</groupId>
+            <artifactId>reef-runtime-mesos</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
             <artifactId>reef-poison</artifactId>
             <version>${project.version}</version>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-tests/src/test/java/org/apache/reef/tests/MesosTestEnvironment.java
----------------------------------------------------------------------
diff --git a/reef-tests/src/test/java/org/apache/reef/tests/MesosTestEnvironment.java b/reef-tests/src/test/java/org/apache/reef/tests/MesosTestEnvironment.java
new file mode 100644
index 0000000..557c8b8
--- /dev/null
+++ b/reef-tests/src/test/java/org/apache/reef/tests/MesosTestEnvironment.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests;
+
+import org.apache.reef.runtime.mesos.client.MesosClientConfiguration;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.exceptions.BindException;
+
+/**
+ * A TestEnvironment for the Mesos resourcemanager.
+ */
+public final class MesosTestEnvironment extends TestEnvironmentBase implements TestEnvironment {
+
+  // Used to make sure the tests call the methods in the right order.
+  private boolean ready = false;
+
+  @Override
+  public synchronized final void setUp() {
+    this.ready = true;
+  }
+
+  @Override
+  public synchronized final Configuration getRuntimeConfiguration() {
+    assert (this.ready);
+    try {
+      if (System.getenv("REEF_TEST_MESOS_MASTER_IP").equals("")) {
+        throw new RuntimeException("REEF_TEST_MESOS_MASTER_IP unspecified");
+      }
+
+      final String masterIp = System.getenv("REEF_TEST_MESOS_MASTER_IP");
+      return MesosClientConfiguration.CONF
+          .set(MesosClientConfiguration.MASTER_IP, masterIp)
+          .build();
+    } catch (final BindException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  @Override
+  public synchronized final void tearDown() {
+    assert (this.ready);
+    this.ready = false;
+  }
+
+  @Override
+  public int getTestTimeout() {
+    return 300000; // 5 minutes
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-tests/src/test/java/org/apache/reef/tests/TestEnvironmentFactory.java
----------------------------------------------------------------------
diff --git a/reef-tests/src/test/java/org/apache/reef/tests/TestEnvironmentFactory.java b/reef-tests/src/test/java/org/apache/reef/tests/TestEnvironmentFactory.java
index dd367e5..58f2072 100644
--- a/reef-tests/src/test/java/org/apache/reef/tests/TestEnvironmentFactory.java
+++ b/reef-tests/src/test/java/org/apache/reef/tests/TestEnvironmentFactory.java
@@ -36,7 +36,19 @@ public final class TestEnvironmentFactory {
    */
   public static TestEnvironment getNewTestEnvironment() {
     final boolean isYarn = Boolean.parseBoolean(System.getenv("REEF_TEST_YARN"));
-    LOG.log(Level.INFO, "Running tests on YARN: {0}", isYarn);
-    return isYarn ? new YarnTestEnvironment() : new LocalTestEnvironment();
+    final boolean isMesos = Boolean.parseBoolean(System.getenv("REEF_TEST_MESOS"));
+
+    if (isYarn && isMesos) {
+      throw new RuntimeException("Cannot test on two runtimes at once");
+    } else if (isYarn) {
+      LOG.log(Level.INFO, "Running tests on YARN");
+      return new YarnTestEnvironment();
+    } else if (isMesos) {
+      LOG.log(Level.INFO, "Running tests on Mesos");
+      return new MesosTestEnvironment();
+    } else {
+      LOG.log(Level.INFO, "Running tests on Local");
+      return new LocalTestEnvironment();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-tests/src/test/java/org/apache/reef/tests/YarnTestEnvironment.java
----------------------------------------------------------------------
diff --git a/reef-tests/src/test/java/org/apache/reef/tests/YarnTestEnvironment.java b/reef-tests/src/test/java/org/apache/reef/tests/YarnTestEnvironment.java
index c012ef2..409f091 100644
--- a/reef-tests/src/test/java/org/apache/reef/tests/YarnTestEnvironment.java
+++ b/reef-tests/src/test/java/org/apache/reef/tests/YarnTestEnvironment.java
@@ -23,7 +23,7 @@ import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.exceptions.BindException;
 
 /**
- * A TestEnvironment for the local resourcemanager.
+ * A TestEnvironment for the YARN resourcemanager.
  */
 public final class YarnTestEnvironment extends TestEnvironmentBase implements TestEnvironment {
 


[2/2] incubator-reef git commit: [REEF-30] Adding reef-runtime-mesos

Posted by we...@apache.org.
[REEF-30] Adding reef-runtime-mesos

This adds reef-runtime-mesos. Please refer to the pull request for details:

https://github.com/apache/incubator-reef/pull/52

JIRA: [REEF-30]: https://issues.apache.org/jira/browse/REEF-30

Pull Request: This closes #52


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/c908a526
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/c908a526
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/c908a526

Branch: refs/heads/master
Commit: c908a526d8ad14ab9a35be351a89160fe6576992
Parents: 4bc3282
Author: John Yang <jo...@gmail.com>
Authored: Mon Jan 12 16:15:53 2015 +0900
Committer: Markus Weimer <we...@apache.org>
Committed: Thu Jan 22 13:00:18 2015 -0800

----------------------------------------------------------------------
 .gitignore                                      |   3 +-
 bin/runmesostests.sh                            |  34 ++
 pom.xml                                         |  23 +
 reef-examples/pom.xml                           |   5 +
 .../reef/examples/hello/HelloREEFMesos.java     |  55 ++
 reef-runtime-mesos/pom.xml                      |  95 ++++
 .../src/main/avro/EvaluatorControl.avsc         |  47 ++
 .../runtime/mesos/MesosClasspathProvider.java   |  92 ++++
 .../mesos/client/MesosClientConfiguration.java  |  68 +++
 .../mesos/client/MesosJobSubmissionHandler.java | 141 ++++++
 .../mesos/client/parameters/MasterIp.java       |  26 +
 .../mesos/client/parameters/RootFolder.java     |  26 +
 .../mesos/driver/MesosDriverConfiguration.java  |  98 ++++
 .../driver/MesosResourceLaunchHandler.java      | 129 +++++
 .../driver/MesosResourceReleaseHandler.java     |  42 ++
 .../driver/MesosResourceRequestHandler.java     |  42 ++
 .../mesos/driver/MesosRuntimeStartHandler.java  |  38 ++
 .../mesos/driver/MesosRuntimeStopHandler.java   |  38 ++
 .../driver/MesosSchedulerDriverExecutor.java    |  42 ++
 .../runtime/mesos/driver/REEFEventHandlers.java |  65 +++
 .../reef/runtime/mesos/driver/REEFExecutor.java |  50 ++
 .../runtime/mesos/driver/REEFExecutors.java     |  64 +++
 .../runtime/mesos/driver/REEFScheduler.java     | 506 +++++++++++++++++++
 .../mesos/driver/parameters/MesosMasterIp.java  |  26 +
 .../evaluator/EvaluatorControlHandler.java      |  55 ++
 .../runtime/mesos/evaluator/REEFExecutor.java   | 249 +++++++++
 .../evaluator/parameters/MesosExecutorId.java   |  26 +
 .../util/HDFSConfigurationConstructor.java      |  35 ++
 .../runtime/mesos/util/MesosErrorHandler.java   |  43 ++
 .../runtime/mesos/util/MesosRemoteManager.java  |  62 +++
 .../mesos/util/MesosRemoteManagerCodec.java     |  68 +++
 reef-tests/pom.xml                              |   5 +
 .../apache/reef/tests/MesosTestEnvironment.java |  68 +++
 .../reef/tests/TestEnvironmentFactory.java      |  16 +-
 .../apache/reef/tests/YarnTestEnvironment.java  |   2 +-
 35 files changed, 2380 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index ea81d9e..2556de8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,4 +18,5 @@ nb-configuration.xml
 .idea
 atlassian-ide-plugin.xml
 REEF_LOCAL_RUNTIME
-profile-*.json
\ No newline at end of file
+REEF_MESOS_RUNTIME
+profile-*.json

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/bin/runmesostests.sh
----------------------------------------------------------------------
diff --git a/bin/runmesostests.sh b/bin/runmesostests.sh
new file mode 100755
index 0000000..350995d
--- /dev/null
+++ b/bin/runmesostests.sh
@@ -0,0 +1,34 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+if [ "$#" -ne 1 ]; then
+    echo "Please specify REEF_TEST_MESOS_MASTER_IP as an argument"
+    exit 1
+fi
+
+export REEF_TEST_MESOS=true
+export REEF_TEST_MESOS_MASTER_IP=$1
+
+DEPENDENCY_JAR=`echo $REEF_HOME/reef-tests/target/reef-tests-*-test-jar-with-dependencies.jar`
+CLASSPATH=`hadoop classpath`
+
+CMD="java -cp $DEPENDENCY_JAR:$CLASSPATH org.junit.runner.JUnitCore org.apache.reef.tests.AllTestsSuite"
+echo $CMD
+$CMD

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8ed7eed..cdbc73e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -357,6 +357,22 @@ under the License.
             </dependency>
             <dependency>
                 <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-client</artifactId>
+                <version>${hadoop.version}</version>
+                <scope>provided</scope>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-jcl</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-log4j12</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
                 <artifactId>hadoop-yarn-common</artifactId>
                 <version>${hadoop.version}</version>
                 <scope>provided</scope>
@@ -571,6 +587,12 @@ under the License.
                 <version>${jackson.version}</version>
             </dependency>
             <!-- End of Jackson -->
+
+            <dependency>
+                <groupId>org.apache.mesos</groupId>
+                <artifactId>mesos</artifactId>
+                <version>0.21.0</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
@@ -587,6 +609,7 @@ under the License.
         <module>reef-runtime-hdinsight</module>
         <module>reef-runtime-local</module>
         <module>reef-runtime-yarn</module>
+        <module>reef-runtime-mesos</module>
         <module>reef-tang</module>
         <module>reef-tests</module>
         <module>reef-wake</module>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-examples/pom.xml
----------------------------------------------------------------------
diff --git a/reef-examples/pom.xml b/reef-examples/pom.xml
index 66eeb34..b2eb3b7 100644
--- a/reef-examples/pom.xml
+++ b/reef-examples/pom.xml
@@ -48,6 +48,11 @@ under the License.
         </dependency>
         <dependency>
             <groupId>${project.groupId}</groupId>
+            <artifactId>reef-runtime-mesos</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
             <artifactId>reef-io</artifactId>
             <version>${project.version}</version>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFMesos.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFMesos.java b/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFMesos.java
new file mode 100644
index 0000000..b3b962b
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFMesos.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.hello;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.DriverLauncher;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.runtime.mesos.client.MesosClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class HelloREEFMesos {
+  private static final Logger LOG = Logger.getLogger(HelloREEFMesos.class.getName());
+
+  private static Configuration getDriverConfiguration() {
+    return DriverConfiguration.CONF
+        .set(DriverConfiguration.GLOBAL_LIBRARIES, HelloREEFMesos.class.getProtectionDomain().getCodeSource().getLocation().getFile())
+        .set(DriverConfiguration.DRIVER_IDENTIFIER, "HelloREEF")
+        .set(DriverConfiguration.ON_DRIVER_STARTED, HelloDriver.StartHandler.class)
+        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, HelloDriver.EvaluatorAllocatedHandler.class)
+        .build();
+  }
+
+  /**
+   * MASTER_IP(Mesos Master IP) is set to "localhost:5050".
+   * You may change it to suit your cluster environment.
+   */
+  public static void main(final String[] args) throws InjectionException {
+    final LauncherStatus status = DriverLauncher
+        .getLauncher(MesosClientConfiguration.CONF
+            .set(MesosClientConfiguration.MASTER_IP, "localhost:5050")
+            .build())
+        .run(getDriverConfiguration());
+    LOG.log(Level.INFO, "REEF job completed: {0}", status);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/pom.xml
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/pom.xml b/reef-runtime-mesos/pom.xml
new file mode 100644
index 0000000..a5aa333
--- /dev/null
+++ b/reef-runtime-mesos/pom.xml
@@ -0,0 +1,95 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.reef</groupId>
+        <artifactId>reef-project</artifactId>
+        <version>0.11.0-incubating-SNAPSHOT</version>
+    </parent>
+    <name>REEF Runtime for Mesos</name>
+    <artifactId>reef-runtime-mesos</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>reef-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.mesos</groupId>
+            <artifactId>mesos</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.avro</groupId>
+                <artifactId>avro-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>schema</goal>
+                        </goals>
+                        <configuration>
+                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
+                            <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/avro</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/avro/EvaluatorControl.avsc
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/avro/EvaluatorControl.avsc b/reef-runtime-mesos/src/main/avro/EvaluatorControl.avsc
new file mode 100644
index 0000000..ef4a152
--- /dev/null
+++ b/reef-runtime-mesos/src/main/avro/EvaluatorControl.avsc
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+[
+  {
+    "namespace": "org.apache.reef.runtime.mesos.util",
+    "type": "record",
+    "name": "EvaluatorLaunch",
+    "fields": [
+      { "name": "identifier", "type": "string" },
+      { "name": "command", "type": "string" }
+    ]
+  },
+  {
+    "namespace": "org.apache.reef.runtime.mesos.util",
+    "type": "record",
+    "name": "EvaluatorRelease",
+    "fields": [
+      { "name": "identifier", "type": "string" }
+    ]
+  },
+  {
+    "namespace": "org.apache.reef.runtime.mesos.util",
+    "type": "record",
+    "name": "EvaluatorControl",
+    "fields": [
+      { "name": "evaluator_launch", "type": ["EvaluatorLaunch", "null"] },
+      { "name": "evaluator_release", "type": ["EvaluatorRelease", "null"] }
+    ]
+  }
+]
+

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/MesosClasspathProvider.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/MesosClasspathProvider.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/MesosClasspathProvider.java
new file mode 100644
index 0000000..cc051d5
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/MesosClasspathProvider.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos;
+
+import net.jcip.annotations.Immutable;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+
+import javax.inject.Inject;
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Logger;
+
+/**
+ * Access to the classpath according to the REEF file system standard.
+ */
+@Immutable
+public final class MesosClasspathProvider implements RuntimeClasspathProvider {
+  private static final String HADOOP_CONF_DIR = System.getenv("HADOOP_CONF_DIR");
+  private static final String HADOOP_HOME = System.getenv("HADOOP_HOME");
+  private static final String HADOOP_COMMON_HOME = System.getenv("HADOOP_COMMON_HOME");
+  private static final String HADOOP_YARN_HOME = System.getenv("HADOOP_YARN_HOME");
+  private static final String HADOOP_HDFS_HOME = System.getenv("HADOOP_HDFS_HOME");
+  private static final String HADOOP_MAPRED_HOME = System.getenv("HADOOP_MAPRED_HOME");
+
+  // Used when we can't get a classpath from Hadoop
+  private static final String[] LEGACY_CLASSPATH_LIST = new String[]{
+      HADOOP_CONF_DIR,
+      HADOOP_HOME + "/*",
+      HADOOP_HOME + "/lib/*",
+      HADOOP_COMMON_HOME + "/*",
+      HADOOP_COMMON_HOME + "/lib/*",
+      HADOOP_YARN_HOME + "/*",
+      HADOOP_YARN_HOME + "/lib/*",
+      HADOOP_HDFS_HOME + "/*",
+      HADOOP_HDFS_HOME + "/lib/*",
+      HADOOP_MAPRED_HOME + "/*",
+      HADOOP_MAPRED_HOME + "/lib/*",
+      HADOOP_HOME + "/etc/hadoop",
+      HADOOP_HOME + "/share/hadoop/common/*",
+      HADOOP_HOME + "/share/hadoop/common/lib/*",
+      HADOOP_HOME + "/share/hadoop/yarn/*",
+      HADOOP_HOME + "/share/hadoop/yarn/lib/*",
+      HADOOP_HOME + "/share/hadoop/hdfs/*",
+      HADOOP_HOME + "/share/hadoop/hdfs/lib/*",
+      HADOOP_HOME + "/share/hadoop/mapreduce/*",
+      HADOOP_HOME + "/share/hadoop/mapreduce/lib/*"
+  };
+  private final List<String> classPathPrefix;
+  private final List<String> classPathSuffix;
+
+  @Inject
+  MesosClasspathProvider() {
+    this.classPathPrefix = Arrays.asList(LEGACY_CLASSPATH_LIST);
+    this.classPathSuffix = Arrays.asList(LEGACY_CLASSPATH_LIST);
+  }
+
+  @Override
+  public List<String> getDriverClasspathPrefix() {
+    return this.classPathPrefix;
+  }
+
+  @Override
+  public List<String> getDriverClasspathSuffix() {
+    return this.classPathSuffix;
+  }
+
+  @Override
+  public List<String> getEvaluatorClasspathPrefix() {
+    return this.classPathPrefix;
+  }
+
+  @Override
+  public List<String> getEvaluatorClasspathSuffix() {
+    return this.classPathSuffix;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosClientConfiguration.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosClientConfiguration.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosClientConfiguration.java
new file mode 100644
index 0000000..280f2a6
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosClientConfiguration.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.client.REEF;
+import org.apache.reef.client.RunningJob;
+import org.apache.reef.runtime.common.client.REEFImplementation;
+import org.apache.reef.runtime.common.client.RunningJobImpl;
+import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+import org.apache.reef.runtime.common.launch.REEFMessageCodec;
+import org.apache.reef.runtime.mesos.MesosClasspathProvider;
+import org.apache.reef.runtime.mesos.client.parameters.MasterIp;
+import org.apache.reef.runtime.mesos.client.parameters.RootFolder;
+import org.apache.reef.runtime.mesos.util.HDFSConfigurationConstructor;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalParameter;
+import org.apache.reef.tang.formats.RequiredParameter;
+import org.apache.reef.wake.remote.RemoteConfiguration;
+
+/**
+ * A ConfigurationModule for the Mesos resource manager
+ */
+@Public
+@ClientSide
+public class MesosClientConfiguration extends ConfigurationModuleBuilder {
+  /**
+   * The folder in which the sub-folders for REEF drivers, one per job, will be created.
+   * If none is given, a folder "REEF_MESOS_RUNTIME" will be created in the local directory.
+   */
+  public static final OptionalParameter<String> ROOT_FOLDER = new OptionalParameter<>();
+
+  /**
+   * The ip address of Mesos Master
+   */
+  public static final RequiredParameter<String> MASTER_IP = new RequiredParameter<>();
+
+  public static final ConfigurationModule CONF = new MesosClientConfiguration()
+      .bindImplementation(REEF.class, REEFImplementation.class)
+      .bindImplementation(RunningJob.class, RunningJobImpl.class)
+      .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class)
+      .bindImplementation(JobSubmissionHandler.class, MesosJobSubmissionHandler.class)
+      .bindNamedParameter(RootFolder.class, ROOT_FOLDER)
+      .bindNamedParameter(MasterIp.class, MASTER_IP)
+      .bindConstructor(Configuration.class, HDFSConfigurationConstructor.class)
+      .bindImplementation(RuntimeClasspathProvider.class, MesosClasspathProvider.class)
+      .build();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
new file mode 100644
index 0000000..d43a855
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.client;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.ClientRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos.FileResourceProto;
+import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
+import org.apache.reef.runtime.common.files.ClasspathProvider;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.mesos.client.parameters.MasterIp;
+import org.apache.reef.runtime.mesos.client.parameters.RootFolder;
+import org.apache.reef.runtime.mesos.driver.MesosDriverConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+
+/**
+ * The current implementation runs the driver as a local process, similar to reef-runtime-local.
+ * TODO: run the driver on a slave node in the cluster
+ */
+@Private
+@ClientSide
+final class MesosJobSubmissionHandler implements JobSubmissionHandler {
+  public static final String DRIVER_FOLDER_NAME = "driver";
+
+  private final ConfigurationSerializer configurationSerializer;
+  private final ClasspathProvider classpath;
+  private final REEFFileNames fileNames;
+  private final String rootFolderName;
+  private final String masterIp;
+  private final double jvmSlack;
+
+  @Inject
+  MesosJobSubmissionHandler(final @Parameter(RootFolder.class) String rootFolderName,
+                            final @Parameter(MasterIp.class) String masterIp,
+                            final ConfigurationSerializer configurationSerializer,
+                            final REEFFileNames fileNames,
+                            final ClasspathProvider classpath,
+                            final @Parameter(JVMHeapSlack.class) double jvmSlack) {
+    this.rootFolderName = new File(rootFolderName).getAbsolutePath();
+    this.masterIp = masterIp;
+    this.configurationSerializer = configurationSerializer;
+    this.fileNames = fileNames;
+    this.classpath = classpath;
+    this.jvmSlack = jvmSlack;
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public void onNext(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
+    try {
+      final File jobFolder = new File(new File(this.rootFolderName),
+          "/" + jobSubmissionProto.getIdentifier() + "-" + System.currentTimeMillis() + "/");
+
+      final File driverFolder = new File(jobFolder, DRIVER_FOLDER_NAME);
+      driverFolder.mkdirs();
+
+      final File reefFolder = new File(driverFolder, this.fileNames.getREEFFolderName());
+      reefFolder.mkdirs();
+
+      final File localFolder = new File(reefFolder, this.fileNames.getLocalFolderName());
+      localFolder.mkdirs();
+      for (final FileResourceProto file : jobSubmissionProto.getLocalFileList()) {
+        final Path src = new File(file.getPath()).toPath();
+        final Path dst = new File(driverFolder, this.fileNames.getLocalFolderPath() + "/" + file.getName()).toPath();
+        Files.copy(src, dst);
+      }
+
+      final File globalFolder = new File(reefFolder, this.fileNames.getGlobalFolderName());
+      globalFolder.mkdirs();
+      for (final FileResourceProto file : jobSubmissionProto.getGlobalFileList()) {
+        final Path src = new File(file.getPath()).toPath();
+        final Path dst = new File(driverFolder, this.fileNames.getGlobalFolderPath() + "/" + file.getName()).toPath();
+        Files.copy(src, dst);
+      }
+
+      final Configuration driverConfiguration =
+          Configurations.merge(MesosDriverConfiguration.CONF
+              .set(MesosDriverConfiguration.MESOS_MASTER_IP, this.masterIp)
+              .set(MesosDriverConfiguration.JOB_IDENTIFIER, jobSubmissionProto.getIdentifier())
+              .set(MesosDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, jobSubmissionProto.getRemoteId())
+              .set(MesosDriverConfiguration.JVM_HEAP_SLACK, this.jvmSlack)
+              .set(MesosDriverConfiguration.SCHEDULER_DRIVER_CAPACITY, 1) // must be 1 as there is 1 scheduler at the same time
+              .build(),
+          this.configurationSerializer.fromString(jobSubmissionProto.getConfiguration()));
+      final File runtimeConfigurationFile = new File(driverFolder, this.fileNames.getDriverConfigurationPath());
+      this.configurationSerializer.toFile(driverConfiguration, runtimeConfigurationFile);
+
+      final List<String> launchCommand = new JavaLaunchCommandBuilder()
+          .setErrorHandlerRID(jobSubmissionProto.getRemoteId())
+          .setLaunchID(jobSubmissionProto.getIdentifier())
+          .setConfigurationFileName(this.fileNames.getDriverConfigurationPath())
+          .setClassPath(this.classpath.getDriverClasspath())
+          .setMemory(jobSubmissionProto.getDriverMemory())
+          .build();
+
+      final File errFile = new File(driverFolder, fileNames.getDriverStderrFileName());
+      final File outFile = new File(driverFolder, fileNames.getDriverStdoutFileName());
+
+      new ProcessBuilder()
+          .command(launchCommand)
+          .directory(driverFolder)
+          .redirectError(errFile)
+          .redirectOutput(outFile)
+          .start();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/MasterIp.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/MasterIp.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/MasterIp.java
new file mode 100644
index 0000000..09c2882
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/MasterIp.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.client.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter(doc = "The ip address of Mesos Master", short_name = "master_ip")
+public final class MasterIp implements Name<String> {
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/RootFolder.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/RootFolder.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/RootFolder.java
new file mode 100644
index 0000000..8cdc116
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/RootFolder.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.client.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter(doc = "The root folder where logs etc. will be stored.", default_value = "REEF_MESOS_RUNTIME")
+public final class RootFolder implements Name<String> {
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosDriverConfiguration.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosDriverConfiguration.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosDriverConfiguration.java
new file mode 100644
index 0000000..2ef844d
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosDriverConfiguration.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.reef.io.TempFileCreator;
+import org.apache.reef.io.WorkingDirectoryTempFileCreator;
+import org.apache.reef.runtime.common.driver.api.AbstractDriverRuntimeConfiguration;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.mesos.MesosClasspathProvider;
+import org.apache.reef.runtime.mesos.driver.parameters.MesosMasterIp;
+import org.apache.reef.runtime.mesos.util.HDFSConfigurationConstructor;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalParameter;
+import org.apache.reef.tang.formats.RequiredParameter;
+import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.StageConfiguration;
+import org.apache.reef.wake.impl.SingleThreadStage;
+import org.apache.reef.wake.time.Clock;
+
+/**
+ * Binds Driver's runtime event handlers
+ */
+public final class MesosDriverConfiguration extends ConfigurationModuleBuilder {
+  /**
+   * @see AbstractDriverRuntimeConfiguration.JobIdentifier.class
+   */
+  public static final RequiredParameter<String> JOB_IDENTIFIER = new RequiredParameter<>();
+
+  /**
+   * @see AbstractDriverRuntimeConfiguration.EvaluatorTimeout
+   */
+  public static final OptionalParameter<Long> EVALUATOR_TIMEOUT = new OptionalParameter<>();
+
+  /**
+   * The ip address of Mesos Master
+   */
+  public static final RequiredParameter<String> MESOS_MASTER_IP = new RequiredParameter<>();
+
+  /**
+   * The client remote identifier.
+   */
+  public static final OptionalParameter<String> CLIENT_REMOTE_IDENTIFIER = new OptionalParameter<>();
+
+  /**
+   * The fraction of the container memory NOT to use for the Java Heap.
+   */
+  public static final OptionalParameter<Double> JVM_HEAP_SLACK = new OptionalParameter<>();
+
+  /**
+   * Capacity for runnning Mesos Scheduler Driver
+   */
+  public static final RequiredParameter<Integer> SCHEDULER_DRIVER_CAPACITY = new RequiredParameter<>();
+
+  public static ConfigurationModule CONF = new MesosDriverConfiguration()
+      .bindImplementation(ResourceLaunchHandler.class, MesosResourceLaunchHandler.class)
+      .bindImplementation(ResourceReleaseHandler.class, MesosResourceReleaseHandler.class)
+      .bindImplementation(ResourceRequestHandler.class, MesosResourceRequestHandler.class)
+      .bindSetEntry(Clock.RuntimeStartHandler.class, MesosRuntimeStartHandler.class)
+      .bindSetEntry(Clock.RuntimeStopHandler.class, MesosRuntimeStopHandler.class)
+      .bindImplementation(TempFileCreator.class, WorkingDirectoryTempFileCreator.class)
+
+      .bindNamedParameter(MesosMasterIp.class, MESOS_MASTER_IP)
+      .bindConstructor(Configuration.class, HDFSConfigurationConstructor.class)
+      .bindImplementation(RuntimeClasspathProvider.class, MesosClasspathProvider.class)
+
+      .bindNamedParameter(StageConfiguration.Capacity.class, SCHEDULER_DRIVER_CAPACITY)
+      .bindNamedParameter(StageConfiguration.StageHandler.class, MesosSchedulerDriverExecutor.class)
+      .bindImplementation(EStage.class, SingleThreadStage.class)
+
+          // Bind the fields bound in AbstractDriverRuntimeConfiguration
+      .bindNamedParameter(AbstractDriverRuntimeConfiguration.JobIdentifier.class, JOB_IDENTIFIER)
+      .bindNamedParameter(AbstractDriverRuntimeConfiguration.EvaluatorTimeout.class, EVALUATOR_TIMEOUT)
+      .bindNamedParameter(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.class, CLIENT_REMOTE_IDENTIFIER)
+      .bindNamedParameter(JVMHeapSlack.class, JVM_HEAP_SLACK)
+      .build();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceLaunchHandler.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceLaunchHandler.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceLaunchHandler.java
new file mode 100644
index 0000000..7ce98f8
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceLaunchHandler.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.io.TempFileCreator;
+import org.apache.reef.io.WorkingDirectoryTempFileCreator;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
+import org.apache.reef.runtime.common.files.ClasspathProvider;
+import org.apache.reef.runtime.common.files.JobJarMaker;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.launch.CLRLaunchCommandBuilder;
+import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
+import org.apache.reef.runtime.common.launch.LaunchCommandBuilder;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.runtime.mesos.util.EvaluatorLaunch;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@DriverSide
+@Private
+final class MesosResourceLaunchHandler implements ResourceLaunchHandler {
+  private final ConfigurationSerializer configurationSerializer;
+  private final RemoteManager remoteManager;
+  private final REEFFileNames fileNames;
+  private final ClasspathProvider classpath;
+  private final double jvmHeapFactor;
+  private final REEFExecutors executors;
+  private static final Logger LOG = Logger.getLogger(MesosResourceLaunchHandler.class.getName());
+
+  @Inject
+  MesosResourceLaunchHandler(final ConfigurationSerializer configurationSerializer,
+                             final RemoteManager remoteManager,
+                             final REEFFileNames fileNames,
+                             final REEFExecutors executors,
+                             final ClasspathProvider classpath,
+                             final @Parameter(JVMHeapSlack.class) double jvmHeapSlack) {
+    this.configurationSerializer = configurationSerializer;
+    this.remoteManager = remoteManager;
+    this.fileNames = fileNames;
+    this.executors = executors;
+    this.classpath = classpath;
+    this.jvmHeapFactor = 1.0 - jvmHeapSlack;
+  }
+
+
+  @Override
+  public void onNext(final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto) {
+    try {
+      LOG.log(Level.INFO, "resourceLaunchProto. {0}", resourceLaunchProto.toString());
+
+      final File localStagingFolder =
+          Files.createTempDirectory(this.fileNames.getEvaluatorFolderPrefix()).toFile();
+
+      final Configuration evaluatorConfiguration = Tang.Factory.getTang()
+          .newConfigurationBuilder(this.configurationSerializer.fromString(resourceLaunchProto.getEvaluatorConf()))
+          .bindImplementation(TempFileCreator.class, WorkingDirectoryTempFileCreator.class)
+          .build();
+
+      final File configurationFile = new File(
+          localStagingFolder, this.fileNames.getEvaluatorConfigurationName());
+      this.configurationSerializer.toFile(evaluatorConfiguration, configurationFile);
+
+      JobJarMaker.copy(resourceLaunchProto.getFileList(), localStagingFolder);
+
+      final FileSystem fileSystem = FileSystem.get(new org.apache.hadoop.conf.Configuration());
+      final Path hdfsFolder = new Path(fileSystem.getUri() + "/" + resourceLaunchProto.getIdentifier() + "/");
+      FileUtil.copy(localStagingFolder, fileSystem, hdfsFolder, false, new org.apache.hadoop.conf.Configuration());
+
+      // TODO: Replace REEFExecutor with a simple launch command (we only need to launch REEFExecutor)
+      final LaunchCommandBuilder commandBuilder;
+      switch (resourceLaunchProto.getType()) {
+        case JVM:
+          commandBuilder = new JavaLaunchCommandBuilder().setClassPath(this.classpath.getEvaluatorClasspath());
+          break;
+        case CLR:
+          commandBuilder = new CLRLaunchCommandBuilder();
+          break;
+        default:
+          throw new IllegalArgumentException("Unsupported container type");
+      }
+
+      final List<String> command = commandBuilder
+          .setErrorHandlerRID(this.remoteManager.getMyIdentifier())
+          .setLaunchID(resourceLaunchProto.getIdentifier())
+          .setConfigurationFileName(this.fileNames.getEvaluatorConfigurationPath())
+          .setMemory((int) (this.jvmHeapFactor * this.executors.getMemory(resourceLaunchProto.getIdentifier())))
+          .build();
+
+      this.executors.launchEvaluator(
+          new EvaluatorLaunch(resourceLaunchProto.getIdentifier(), StringUtils.join(command, ' ')));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceReleaseHandler.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceReleaseHandler.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceReleaseHandler.java
new file mode 100644
index 0000000..41c487e
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceReleaseHandler.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
+
+import javax.inject.Inject;
+
+@DriverSide
+@Private
+final class MesosResourceReleaseHandler implements ResourceReleaseHandler {
+  private final REEFScheduler REEFScheduler;
+
+  @Inject
+  MesosResourceReleaseHandler(final REEFScheduler REEFScheduler) {
+    this.REEFScheduler = REEFScheduler;
+  }
+
+  @Override
+  public void onNext(final DriverRuntimeProtocol.ResourceReleaseProto resourceReleaseProto) {
+    REEFScheduler.onResourceRelease(resourceReleaseProto);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceRequestHandler.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceRequestHandler.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceRequestHandler.java
new file mode 100644
index 0000000..a9c1016
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceRequestHandler.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
+
+import javax.inject.Inject;
+
+@DriverSide
+@Private
+final class MesosResourceRequestHandler implements ResourceRequestHandler {
+  private final REEFScheduler REEFScheduler;
+
+  @Inject
+  MesosResourceRequestHandler(final REEFScheduler REEFScheduler) {
+    this.REEFScheduler = REEFScheduler;
+  }
+
+  @Override
+  public void onNext(final DriverRuntimeProtocol.ResourceRequestProto resourceRequestProto) {
+    REEFScheduler.onResourceRequest(resourceRequestProto);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStartHandler.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStartHandler.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStartHandler.java
new file mode 100644
index 0000000..c29e780
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStartHandler.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.runtime.event.RuntimeStart;
+
+import javax.inject.Inject;
+
+final class MesosRuntimeStartHandler implements EventHandler<RuntimeStart> {
+  private final REEFScheduler REEFScheduler;
+
+  @Inject
+  MesosRuntimeStartHandler(final REEFScheduler REEFScheduler) {
+    this.REEFScheduler = REEFScheduler;
+  }
+
+  @Override
+  public void onNext(final RuntimeStart runtimeStart){
+    this.REEFScheduler.onStart();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStopHandler.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStopHandler.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStopHandler.java
new file mode 100644
index 0000000..3d3b86e
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStopHandler.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.runtime.event.RuntimeStop;
+
+import javax.inject.Inject;
+
+final class MesosRuntimeStopHandler implements EventHandler<RuntimeStop> {
+  private final REEFScheduler REEFScheduler;
+
+  @Inject
+  MesosRuntimeStopHandler(final REEFScheduler REEFScheduler) {
+    this.REEFScheduler = REEFScheduler;
+  }
+
+  @Override
+  public void onNext(final RuntimeStop runtimeStop) {
+    this.REEFScheduler.onStop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosSchedulerDriverExecutor.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosSchedulerDriverExecutor.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosSchedulerDriverExecutor.java
new file mode 100644
index 0000000..abde514
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosSchedulerDriverExecutor.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.mesos.Protos;
+import org.apache.mesos.SchedulerDriver;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class MesosSchedulerDriverExecutor implements EventHandler<SchedulerDriver> {
+  private static final Logger LOG = Logger.getLogger(MesosSchedulerDriverExecutor.class.getName());
+
+  @Inject
+  public MesosSchedulerDriverExecutor() {
+  }
+
+  @Override
+  public void onNext(final SchedulerDriver schedulerDriver) {
+    LOG.log(Level.INFO, "MesosMaster(SchedulerDriver) starting");
+    final Protos.Status status = schedulerDriver.run();
+    LOG.log(Level.INFO, "MesosMaster(SchedulerDriver) ended with status {0}", status);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFEventHandlers.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFEventHandlers.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFEventHandlers.java
new file mode 100644
index 0000000..fd5cce2
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFEventHandlers.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol.NodeDescriptorProto;
+import org.apache.reef.proto.DriverRuntimeProtocol.ResourceAllocationProto;
+import org.apache.reef.proto.DriverRuntimeProtocol.ResourceStatusProto;
+import org.apache.reef.proto.DriverRuntimeProtocol.RuntimeStatusProto;
+import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+@Private
+final class REEFEventHandlers {
+  private final EventHandler<ResourceAllocationProto> resourceAllocationEventHandler;
+  private final EventHandler<RuntimeStatusProto> runtimeStatusEventHandler;
+  private final EventHandler<NodeDescriptorProto> nodeDescriptorEventHandler;
+  private final EventHandler<ResourceStatusProto> resourceStatusHandlerEventHandler;
+
+  @Inject
+  REEFEventHandlers(final @Parameter(RuntimeParameters.ResourceAllocationHandler.class) EventHandler<ResourceAllocationProto> resourceAllocationEventHandler,
+                    final @Parameter(RuntimeParameters.RuntimeStatusHandler.class) EventHandler<RuntimeStatusProto> runtimeStatusEventHandler,
+                    final @Parameter(RuntimeParameters.NodeDescriptorHandler.class) EventHandler<NodeDescriptorProto> nodeDescriptorEventHandler,
+                    final @Parameter(RuntimeParameters.ResourceStatusHandler.class) EventHandler<ResourceStatusProto> resourceStatusHandlerEventHandler) {
+    this.resourceAllocationEventHandler = resourceAllocationEventHandler;
+    this.runtimeStatusEventHandler = runtimeStatusEventHandler;
+    this.nodeDescriptorEventHandler = nodeDescriptorEventHandler;
+    this.resourceStatusHandlerEventHandler = resourceStatusHandlerEventHandler;
+  }
+
+  void onNodeDescriptor(final NodeDescriptorProto nodeDescriptorProto) {
+    this.nodeDescriptorEventHandler.onNext(nodeDescriptorProto);
+  }
+
+  void onRuntimeStatus(final RuntimeStatusProto runtimeStatusProto) {
+    this.runtimeStatusEventHandler.onNext(runtimeStatusProto);
+  }
+
+  void onResourceAllocation(final ResourceAllocationProto resourceAllocationProto) {
+    this.resourceAllocationEventHandler.onNext(resourceAllocationProto);
+  }
+
+  void onResourceStatus(final ResourceStatusProto resourceStatusProto) {
+    this.resourceStatusHandlerEventHandler.onNext(resourceStatusProto);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFExecutor.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFExecutor.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFExecutor.java
new file mode 100644
index 0000000..be6045a
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFExecutor.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.reef.runtime.mesos.util.EvaluatorControl;
+import org.apache.reef.runtime.mesos.util.EvaluatorLaunch;
+import org.apache.reef.runtime.mesos.util.EvaluatorRelease;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ * The Driver's view of a REEFExecutor running in the cluster.
+ */
+final class REEFExecutor {
+  private final int memory;
+  private final EventHandler<EvaluatorControl> evaluatorControlHandler;
+
+  REEFExecutor(final int memory,
+               final EventHandler<EvaluatorControl> evaluatorControlHandler) {
+    this.memory = memory;
+    this.evaluatorControlHandler = evaluatorControlHandler;
+  }
+
+  public void launchEvaluator(final EvaluatorLaunch evaluatorLaunch) {
+    this.evaluatorControlHandler.onNext(new EvaluatorControl(evaluatorLaunch, null));
+  }
+
+  public void releaseEvaluator(final EvaluatorRelease evaluatorRelease) {
+    this.evaluatorControlHandler.onNext(new EvaluatorControl(null, evaluatorRelease));
+  }
+
+  public int getMemory() {
+    return this.memory;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFExecutors.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFExecutors.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFExecutors.java
new file mode 100644
index 0000000..f70e3fe
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFExecutors.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.reef.runtime.mesos.util.EvaluatorControl;
+import org.apache.reef.runtime.mesos.util.EvaluatorLaunch;
+import org.apache.reef.runtime.mesos.util.EvaluatorRelease;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The Driver's view of MesosExecutors running in the cluster.
+ */
+final class REEFExecutors {
+  private final Map<String, REEFExecutor> executors = new ConcurrentHashMap<>();
+
+  @Inject
+  REEFExecutors() {
+  }
+
+  public void add(final String id,
+                  final int memory,
+                  final EventHandler<EvaluatorControl> evaluatorControlHandler) {
+    executors.put(id, new REEFExecutor(memory, evaluatorControlHandler));
+  }
+
+  public void remove(final String id) {
+    this.executors.remove(id);
+  }
+
+  public Set<String> getExecutorIds() { return executors.keySet(); }
+
+  public int getMemory(final String id) {
+    return executors.get(id).getMemory();
+  }
+
+  public void launchEvaluator(final EvaluatorLaunch evaluatorLaunch) {
+    executors.get(evaluatorLaunch.getIdentifier().toString()).launchEvaluator(evaluatorLaunch);
+  }
+
+  public void releaseEvaluator(final EvaluatorRelease evaluatorRelease) {
+    executors.get(evaluatorRelease.getIdentifier().toString()).releaseEvaluator(evaluatorRelease);
+  }
+}
\ No newline at end of file