You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2013/04/10 01:46:43 UTC

svn commit: r1466292 - /incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java

Author: bmahler
Date: Tue Apr  9 23:46:43 2013
New Revision: 1466292

URL: http://svn.apache.org/r1466292
Log:
Fixed a deadlock in the Hadoop MesosScheduler by unsynchronizing resourceOffers().

Review: https://reviews.apache.org/r/10352

Modified:
    incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java

Modified: incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java?rev=1466292&r1=1466291&r2=1466292&view=diff
==============================================================================
--- incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java (original)
+++ incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java Tue Apr  9 23:46:43 2013
@@ -145,9 +145,9 @@ public class MesosScheduler extends Task
     }
   };
 
-  public MesosScheduler() {
-  }
+  public MesosScheduler() {}
 
+  // TaskScheduler methods.
   @Override
   public synchronized void start() throws IOException {
     conf = getConf();
@@ -209,7 +209,6 @@ public class MesosScheduler extends Task
     taskScheduler.terminate();
   }
 
-  // TaskScheduler methods.
   @Override
   public synchronized List<Task> assignTasks(TaskTracker taskTracker)
       throws IOException {
@@ -246,6 +245,16 @@ public class MesosScheduler extends Task
   }
 
   // Mesos Scheduler methods.
+  // These are synchronized, where possible. Some of these methods need to access the
+  // JobTracker, which can lead to a deadlock:
+  // See: https://issues.apache.org/jira/browse/MESOS-429
+  // The workaround employed here is to unsynchronize those methods needing access to
+  // the JobTracker state and use explicit synchronization instead as appropriate.
+  // TODO(bmahler): Provide a cleaner solution to this issue. One solution is to
+  // split up the Scheduler and TaskScheduler implementations in order to break the
+  // locking cycle. This would require a synchronized class to store the shared
+  // state across our Scheduler and TaskScheduler implementations, and provide
+  // atomic operations as needed.
   @Override
   public synchronized void registered(SchedulerDriver schedulerDriver,
       FrameworkID frameworkID, MasterInfo masterInfo) {
@@ -259,297 +268,309 @@ public class MesosScheduler extends Task
     LOG.info("Re-registered with master " + masterInfo);
   }
 
+  // This method uses explicit synchronization in order to avoid deadlocks when
+  // accessing the JobTracker.
   @Override
-  public synchronized void resourceOffers(SchedulerDriver schedulerDriver,
+  public void resourceOffers(SchedulerDriver schedulerDriver,
       List<Offer> offers) {
-    // Compute the number of pending maps and reduces.
-    int pendingMaps = 0;
-    int pendingReduces = 0;
+    // Before synchronizing, we pull all needed information from the JobTracker.
+    final HttpHost jobTrackerAddress = new HttpHost(jobTracker.getHostname(),
+        jobTracker.getTrackerPort());
+
+    final Collection<TaskTrackerStatus> taskTrackers = jobTracker.taskTrackers();
+
+    final List<JobInProgress> jobsInProgress = new ArrayList<JobInProgress>();
     for (JobStatus status : jobTracker.jobsToComplete()) {
-      JobInProgress progress = jobTracker.getJob(status.getJobID());
-      pendingMaps += progress.pendingMaps();
-      pendingReduces += progress.pendingReduces();
+      jobsInProgress.add(jobTracker.getJob(status.getJobID()));
     }
 
-    // Mark active (heartbeated) TaskTrackers and compute idle slots.
-    int idleMapSlots = 0;
-    int idleReduceSlots = 0;
-    for (TaskTrackerStatus status : jobTracker.taskTrackers()) {
-      HttpHost host = new HttpHost(status.getHost(), status.getHttpPort());
-      if (mesosTrackers.containsKey(host)) {
-        mesosTrackers.get(host).active = true;
-        idleMapSlots += status.getAvailableMapSlots();
-        idleReduceSlots += status.getAvailableReduceSlots();
+    synchronized (this) {
+      // Compute the number of pending maps and reduces.
+      int pendingMaps = 0;
+      int pendingReduces = 0;
+      for (JobInProgress progress : jobsInProgress) {
+        pendingMaps += progress.pendingMaps();
+        pendingReduces += progress.pendingReduces();
       }
-    }
 
-    // Consider the TaskTrackers that have yet to become active as being idle,
-    // otherwise we will launch excessive TaskTrackers.
-    int inactiveMapSlots = 0;
-    int inactiveReduceSlots = 0;
-    for (MesosTracker tracker : mesosTrackers.values()) {
-      if (!tracker.active) {
-        inactiveMapSlots += tracker.mapSlots;
-        inactiveReduceSlots += tracker.reduceSlots;
+      // Mark active (heartbeated) TaskTrackers and compute idle slots.
+      int idleMapSlots = 0;
+      int idleReduceSlots = 0;
+      for (TaskTrackerStatus status : taskTrackers) {
+        HttpHost host = new HttpHost(status.getHost(), status.getHttpPort());
+        if (mesosTrackers.containsKey(host)) {
+          mesosTrackers.get(host).active = true;
+          idleMapSlots += status.getAvailableMapSlots();
+          idleReduceSlots += status.getAvailableReduceSlots();
+        }
       }
-    }
 
-    // Compute how many slots we need to allocate.
-    int neededMapSlots = Math.max(0, pendingMaps - idleMapSlots);
-    int neededReduceSlots = Math.max(0, pendingReduces - idleReduceSlots);
-
-    LOG.info(join("\n", Arrays.asList(
-        "JobTracker Status",
-        "      Pending Map Tasks: " + pendingMaps,
-        "   Pending Reduce Tasks: " + pendingReduces,
-        "         Idle Map Slots: " + idleMapSlots,
-        "      Idle Reduce Slots: " + idleReduceSlots,
-        "     Inactive Map Slots: " + inactiveMapSlots
-                                    + " (launched but no hearbeat yet)",
-        "  Inactive Reduce Slots: " + inactiveReduceSlots
-                                    + " (launched but no hearbeat yet)",
-        "       Needed Map Slots: " + neededMapSlots,
-        "    Needed Reduce Slots: " + neededReduceSlots)));
-
-    // Launch TaskTrackers to satisfy the slot requirements.
-    // TODO(bmahler): Consider slotting intelligently.
-    // Ex: If more map slots are needed, but no reduce slots are needed,
-    // launch a map-only TaskTracker to better satisfy the slot needs.
-    for (Offer offer : offers) {
-      if (neededMapSlots <= 0 && neededReduceSlots <= 0) {
-        driver.declineOffer(offer.getId());
-        continue;
+      // Consider the TaskTrackers that have yet to become active as being idle,
+      // otherwise we will launch excessive TaskTrackers.
+      int inactiveMapSlots = 0;
+      int inactiveReduceSlots = 0;
+      for (MesosTracker tracker : mesosTrackers.values()) {
+        if (!tracker.active) {
+          inactiveMapSlots += tracker.mapSlots;
+          inactiveReduceSlots += tracker.reduceSlots;
+        }
       }
 
-      double cpus = -1.0;
-      double mem = -1.0;
-      double disk = -1.0;
-      Set<Integer> ports = new HashSet<Integer>(2);
-
-      // Pull out the cpus, memory, disk, and 2 ports from the offer.
-      for (Resource resource : offer.getResourcesList()) {
-        if (resource.getName().equals("cpus")
-            && resource.getType() == Value.Type.SCALAR) {
-          cpus = resource.getScalar().getValue();
-        } else if (resource.getName().equals("mem")
-            && resource.getType() == Value.Type.SCALAR) {
-          mem = resource.getScalar().getValue();
-        } else if (resource.getName().equals("disk")
-            && resource.getType() == Value.Type.SCALAR) {
-          disk = resource.getScalar().getValue();
-        } else if (resource.getName().equals("ports")
-            && resource.getType() == Value.Type.RANGES) {
-          for (Value.Range range : resource.getRanges().getRangeList()) {
-            if (ports.size() < 2)
-              ports.add((int) range.getBegin());
-            if (ports.size() < 2)
-              ports.add((int) range.getEnd());
+      // Compute how many slots we need to allocate.
+      int neededMapSlots = Math.max(0, pendingMaps - idleMapSlots);
+      int neededReduceSlots = Math.max(0, pendingReduces - idleReduceSlots);
+
+      LOG.info(join("\n", Arrays.asList(
+          "JobTracker Status",
+          "      Pending Map Tasks: " + pendingMaps,
+          "   Pending Reduce Tasks: " + pendingReduces,
+          "         Idle Map Slots: " + idleMapSlots,
+          "      Idle Reduce Slots: " + idleReduceSlots,
+          "     Inactive Map Slots: " + inactiveMapSlots
+                                      + " (launched but no hearbeat yet)",
+          "  Inactive Reduce Slots: " + inactiveReduceSlots
+                                      + " (launched but no hearbeat yet)",
+          "       Needed Map Slots: " + neededMapSlots,
+          "    Needed Reduce Slots: " + neededReduceSlots)));
+
+      // Launch TaskTrackers to satisfy the slot requirements.
+      // TODO(bmahler): Consider slotting intelligently.
+      // Ex: If more map slots are needed, but no reduce slots are needed,
+      // launch a map-only TaskTracker to better satisfy the slot needs.
+      for (Offer offer : offers) {
+        if (neededMapSlots <= 0 && neededReduceSlots <= 0) {
+          driver.declineOffer(offer.getId());
+          continue;
+        }
+
+        double cpus = -1.0;
+        double mem = -1.0;
+        double disk = -1.0;
+        Set<Integer> ports = new HashSet<Integer>(2);
+
+        // Pull out the cpus, memory, disk, and 2 ports from the offer.
+        for (Resource resource : offer.getResourcesList()) {
+          if (resource.getName().equals("cpus")
+              && resource.getType() == Value.Type.SCALAR) {
+            cpus = resource.getScalar().getValue();
+          } else if (resource.getName().equals("mem")
+              && resource.getType() == Value.Type.SCALAR) {
+            mem = resource.getScalar().getValue();
+          } else if (resource.getName().equals("disk")
+              && resource.getType() == Value.Type.SCALAR) {
+            disk = resource.getScalar().getValue();
+          } else if (resource.getName().equals("ports")
+              && resource.getType() == Value.Type.RANGES) {
+            for (Value.Range range : resource.getRanges().getRangeList()) {
+              if (ports.size() < 2)
+                ports.add((int) range.getBegin());
+              if (ports.size() < 2)
+                ports.add((int) range.getEnd());
+            }
           }
         }
-      }
 
-      int mapSlots = conf.getInt("mapred.tasktracker.map.tasks.maximum",
-          MAP_SLOTS_DEFAULT);
-      int reduceSlots = conf.getInt("mapred.tasktracker.reduce.tasks.maximum",
-          REDUCE_SLOTS_DEFAULT);
-
-      double slotCpus = conf.getFloat("mapred.mesos.slot.cpus",
-          (float) SLOT_CPUS_DEFAULT);
-      double slotDisk = conf.getInt("mapred.mesos.slot.disk",
-          SLOT_DISK_DEFAULT);
-      double slotMem = conf.getInt("mapred.mesos.slot.mem",
-          SLOT_JVM_HEAP_DEFAULT + JVM_MEM_OVERHEAD);
-      double slotJVMHeap = slotMem - JVM_MEM_OVERHEAD;
-
-      // Total resource requirements for the container (TaskTracker + map/red
-      // tasks).
-      double containerCpus = (mapSlots + reduceSlots) * slotCpus
-          + TASKTRACKER_CPUS;
-      double containerMem = (mapSlots + reduceSlots) * slotMem
-          + TASKTRACKER_MEM;
-      double containerDisk = (mapSlots + reduceSlots) * slotDisk;
-
-      if (containerCpus > cpus || containerMem > mem || containerDisk > disk
-          || ports.size() < 2) {
-        LOG.info(join("\n", Arrays.asList(
-            "Declining offer with insufficient resources for a TaskTracker: ",
-            "  cpus: offered " + cpus + " needed " + containerCpus,
-            "  mem : offered " + mem + " needed " + containerMem,
-            "  disk: offered " + disk + " needed " + containerDisk,
-            "  ports: " + (ports.size() < 2
-                          ? " less than 2 offered"
-                          : " at least 2 (sufficient)"),
-            offer.getResourcesList().toString())));
+        int mapSlots = conf.getInt("mapred.tasktracker.map.tasks.maximum",
+            MAP_SLOTS_DEFAULT);
+        int reduceSlots = conf.getInt("mapred.tasktracker.reduce.tasks.maximum",
+            REDUCE_SLOTS_DEFAULT);
+
+        double slotCpus = conf.getFloat("mapred.mesos.slot.cpus",
+            (float) SLOT_CPUS_DEFAULT);
+        double slotDisk = conf.getInt("mapred.mesos.slot.disk",
+            SLOT_DISK_DEFAULT);
+        double slotMem = conf.getInt("mapred.mesos.slot.mem",
+            SLOT_JVM_HEAP_DEFAULT + JVM_MEM_OVERHEAD);
+        double slotJVMHeap = slotMem - JVM_MEM_OVERHEAD;
+
+        // Total resource requirements for the container (TaskTracker + map/red
+        // tasks).
+        double containerCpus = (mapSlots + reduceSlots) * slotCpus
+            + TASKTRACKER_CPUS;
+        double containerMem = (mapSlots + reduceSlots) * slotMem
+            + TASKTRACKER_MEM;
+        double containerDisk = (mapSlots + reduceSlots) * slotDisk;
+
+        if (containerCpus > cpus || containerMem > mem || containerDisk > disk
+            || ports.size() < 2) {
+          LOG.info(join("\n", Arrays.asList(
+              "Declining offer with insufficient resources for a TaskTracker: ",
+              "  cpus: offered " + cpus + " needed " + containerCpus,
+              "  mem : offered " + mem + " needed " + containerMem,
+              "  disk: offered " + disk + " needed " + containerDisk,
+              "  ports: " + (ports.size() < 2
+                            ? " less than 2 offered"
+                            : " at least 2 (sufficient)"),
+              offer.getResourcesList().toString())));
 
-        driver.declineOffer(offer.getId());
-        continue;
-      }
-
-      Integer[] portArray = ports.toArray(new Integer[2]);
-      HttpHost httpAddress = new HttpHost(offer.getHostname(), portArray[0]);
-      HttpHost reportAddress = new HttpHost(offer.getHostname(), portArray[1]);
-      HttpHost jobTrackerAddress = new HttpHost(jobTracker.getHostname(),
-          jobTracker.getTrackerPort());
-
-      TaskID taskId = TaskID.newBuilder()
-          .setValue("Task_Tracker_" + launchedTrackers++).build();
-
-      LOG.info("Launching task " + taskId.getValue() + " on "
-          + httpAddress.toString());
-
-      // Add this tracker to Mesos tasks.
-      mesosTrackers.put(httpAddress, new MesosTracker(httpAddress, taskId,
-          mapSlots, reduceSlots));
-
-      // Create the environment depending on whether the executor is going to be
-      // run locally.
-      // TODO(vinod): Do not pass the mapred config options as environment
-      // variables.
-      Protos.Environment.Builder envBuilder = Protos.Environment
-          .newBuilder()
-          .addVariables(
-              Protos.Environment.Variable
-                  .newBuilder()
-                  .setName("mapred.job.tracker")
-                  .setValue(jobTrackerAddress.getHostName() + ':'
-                      + jobTrackerAddress.getPort()))
-          .addVariables(
-              Protos.Environment.Variable
-                  .newBuilder()
-                  .setName("mapred.task.tracker.http.address")
-                  .setValue(
-                      httpAddress.getHostName() + ':' + httpAddress.getPort()))
-          .addVariables(
-              Protos.Environment.Variable
-                  .newBuilder()
-                  .setName("mapred.task.tracker.report.address")
-                  .setValue(reportAddress.getHostName() + ':'
-                      + reportAddress.getPort()))
-          .addVariables(
-              Protos.Environment.Variable.newBuilder()
-                  .setName("mapred.map.child.java.opts")
-                  .setValue("-Xmx" + slotJVMHeap + "m"))
-          .addVariables(
-              Protos.Environment.Variable.newBuilder()
-                  .setName("mapred.reduce.child.java.opts")
-                  .setValue("-Xmx" + slotJVMHeap + "m"))
-          .addVariables(
-              Protos.Environment.Variable.newBuilder()
-                  .setName("HADOOP_HEAPSIZE")
-                  .setValue("" + TASKTRACKER_JVM_HEAP));
-
-      // Set java specific environment, appropriately.
-      Map<String, String> env = System.getenv();
-      if (env.containsKey("JAVA_HOME")) {
-        envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
-            .setName("JAVA_HOME")
-            .setValue(env.get("JAVA_HOME")));
-      }
+          driver.declineOffer(offer.getId());
+          continue;
+        }
 
-      if (env.containsKey("JAVA_LIBRARY_PATH")) {
-        envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
-            .setName("JAVA_LIBRARY_PATH")
-            .setValue(env.get("JAVA_LIBRARY_PATH")));
-      }
+        Integer[] portArray = ports.toArray(new Integer[2]);
+        HttpHost httpAddress = new HttpHost(offer.getHostname(), portArray[0]);
+        HttpHost reportAddress = new HttpHost(offer.getHostname(), portArray[1]);
+
+        TaskID taskId = TaskID.newBuilder()
+            .setValue("Task_Tracker_" + launchedTrackers++).build();
+
+        LOG.info("Launching task " + taskId.getValue() + " on "
+            + httpAddress.toString());
+
+        // Add this tracker to Mesos tasks.
+        mesosTrackers.put(httpAddress, new MesosTracker(httpAddress, taskId,
+            mapSlots, reduceSlots));
+
+        // Create the environment depending on whether the executor is going to be
+        // run locally.
+        // TODO(vinod): Do not pass the mapred config options as environment
+        // variables.
+        Protos.Environment.Builder envBuilder = Protos.Environment
+            .newBuilder()
+            .addVariables(
+                Protos.Environment.Variable
+                    .newBuilder()
+                    .setName("mapred.job.tracker")
+                    .setValue(jobTrackerAddress.getHostName() + ':'
+                        + jobTrackerAddress.getPort()))
+            .addVariables(
+                Protos.Environment.Variable
+                    .newBuilder()
+                    .setName("mapred.task.tracker.http.address")
+                    .setValue(
+                        httpAddress.getHostName() + ':' + httpAddress.getPort()))
+            .addVariables(
+                Protos.Environment.Variable
+                    .newBuilder()
+                    .setName("mapred.task.tracker.report.address")
+                    .setValue(reportAddress.getHostName() + ':'
+                        + reportAddress.getPort()))
+            .addVariables(
+                Protos.Environment.Variable.newBuilder()
+                    .setName("mapred.map.child.java.opts")
+                    .setValue("-Xmx" + slotJVMHeap + "m"))
+            .addVariables(
+                Protos.Environment.Variable.newBuilder()
+                    .setName("mapred.reduce.child.java.opts")
+                    .setValue("-Xmx" + slotJVMHeap + "m"))
+            .addVariables(
+                Protos.Environment.Variable.newBuilder()
+                    .setName("HADOOP_HEAPSIZE")
+                    .setValue("" + TASKTRACKER_JVM_HEAP));
+
+        // Set java specific environment, appropriately.
+        Map<String, String> env = System.getenv();
+        if (env.containsKey("JAVA_HOME")) {
+          envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
+              .setName("JAVA_HOME")
+              .setValue(env.get("JAVA_HOME")));
+        }
 
-      // Command info differs when performing a local run.
-      CommandInfo commandInfo = null;
-      String master = conf.get("mapred.mesos.master", "local");
+        if (env.containsKey("JAVA_LIBRARY_PATH")) {
+          envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
+              .setName("JAVA_LIBRARY_PATH")
+              .setValue(env.get("JAVA_LIBRARY_PATH")));
+        }
 
-      if (master.equals("local")) {
-        try {
+        // Command info differs when performing a local run.
+        CommandInfo commandInfo = null;
+        String master = conf.get("mapred.mesos.master", "local");
+
+        if (master.equals("local")) {
+          try {
+            commandInfo = CommandInfo.newBuilder()
+                .setEnvironment(envBuilder)
+                .setValue(new File("bin/mesos-executor").getCanonicalPath())
+                .build();
+          } catch (IOException e) {
+            LOG.fatal("Failed to find Mesos executor ", e);
+            System.exit(1);
+          }
+        } else {
+          String uri = conf.get("mapred.mesos.executor");
           commandInfo = CommandInfo.newBuilder()
               .setEnvironment(envBuilder)
-              .setValue(new File("bin/mesos-executor").getCanonicalPath())
-              .build();
-        } catch (IOException e) {
-          LOG.fatal("Failed to find Mesos executor ", e);
-          System.exit(1);
+              .setValue("cd hadoop && ./bin/mesos-executor")
+              .addUris(CommandInfo.URI.newBuilder().setValue(uri)).build();
         }
-      } else {
-        String uri = conf.get("mapred.mesos.executor");
-        commandInfo = CommandInfo.newBuilder()
-            .setEnvironment(envBuilder)
-            .setValue("cd hadoop && ./bin/mesos-executor")
-            .addUris(CommandInfo.URI.newBuilder().setValue(uri)).build();
-      }
 
-      TaskInfo info = TaskInfo
-          .newBuilder()
-          .setName(taskId.getValue())
-          .setTaskId(taskId)
-          .setSlaveId(offer.getSlaveId())
-          .addResources(
-              Resource
-                  .newBuilder()
-                  .setName("cpus")
-                  .setType(Value.Type.SCALAR)
-                  .setScalar(Value.Scalar.newBuilder().setValue(
-                      (mapSlots + reduceSlots) * slotCpus)))
-          .addResources(
-              Resource
-                  .newBuilder()
-                  .setName("mem")
-                  .setType(Value.Type.SCALAR)
-                  .setScalar(Value.Scalar.newBuilder().setValue(
-                      (mapSlots + reduceSlots) * slotMem)))
-          .addResources(
-              Resource
-                  .newBuilder()
-                  .setName("disk")
-                  .setType(Value.Type.SCALAR)
-                  .setScalar(Value.Scalar.newBuilder().setValue(
-                      (mapSlots + reduceSlots) * slotDisk)))
-          .addResources(
-              Resource
-                  .newBuilder()
-                  .setName("ports")
-                  .setType(Value.Type.RANGES)
-                  .setRanges(
-                      Value.Ranges
-                          .newBuilder()
-                          .addRange(Value.Range.newBuilder()
-                                        .setBegin(httpAddress.getPort())
-                                        .setEnd(httpAddress.getPort()))
-                          .addRange(Value.Range.newBuilder()
-                                        .setBegin(reportAddress.getPort())
-                                        .setEnd(reportAddress.getPort()))))
-          .setExecutor(
-              ExecutorInfo
-                  .newBuilder()
-                  .setExecutorId(ExecutorID.newBuilder().setValue(
-                      "executor_" + taskId.getValue()))
-                  .setName("Hadoop TaskTracker")
-                  .setSource(taskId.getValue())
-                  .addResources(
-                      Resource
-                          .newBuilder()
-                          .setName("cpus")
-                          .setType(Value.Type.SCALAR)
-                          .setScalar(Value.Scalar.newBuilder().setValue(
-                              (TASKTRACKER_CPUS))))
-                  .addResources(
-                      Resource
-                          .newBuilder()
-                          .setName("mem")
-                          .setType(Value.Type.SCALAR)
-                          .setScalar(Value.Scalar.newBuilder().setValue(
-                              (TASKTRACKER_MEM)))).setCommand(commandInfo))
-          .build();
+        TaskInfo info = TaskInfo
+            .newBuilder()
+            .setName(taskId.getValue())
+            .setTaskId(taskId)
+            .setSlaveId(offer.getSlaveId())
+            .addResources(
+                Resource
+                    .newBuilder()
+                    .setName("cpus")
+                    .setType(Value.Type.SCALAR)
+                    .setScalar(Value.Scalar.newBuilder().setValue(
+                        (mapSlots + reduceSlots) * slotCpus)))
+            .addResources(
+                Resource
+                    .newBuilder()
+                    .setName("mem")
+                    .setType(Value.Type.SCALAR)
+                    .setScalar(Value.Scalar.newBuilder().setValue(
+                        (mapSlots + reduceSlots) * slotMem)))
+            .addResources(
+                Resource
+                    .newBuilder()
+                    .setName("disk")
+                    .setType(Value.Type.SCALAR)
+                    .setScalar(Value.Scalar.newBuilder().setValue(
+                        (mapSlots + reduceSlots) * slotDisk)))
+            .addResources(
+                Resource
+                    .newBuilder()
+                    .setName("ports")
+                    .setType(Value.Type.RANGES)
+                    .setRanges(
+                        Value.Ranges
+                            .newBuilder()
+                            .addRange(Value.Range.newBuilder()
+                                .setBegin(httpAddress.getPort())
+                                .setEnd(httpAddress.getPort()))
+                            .addRange(Value.Range.newBuilder()
+                                .setBegin(reportAddress.getPort())
+                                .setEnd(reportAddress.getPort()))))
+            .setExecutor(
+                ExecutorInfo
+                    .newBuilder()
+                    .setExecutorId(ExecutorID.newBuilder().setValue(
+                        "executor_" + taskId.getValue()))
+                    .setName("Hadoop TaskTracker")
+                    .setSource(taskId.getValue())
+                    .addResources(
+                        Resource
+                            .newBuilder()
+                            .setName("cpus")
+                            .setType(Value.Type.SCALAR)
+                            .setScalar(Value.Scalar.newBuilder().setValue(
+                                (TASKTRACKER_CPUS))))
+                    .addResources(
+                        Resource
+                            .newBuilder()
+                            .setName("mem")
+                            .setType(Value.Type.SCALAR)
+                            .setScalar(Value.Scalar.newBuilder().setValue(
+                                (TASKTRACKER_MEM)))).setCommand(commandInfo))
+            .build();
 
-      driver.launchTasks(offer.getId(), Arrays.asList(info));
+        driver.launchTasks(offer.getId(), Arrays.asList(info));
 
-      neededMapSlots -= mapSlots;
-      neededReduceSlots -= reduceSlots;
-    }
+        neededMapSlots -= mapSlots;
+        neededReduceSlots -= reduceSlots;
+      }
 
-    if (neededMapSlots <= 0 && neededReduceSlots <= 0) {
-      LOG.info("Satisfied map and reduce slots needed.");
-    } else {
-      LOG.info("Unable to fully satisfy needed map/reduce slots: "
-          + (neededMapSlots > 0 ? neededMapSlots + " map slots " : "")
-          + (neededReduceSlots > 0 ? neededReduceSlots + " reduce slots " : "")
-          + "remaining");
+      if (neededMapSlots <= 0 && neededReduceSlots <= 0) {
+        LOG.info("Satisfied map and reduce slots needed.");
+      } else {
+        LOG.info("Unable to fully satisfy needed map/reduce slots: "
+            + (neededMapSlots > 0 ? neededMapSlots + " map slots " : "")
+            + (neededReduceSlots > 0 ? neededReduceSlots + " reduce slots " : "")
+            + "remaining");
+      }
     }
   }
 
@@ -568,21 +589,29 @@ public class MesosScheduler extends Task
 
     // Remove the TaskTracker if the corresponding Mesos task has reached a
     // terminal state.
-    TaskState state = taskStatus.getState();
-
-    if (state == TaskState.TASK_FINISHED || state == TaskState.TASK_FAILED
-        || state == TaskState.TASK_KILLED || state == TaskState.TASK_LOST) {
-
-      // Make a copy to iterate over keys and delete values.
-      Set<HttpHost> trackers = new HashSet<HttpHost>(mesosTrackers.keySet());
-
-      // Remove the task from the map.
-      for (HttpHost tracker : trackers) {
-        if (mesosTrackers.get(tracker).taskId == taskStatus.getTaskId()) {
-          LOG.info("Removing terminated TaskTracker: " + tracker);
-          mesosTrackers.remove(tracker);
+    switch (taskStatus.getState()) {
+      case TASK_FINISHED:
+      case TASK_FAILED:
+      case TASK_KILLED:
+      case TASK_LOST:
+        // Make a copy to iterate over keys and delete values.
+        Set<HttpHost> trackers = new HashSet<HttpHost>(mesosTrackers.keySet());
+
+        // Remove the task from the map.
+        for (HttpHost tracker : trackers) {
+          if (mesosTrackers.get(tracker).taskId == taskStatus.getTaskId()) {
+            LOG.info("Removing terminated TaskTracker: " + tracker);
+            mesosTrackers.remove(tracker);
+          }
         }
-      }
+        break;
+      case TASK_STAGING:
+      case TASK_STARTING:
+      case TASK_RUNNING:
+        break;
+      default:
+        LOG.error("Unexpected TaskStatus: " + taskStatus.getState().name());
+        break;
     }
   }