You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by br...@apache.org on 2013/07/30 21:48:49 UTC

[1/4] git commit: Be more intelligent about slot allocation.

Updated Branches:
  refs/heads/master aa2c15d10 -> 0605e9044


Be more intelligent about slot allocation.

When we get a resource offer, we try to be clever about how many slots
we allocate from it. Ideally we want to use as much of the offered
resources as possible when there are many pending tasks, while not
using more resources than we actually need.

Review: https://reviews.apache.org/r/13078


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/97643616
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/97643616
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/97643616

Branch: refs/heads/master
Commit: 97643616ad222882cf43a9f1962de77390503d0a
Parents: aa2c15d
Author: Brenden Matthews <br...@airbnb.com>
Authored: Mon May 13 16:08:35 2013 -0700
Committer: Brenden Matthews <br...@airbnb.com>
Committed: Tue Jul 30 12:46:33 2013 -0700

----------------------------------------------------------------------
 .../apache/hadoop/mapred/MesosScheduler.java    | 167 ++++++++++++-------
 1 file changed, 108 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/97643616/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java b/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
index 0c10874..11f972a 100644
--- a/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
+++ b/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
@@ -359,8 +359,8 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
       }
 
       // Compute how many slots we need to allocate.
-      int neededMapSlots = Math.max(0, pendingMaps - idleMapSlots);
-      int neededReduceSlots = Math.max(0, pendingReduces - idleReduceSlots);
+      int neededMapSlots = Math.max(0, pendingMaps - (idleMapSlots + inactiveMapSlots));
+      int neededReduceSlots = Math.max(0, pendingReduces - (idleReduceSlots + inactiveReduceSlots));
 
       LOG.info(join("\n", Arrays.asList(
               "JobTracker Status",
@@ -413,10 +413,16 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
           }
         }
 
-        int mapSlots = conf.getInt("mapred.tasktracker.map.tasks.maximum",
+        int mapSlotsMax = conf.getInt("mapred.tasktracker.map.tasks.maximum",
             MAP_SLOTS_DEFAULT);
-        int reduceSlots = conf.getInt("mapred.tasktracker.reduce.tasks.maximum",
-            REDUCE_SLOTS_DEFAULT);
+        int reduceSlotsMax =
+          conf.getInt("mapred.tasktracker.reduce.tasks.maximum",
+              REDUCE_SLOTS_DEFAULT);
+
+        // What's the minimum number of map and reduce slots we should try to
+        // launch?
+        long mapSlots = 0;
+        long reduceSlots = 0;
 
         double slotCpus = conf.getFloat("mapred.mesos.slot.cpus",
             (float) SLOT_CPUS_DEFAULT);
@@ -426,39 +432,74 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
             SLOT_JVM_HEAP_DEFAULT + JVM_MEM_OVERHEAD);
         double slotJVMHeap = slotMem - JVM_MEM_OVERHEAD;
 
-        // Total resource requirements for the container (TaskTracker + map/red
+        // Minimum resource requirements for the container (TaskTracker + map/red
         // tasks).
-        double containerCpus = (mapSlots + reduceSlots) * slotCpus
-            + TASKTRACKER_CPUS;
-        double containerMem = (mapSlots + reduceSlots) * slotMem
-            + TASKTRACKER_MEM;
-        double containerDisk = (mapSlots + reduceSlots) * slotDisk;
-
-        if (containerCpus > cpus || containerMem > mem || containerDisk > disk
-            || ports.size() < 2) {
+        double containerCpus = TASKTRACKER_CPUS;
+        double containerMem = TASKTRACKER_MEM;
+        double containerDisk = 0;
+
+        // Determine how many slots we can allocate.
+        int slots = mapSlotsMax + reduceSlotsMax;
+        slots = (int)Math.min(slots, (cpus - containerCpus) / slotCpus);
+        slots = (int)Math.min(slots, (mem - containerMem) / slotMem);
+        slots = (int)Math.min(slots, (disk - containerDisk) / slotDisk);
+
+        // Is this offer too small for even the minimum slots?
+        if (slots < 1 || ports.size() < 2) {
           LOG.info(join("\n", Arrays.asList(
-              "Declining offer with insufficient resources for a TaskTracker: ",
-              "  cpus: offered " + cpus + " needed " + containerCpus,
-              "  mem : offered " + mem + " needed " + containerMem,
-              "  disk: offered " + disk + " needed " + containerDisk,
-              "  ports: " + (ports.size() < 2
-                            ? " less than 2 offered"
-                            : " at least 2 (sufficient)"),
-              offer.getResourcesList().toString())));
+                  "Declining offer with insufficient resources for a TaskTracker: ",
+                  "  cpus: offered " + cpus + " needed " + containerCpus,
+                  "  mem : offered " + mem + " needed " + containerMem,
+                  "  disk: offered " + disk + " needed " + containerDisk,
+                  "  ports: " + (ports.size() < 2
+                    ? " less than 2 offered"
+                    : " at least 2 (sufficient)"),
+                  offer.getResourcesList().toString())));
 
           driver.declineOffer(offer.getId());
           continue;
         }
 
+        // Is the number of slots we need sufficiently small? If so, we can
+        // allocate exactly the number we need.
+        if (slots >= neededMapSlots + neededReduceSlots && neededMapSlots <
+            mapSlotsMax && neededReduceSlots < reduceSlotsMax) {
+          mapSlots = neededMapSlots;
+          reduceSlots = neededReduceSlots;
+        } else {
+          // Allocate slots fairly for this resource offer.
+          double mapFactor = (double)neededMapSlots / (neededMapSlots + neededReduceSlots);
+          double reduceFactor = (double)neededReduceSlots / (neededMapSlots + neededReduceSlots);
+          // To avoid map/reduce slot starvation, don't allow more than 50%
+          // spread between map/reduce slots when we need both mappers and
+          // reducers.
+          if (neededMapSlots > 0 && neededReduceSlots > 0) {
+            if (mapFactor < 0.25) {
+              mapFactor = 0.25;
+            } else if (mapFactor > 0.75) {
+              mapFactor = 0.75;
+            }
+            if (reduceFactor < 0.25) {
+              reduceFactor = 0.25;
+            } else if (reduceFactor > 0.75) {
+              reduceFactor = 0.75;
+            }
+          }
+          mapSlots = Math.min(Math.min((long)(mapFactor * slots), mapSlotsMax), neededMapSlots);
+          // The remaining slots are allocated for reduces.
+          slots -= mapSlots;
+          reduceSlots = Math.min(Math.min(slots, reduceSlotsMax), neededReduceSlots);
+        }
+
         Integer[] portArray = ports.toArray(new Integer[2]);
         HttpHost httpAddress = new HttpHost(offer.getHostname(), portArray[0]);
         HttpHost reportAddress = new HttpHost(offer.getHostname(), portArray[1]);
 
         TaskID taskId = TaskID.newBuilder()
-            .setValue("Task_Tracker_" + launchedTrackers++).build();
+          .setValue("Task_Tracker_" + launchedTrackers++).build();
 
         LOG.info("Launching task " + taskId.getValue() + " on "
-            + httpAddress.toString());
+            + httpAddress.toString() + " with mapSlots=" + mapSlots + " reduceSlots=" + reduceSlots);
 
         // Add this tracker to Mesos tasks.
         mesosTrackers.put(httpAddress, new MesosTracker(httpAddress, taskId,
@@ -469,37 +510,45 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
         // TODO(vinod): Do not pass the mapred config options as environment
         // variables.
         Protos.Environment.Builder envBuilder = Protos.Environment
-            .newBuilder()
-            .addVariables(
-                Protos.Environment.Variable
-                    .newBuilder()
-                    .setName("mapred.job.tracker")
-                    .setValue(jobTrackerAddress.getHostName() + ':'
-                        + jobTrackerAddress.getPort()))
-            .addVariables(
-                Protos.Environment.Variable
-                    .newBuilder()
-                    .setName("mapred.task.tracker.http.address")
-                    .setValue(
-                        httpAddress.getHostName() + ':' + httpAddress.getPort()))
-            .addVariables(
-                Protos.Environment.Variable
-                    .newBuilder()
-                    .setName("mapred.task.tracker.report.address")
-                    .setValue(reportAddress.getHostName() + ':'
-                        + reportAddress.getPort()))
-            .addVariables(
-                Protos.Environment.Variable.newBuilder()
-                    .setName("mapred.map.child.java.opts")
-                    .setValue("-Xmx" + slotJVMHeap + "m"))
-            .addVariables(
-                Protos.Environment.Variable.newBuilder()
-                    .setName("mapred.reduce.child.java.opts")
-                    .setValue("-Xmx" + slotJVMHeap + "m"))
-            .addVariables(
-                Protos.Environment.Variable.newBuilder()
-                    .setName("HADOOP_HEAPSIZE")
-                    .setValue("" + TASKTRACKER_JVM_HEAP));
+          .newBuilder()
+          .addVariables(
+              Protos.Environment.Variable
+              .newBuilder()
+              .setName("mapred.job.tracker")
+              .setValue(jobTrackerAddress.getHostName() + ':'
+                + jobTrackerAddress.getPort()))
+          .addVariables(
+              Protos.Environment.Variable
+              .newBuilder()
+              .setName("mapred.task.tracker.http.address")
+              .setValue(
+                httpAddress.getHostName() + ':' + httpAddress.getPort()))
+          .addVariables(
+              Protos.Environment.Variable
+              .newBuilder()
+              .setName("mapred.task.tracker.report.address")
+              .setValue(reportAddress.getHostName() + ':'
+                + reportAddress.getPort()))
+          .addVariables(
+              Protos.Environment.Variable.newBuilder()
+              .setName("mapred.map.child.java.opts")
+              .setValue("-Xmx" + slotJVMHeap + "m"))
+          .addVariables(
+              Protos.Environment.Variable.newBuilder()
+              .setName("mapred.reduce.child.java.opts")
+              .setValue("-Xmx" + slotJVMHeap + "m"))
+          .addVariables(
+              Protos.Environment.Variable.newBuilder()
+              .setName("HADOOP_HEAPSIZE")
+              .setValue("" + TASKTRACKER_JVM_HEAP))
+          .addVariables(
+              Protos.Environment.Variable.newBuilder()
+              .setName("mapred.tasktracker.map.tasks.maximum")
+              .setValue("" + mapSlots))
+          .addVariables(
+              Protos.Environment.Variable.newBuilder()
+              .setName("mapred.tasktracker.reduce.tasks.maximum")
+              .setValue("" + reduceSlots));
 
         // Set java specific environment, appropriately.
         Map<String, String> env = System.getenv();
@@ -695,15 +744,15 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
   private class MesosTracker {
     public HttpHost host;
     public TaskID taskId;
-    public int mapSlots;
-    public int reduceSlots;
+    public long mapSlots;
+    public long reduceSlots;
     public boolean active = false; // Set once tracked by the JobTracker.
 
     // Tracks Hadoop tasks running on the tracker.
     public Set<TaskAttemptID> hadoopTasks = new HashSet<TaskAttemptID>();
 
-    public MesosTracker(HttpHost host, TaskID taskId, int mapSlots,
-        int reduceSlots) {
+    public MesosTracker(HttpHost host, TaskID taskId, long mapSlots,
+        long reduceSlots) {
       this.host = host;
       this.taskId = taskId;
       this.mapSlots = mapSlots;


[4/4] git commit: Allowed some JVM memory vars to be configured.

Posted by br...@apache.org.
Allowed some JVM memory vars to be configured.

Also changed the default slot CPU/memory values to saner ones.

Review: https://reviews.apache.org/r/11130


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0605e904
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0605e904
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0605e904

Branch: refs/heads/master
Commit: 0605e90445beb34f621b369063d9bc452ff8cd39
Parents: aac37e8
Author: Brenden Matthews <br...@airbnb.com>
Authored: Fri May 10 14:34:22 2013 -0700
Committer: Brenden Matthews <br...@airbnb.com>
Committed: Tue Jul 30 12:48:29 2013 -0700

----------------------------------------------------------------------
 hadoop/mapred-site.xml.patch                    |  6 ++---
 .../apache/hadoop/mapred/MesosScheduler.java    | 27 ++++++++++++--------
 2 files changed, 19 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0605e904/hadoop/mapred-site.xml.patch
----------------------------------------------------------------------
diff --git a/hadoop/mapred-site.xml.patch b/hadoop/mapred-site.xml.patch
index 8b39979..d5a99f6 100644
--- a/hadoop/mapred-site.xml.patch
+++ b/hadoop/mapred-site.xml.patch
@@ -37,7 +37,7 @@ index 970c8fe..f9f272d 100644
 +# that are allocated to a Hadoop slot (i.e., map/reduce task) by Mesos.
 +  <property>
 +    <name>mapred.mesos.slot.cpus</name>
-+    <value>0.2</value>
++    <value>1</value>
 +  </property>
 +  <property>
 +    <name>mapred.mesos.slot.disk</name>
@@ -47,8 +47,8 @@ index 970c8fe..f9f272d 100644
 +  <property>
 +    <name>mapred.mesos.slot.mem</name>
 +    <!-- Note that this is the total memory required for
-+         JVM overhead (256 MB) and the heap (-Xmx) of the task.
++         JVM overhead (10% of this value) and the heap (-Xmx) of the task.
 +         The value is in MB. -->
-+    <value>512</value>
++    <value>1024</value>
 +  </property>
  </configuration>

http://git-wip-us.apache.org/repos/asf/mesos/blob/0605e904/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java b/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
index e4fbb80..69d4655 100644
--- a/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
+++ b/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
@@ -50,7 +50,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
 
   // This is the memory overhead for a jvm process. This needs to be added
   // to a jvm process's resource requirement, in addition to its heap size.
-  private static final int JVM_MEM_OVERHEAD = 256; // 256 MB.
+  private static final double JVM_MEM_OVERHEAD_PERCENT_DEFAULT = 0.1; // 10%.
 
   // TODO(vinod): Consider parsing the slot memory from the configuration jvm
   // heap options (e.g: mapred.child.java.opts).
@@ -62,12 +62,10 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
   // 1 GB of disk space.
   private static final double SLOT_CPUS_DEFAULT = 0.2; // 0.2 cores.
   private static final int SLOT_DISK_DEFAULT = 1024; // 1 GB.
-  private static final int SLOT_JVM_HEAP_DEFAULT = 256; // MB.
+  private static final int SLOT_JVM_HEAP_DEFAULT = 256; // 256MB.
 
   private static final double TASKTRACKER_CPUS = 1.0; // 1 core.
-  private static final int TASKTRACKER_JVM_HEAP = 1024; // 1 GB.
-  private static final int TASKTRACKER_MEM =
-    TASKTRACKER_JVM_HEAP + JVM_MEM_OVERHEAD;
+  private static final int TASKTRACKER_MEM_DEFAULT = 1024; // 1 GB.
 
   // The default behavior in Hadoop is to use 4 slots per TaskTracker:
   private static final int MAP_SLOTS_DEFAULT = 2;
@@ -445,14 +443,21 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
             (float) SLOT_CPUS_DEFAULT);
         double slotDisk = conf.getInt("mapred.mesos.slot.disk",
             SLOT_DISK_DEFAULT);
-        double slotMem = conf.getInt("mapred.mesos.slot.mem",
-            SLOT_JVM_HEAP_DEFAULT + JVM_MEM_OVERHEAD);
-        double slotJVMHeap = slotMem - JVM_MEM_OVERHEAD;
+
+        int slotMem = conf.getInt("mapred.mesos.slot.mem",
+            SLOT_JVM_HEAP_DEFAULT);
+        long slotJVMHeap = Math.round((double)slotMem -
+            (JVM_MEM_OVERHEAD_PERCENT_DEFAULT * slotMem));
+
+        int tasktrackerMem = conf.getInt("mapred.mesos.tasktracker.mem",
+              TASKTRACKER_MEM_DEFAULT);
+        long tasktrackerJVMHeap = Math.round((double)tasktrackerMem -
+            (JVM_MEM_OVERHEAD_PERCENT_DEFAULT * tasktrackerMem));
 
         // Minimum resource requirements for the container (TaskTracker + map/red
         // tasks).
         double containerCpus = TASKTRACKER_CPUS;
-        double containerMem = TASKTRACKER_MEM;
+        double containerMem = tasktrackerMem;
         double containerDisk = 0;
 
         // Determine how many slots we can allocate.
@@ -557,7 +562,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
           .addVariables(
               Protos.Environment.Variable.newBuilder()
               .setName("HADOOP_HEAPSIZE")
-              .setValue("" + TASKTRACKER_JVM_HEAP))
+              .setValue("" + tasktrackerJVMHeap))
           .addVariables(
               Protos.Environment.Variable.newBuilder()
               .setName("mapred.tasktracker.map.tasks.maximum")
@@ -663,7 +668,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
                 .setName("mem")
                 .setType(Value.Type.SCALAR)
                 .setScalar(Value.Scalar.newBuilder().setValue(
-                    (TASKTRACKER_MEM)))).setCommand(commandInfo))
+                    (tasktrackerMem)))).setCommand(commandInfo))
                     .build();
 
         driver.launchTasks(offer.getId(), Arrays.asList(info));


[2/4] git commit: Don't prematurely kill TaskTrackers.

Posted by br...@apache.org.
Don't prematurely kill TaskTrackers.

The internal Mesos tracker (one per TaskTracker) now records the JobIDs
of the jobs it runs tasks for, rather than individual task IDs.

If a TaskTracker has only map tasks assigned to it and they have all
completed, we must not terminate the tracker until the entire job has
finished, including its reduce tasks (which may still need to fetch map
output from that tracker).

Review: https://reviews.apache.org/r/11119


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5ef452cb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5ef452cb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5ef452cb

Branch: refs/heads/master
Commit: 5ef452cb4b1cef1df94592f7e637644140f9fda4
Parents: 9764361
Author: Brenden Matthews <br...@airbnb.com>
Authored: Mon May 13 16:13:16 2013 -0700
Committer: Brenden Matthews <br...@airbnb.com>
Committed: Tue Jul 30 12:47:37 2013 -0700

----------------------------------------------------------------------
 .../apache/hadoop/mapred/MesosScheduler.java    | 180 ++++++++++---------
 1 file changed, 92 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/5ef452cb/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java b/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
index 11f972a..ca6106b 100644
--- a/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
+++ b/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
@@ -65,7 +65,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
   private static final double TASKTRACKER_CPUS = 1.0; // 1 core.
   private static final int TASKTRACKER_JVM_HEAP = 1024; // 1 GB.
   private static final int TASKTRACKER_MEM =
-      TASKTRACKER_JVM_HEAP + JVM_MEM_OVERHEAD;
+    TASKTRACKER_JVM_HEAP + JVM_MEM_OVERHEAD;
 
   // The default behavior in Hadoop is to use 4 slots per TaskTracker:
   private static final int MAP_SLOTS_DEFAULT = 2;
@@ -78,7 +78,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
   // Used for tracking the slots of each TaskTracker and the corresponding
   // Mesos TaskID.
   private Map<HttpHost, MesosTracker> mesosTrackers =
-      new HashMap<HttpHost, MesosTracker>();
+    new HashMap<HttpHost, MesosTracker>();
 
   private JobInProgressListener jobListener = new JobInProgressListener() {
     @Override
@@ -97,8 +97,9 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
         JobInProgress job = event.getJobInProgress();
 
         // If the job is complete, kill all the corresponding idle TaskTrackers.
-        if (!job.isComplete())
+        if (!job.isComplete()) {
           return;
+        }
 
         LOG.info("Completed job : " + job.getJobID());
 
@@ -111,10 +112,12 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
         completed.addAll(job.reportTasksInProgress(false, true));
 
         for (TaskInProgress task : completed) {
-          for (TaskStatus status : task.getTaskStatuses()) {
-            LOG.info("Removing completed task : " + status.getTaskID()
-                + " of tracker " + status.getTaskTracker());
+          // Check that this task actually belongs to this job
+          if (task.getJob().getJobID() != job.getJobID()) {
+            continue;
+          }
 
+          for (TaskStatus status : task.getTaskStatuses()) {
             // Make a copy to iterate over keys and delete values.
             Set<HttpHost> trackers = new HashSet<HttpHost>(
                 mesosTrackers.keySet());
@@ -129,12 +132,15 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
                 continue;
               }
 
-              mesosTracker.hadoopTasks.remove(status.getTaskID());
+              LOG.info("Removing completed task : " + status.getTaskID()
+                  + " of tracker " + status.getTaskTracker());
 
-              // If this TaskTracker doesn't have any running tasks, kill it.
-              if (mesosTracker.hadoopTasks.isEmpty()) {
-                LOG.info("Killing Mesos task: " + mesosTracker.taskId
-                    + " on host " + mesosTracker.host);
+              mesosTracker.hadoopJobs.remove(job.getJobID());
+
+              // If the TaskTracker doesn't have any running tasks, kill it.
+              if (mesosTracker.hadoopJobs.isEmpty()) {
+                LOG.info("Killing Mesos task: " + mesosTracker.taskId + " on host "
+                    + mesosTracker.host);
 
                 driver.killTask(mesosTracker.taskId);
                 mesosTrackers.remove(tracker);
@@ -157,7 +163,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
 
     try {
       taskScheduler =
-          (TaskScheduler) Class.forName(taskTrackerClass).newInstance();
+        (TaskScheduler) Class.forName(taskTrackerClass).newInstance();
       taskScheduler.setConf(conf);
       taskScheduler.setTaskTrackerManager(taskTrackerManager);
     } catch (ClassNotFoundException e) {
@@ -181,10 +187,10 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
 
     try {
       FrameworkInfo frameworkInfo = FrameworkInfo
-          .newBuilder()
-          .setUser("")
-          .setName("Hadoop: (RPC port: " + jobTracker.port + ","
-                   + " WebUI port: " + jobTracker.infoPort + ")").build();
+        .newBuilder()
+        .setUser("")
+        .setName("Hadoop: (RPC port: " + jobTracker.port + ","
+            + " WebUI port: " + jobTracker.infoPort + ")").build();
 
       driver = new MesosSchedulerDriver(this, frameworkInfo, master);
       driver.start();
@@ -212,7 +218,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
 
   @Override
   public synchronized List<Task> assignTasks(TaskTracker taskTracker)
-      throws IOException {
+    throws IOException {
     HttpHost tracker = new HttpHost(taskTracker.getStatus().getHost(),
         taskTracker.getStatus().getHttpPort());
 
@@ -229,9 +235,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
     if (tasks != null) {
       // Keep track of which TaskTracker contains which tasks.
       for (Task task : tasks) {
-        LOG.info("Assigning task : " + task.getTaskID() + " to tracker "
-            + tracker);
-        mesosTrackers.get(tracker).hadoopTasks.add(task.getTaskID());
+        mesosTrackers.get(tracker).hadoopJobs.add(task.getJobID());
       }
     }
 
@@ -286,7 +290,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
     for (int i = 0; i < totalTasks; ++i) {
       TaskInProgress task = tasks[i];
       if (task == null) {
-	continue;
+        continue;
       }
       if (task.isComplete()) {
         finishedTasks += 1;
@@ -571,9 +575,9 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
         if (master.equals("local")) {
           try {
             commandInfo = CommandInfo.newBuilder()
-                .setEnvironment(envBuilder)
-                .setValue(new File("bin/mesos-executor").getCanonicalPath())
-                .build();
+              .setEnvironment(envBuilder)
+              .setValue(new File("bin/mesos-executor").getCanonicalPath())
+              .build();
           } catch (IOException e) {
             LOG.fatal("Failed to find Mesos executor ", e);
             System.exit(1);
@@ -581,73 +585,73 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
         } else {
           String uri = conf.get("mapred.mesos.executor");
           commandInfo = CommandInfo.newBuilder()
-              .setEnvironment(envBuilder)
-              .setValue("cd hadoop-* && ./bin/mesos-executor")
-              .addUris(CommandInfo.URI.newBuilder().setValue(uri)).build();
+            .setEnvironment(envBuilder)
+            .setValue("cd hadoop-* && ./bin/mesos-executor")
+            .addUris(CommandInfo.URI.newBuilder().setValue(uri)).build();
         }
 
         TaskInfo info = TaskInfo
-            .newBuilder()
-            .setName(taskId.getValue())
-            .setTaskId(taskId)
-            .setSlaveId(offer.getSlaveId())
-            .addResources(
-                Resource
-                    .newBuilder()
-                    .setName("cpus")
-                    .setType(Value.Type.SCALAR)
-                    .setScalar(Value.Scalar.newBuilder().setValue(
-                        (mapSlots + reduceSlots) * slotCpus)))
-            .addResources(
-                Resource
-                    .newBuilder()
-                    .setName("mem")
-                    .setType(Value.Type.SCALAR)
-                    .setScalar(Value.Scalar.newBuilder().setValue(
-                        (mapSlots + reduceSlots) * slotMem)))
-            .addResources(
+          .newBuilder()
+          .setName(taskId.getValue())
+          .setTaskId(taskId)
+          .setSlaveId(offer.getSlaveId())
+          .addResources(
+              Resource
+              .newBuilder()
+              .setName("cpus")
+              .setType(Value.Type.SCALAR)
+              .setScalar(Value.Scalar.newBuilder().setValue(
+                  (mapSlots + reduceSlots) * slotCpus)))
+          .addResources(
+              Resource
+              .newBuilder()
+              .setName("mem")
+              .setType(Value.Type.SCALAR)
+              .setScalar(Value.Scalar.newBuilder().setValue(
+                  (mapSlots + reduceSlots) * slotMem)))
+          .addResources(
+              Resource
+              .newBuilder()
+              .setName("disk")
+              .setType(Value.Type.SCALAR)
+              .setScalar(Value.Scalar.newBuilder().setValue(
+                  (mapSlots + reduceSlots) * slotDisk)))
+          .addResources(
+              Resource
+              .newBuilder()
+              .setName("ports")
+              .setType(Value.Type.RANGES)
+              .setRanges(
+                Value.Ranges
+                .newBuilder()
+                .addRange(Value.Range.newBuilder()
+                  .setBegin(httpAddress.getPort())
+                  .setEnd(httpAddress.getPort()))
+                .addRange(Value.Range.newBuilder()
+                  .setBegin(reportAddress.getPort())
+                  .setEnd(reportAddress.getPort()))))
+          .setExecutor(
+              ExecutorInfo
+              .newBuilder()
+              .setExecutorId(ExecutorID.newBuilder().setValue(
+                  "executor_" + taskId.getValue()))
+              .setName("Hadoop TaskTracker")
+              .setSource(taskId.getValue())
+              .addResources(
                 Resource
-                    .newBuilder()
-                    .setName("disk")
-                    .setType(Value.Type.SCALAR)
-                    .setScalar(Value.Scalar.newBuilder().setValue(
-                        (mapSlots + reduceSlots) * slotDisk)))
-            .addResources(
+                .newBuilder()
+                .setName("cpus")
+                .setType(Value.Type.SCALAR)
+                .setScalar(Value.Scalar.newBuilder().setValue(
+                    (TASKTRACKER_CPUS))))
+              .addResources(
                 Resource
-                    .newBuilder()
-                    .setName("ports")
-                    .setType(Value.Type.RANGES)
-                    .setRanges(
-                        Value.Ranges
-                            .newBuilder()
-                            .addRange(Value.Range.newBuilder()
-                                .setBegin(httpAddress.getPort())
-                                .setEnd(httpAddress.getPort()))
-                            .addRange(Value.Range.newBuilder()
-                                .setBegin(reportAddress.getPort())
-                                .setEnd(reportAddress.getPort()))))
-            .setExecutor(
-                ExecutorInfo
-                    .newBuilder()
-                    .setExecutorId(ExecutorID.newBuilder().setValue(
-                        "executor_" + taskId.getValue()))
-                    .setName("Hadoop TaskTracker")
-                    .setSource(taskId.getValue())
-                    .addResources(
-                        Resource
-                            .newBuilder()
-                            .setName("cpus")
-                            .setType(Value.Type.SCALAR)
-                            .setScalar(Value.Scalar.newBuilder().setValue(
-                                (TASKTRACKER_CPUS))))
-                    .addResources(
-                        Resource
-                            .newBuilder()
-                            .setName("mem")
-                            .setType(Value.Type.SCALAR)
-                            .setScalar(Value.Scalar.newBuilder().setValue(
-                                (TASKTRACKER_MEM)))).setCommand(commandInfo))
-            .build();
+                .newBuilder()
+                .setName("mem")
+                .setType(Value.Type.SCALAR)
+                .setScalar(Value.Scalar.newBuilder().setValue(
+                    (TASKTRACKER_MEM)))).setCommand(commandInfo))
+                    .build();
 
         driver.launchTasks(offer.getId(), Arrays.asList(info));
 
@@ -748,8 +752,8 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
     public long reduceSlots;
     public boolean active = false; // Set once tracked by the JobTracker.
 
-    // Tracks Hadoop tasks running on the tracker.
-    public Set<TaskAttemptID> hadoopTasks = new HashSet<TaskAttemptID>();
+    // Tracks Hadoop job tasks running on the tracker.
+    public Set<JobID> hadoopJobs = new HashSet<JobID>();
 
     public MesosTracker(HttpHost host, TaskID taskId, long mapSlots,
         long reduceSlots) {


[3/4] git commit: Kill tasks that never properly launch.

Posted by br...@apache.org.
Kill tasks that never properly launch.

After launching a TaskTracker, we'll wait up to 5 minutes for it to be
tracked by the JobTracker before giving up and killing its Mesos task.

Review: https://reviews.apache.org/r/11124


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/aac37e84
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/aac37e84
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/aac37e84

Branch: refs/heads/master
Commit: aac37e84013cbc15e14d211960f1414c0373c1cf
Parents: 5ef452c
Author: Brenden Matthews <br...@airbnb.com>
Authored: Mon May 13 16:16:12 2013 -0700
Committer: Brenden Matthews <br...@airbnb.com>
Committed: Tue Jul 30 12:48:05 2013 -0700

----------------------------------------------------------------------
 .../apache/hadoop/mapred/MesosScheduler.java    | 41 ++++++++++++++++++--
 1 file changed, 37 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/aac37e84/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java b/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
index ca6106b..e4fbb80 100644
--- a/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
+++ b/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
@@ -10,6 +10,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import org.apache.commons.httpclient.HttpHost;
 import org.apache.commons.logging.Log;
@@ -71,6 +73,10 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
   private static final int MAP_SLOTS_DEFAULT = 2;
   private static final int REDUCE_SLOTS_DEFAULT = 2;
 
+  // The amount of time to wait for task trackers to launch before
+  // giving up.
+  private static final long LAUNCH_TIMEOUT_MS = 300000; // 5 minutes
+
   // Count of the launched trackers for TaskID generation.
   private long launchedTrackers = 0;
 
@@ -143,6 +149,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
                     + mesosTracker.host);
 
                 driver.killTask(mesosTracker.taskId);
+		tracker.timer.cancel();
                 mesosTrackers.remove(tracker);
               }
             }
@@ -276,6 +283,11 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
     LOG.info("Re-registered with master " + masterInfo);
   }
 
+  public synchronized void killTracker(MesosTracker tracker) {
+    driver.killTask(tracker.taskId);
+    mesosTrackers.remove(tracker.host);
+  }
+
   // For some reason, pendingMaps() and pendingReduces() doesn't return the
   // values we expect. We observed negative values, which may be related to
   // https://issues.apache.org/jira/browse/MAPREDUCE-1238. Below is the
@@ -346,6 +358,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
         HttpHost host = new HttpHost(status.getHost(), status.getHttpPort());
         if (mesosTrackers.containsKey(host)) {
           mesosTrackers.get(host).active = true;
+          mesosTrackers.get(host).timer.cancel();
           idleMapSlots += status.getAvailableMapSlots();
           idleReduceSlots += status.getAvailableReduceSlots();
         }
@@ -507,7 +520,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
 
         // Add this tracker to Mesos tasks.
         mesosTrackers.put(httpAddress, new MesosTracker(httpAddress, taskId,
-            mapSlots, reduceSlots));
+              mapSlots, reduceSlots, this));
 
         // Create the environment depending on whether the executor is going to be
         // run locally.
@@ -697,6 +710,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
         for (HttpHost tracker : trackers) {
           if (mesosTrackers.get(tracker).taskId.equals(taskStatus.getTaskId())) {
             LOG.info("Removing terminated TaskTracker: " + tracker);
+	    tracker.timer.cancel();
             mesosTrackers.remove(tracker);
           }
         }
@@ -746,21 +760,40 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
    * Used to track the our launched TaskTrackers.
    */
   private class MesosTracker {
-    public HttpHost host;
+    public volatile HttpHost host;
     public TaskID taskId;
     public long mapSlots;
     public long reduceSlots;
-    public boolean active = false; // Set once tracked by the JobTracker.
+    public volatile boolean active = false; // Set once tracked by the JobTracker.
+    public Timer timer;
+    public volatile MesosScheduler scheduler;
 
     // Tracks Hadoop job tasks running on the tracker.
     public Set<JobID> hadoopJobs = new HashSet<JobID>();
 
     public MesosTracker(HttpHost host, TaskID taskId, long mapSlots,
-        long reduceSlots) {
+        long reduceSlots, MesosScheduler scheduler) {
       this.host = host;
       this.taskId = taskId;
       this.mapSlots = mapSlots;
       this.reduceSlots = reduceSlots;
+      this.scheduler = scheduler;
+
+      this.timer = new Timer();
+      timer.schedule(new TimerTask() {
+        @Override
+        public void run() {
+          synchronized (MesosTracker.this.scheduler) {
+            // If the tracker activated while we were awaiting to acquire the
+            // lock, return.
+            if (MesosTracker.this.active) return;
+
+            LOG.warn("Tracker " + MesosTracker.this.host + " failed to launch within " +
+              LAUNCH_TIMEOUT_MS / 1000 + " seconds, killing it");
+            MesosTracker.this.scheduler.killTracker(MesosTracker.this);
+          }
+        }
+      }, LAUNCH_TIMEOUT_MS);
     }
   }
 }