You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by br...@apache.org on 2013/07/30 21:48:49 UTC
[1/4] git commit: Be more intelligent about slot allocation.
Updated Branches:
refs/heads/master aa2c15d10 -> 0605e9044
Be more intelligent about slot allocation.
When we get a resource offer, we'll try to be clever about how we
allocate slots to the offers. Ideally, we want to use as many of the
available resources as possible when we have a lot of pending tasks,
while also avoiding using more resources than we actually need.
Review: https://reviews.apache.org/r/13078
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/97643616
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/97643616
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/97643616
Branch: refs/heads/master
Commit: 97643616ad222882cf43a9f1962de77390503d0a
Parents: aa2c15d
Author: Brenden Matthews <br...@airbnb.com>
Authored: Mon May 13 16:08:35 2013 -0700
Committer: Brenden Matthews <br...@airbnb.com>
Committed: Tue Jul 30 12:46:33 2013 -0700
----------------------------------------------------------------------
.../apache/hadoop/mapred/MesosScheduler.java | 167 ++++++++++++-------
1 file changed, 108 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/97643616/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java b/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
index 0c10874..11f972a 100644
--- a/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
+++ b/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
@@ -359,8 +359,8 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
}
// Compute how many slots we need to allocate.
- int neededMapSlots = Math.max(0, pendingMaps - idleMapSlots);
- int neededReduceSlots = Math.max(0, pendingReduces - idleReduceSlots);
+ int neededMapSlots = Math.max(0, pendingMaps - (idleMapSlots + inactiveMapSlots));
+ int neededReduceSlots = Math.max(0, pendingReduces - (idleReduceSlots + inactiveReduceSlots));
LOG.info(join("\n", Arrays.asList(
"JobTracker Status",
@@ -413,10 +413,16 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
}
}
- int mapSlots = conf.getInt("mapred.tasktracker.map.tasks.maximum",
+ int mapSlotsMax = conf.getInt("mapred.tasktracker.map.tasks.maximum",
MAP_SLOTS_DEFAULT);
- int reduceSlots = conf.getInt("mapred.tasktracker.reduce.tasks.maximum",
- REDUCE_SLOTS_DEFAULT);
+ int reduceSlotsMax =
+ conf.getInt("mapred.tasktracker.reduce.tasks.maximum",
+ REDUCE_SLOTS_DEFAULT);
+
+ // What's the minimum number of map and reduce slots we should try to
+ // launch?
+ long mapSlots = 0;
+ long reduceSlots = 0;
double slotCpus = conf.getFloat("mapred.mesos.slot.cpus",
(float) SLOT_CPUS_DEFAULT);
@@ -426,39 +432,74 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
SLOT_JVM_HEAP_DEFAULT + JVM_MEM_OVERHEAD);
double slotJVMHeap = slotMem - JVM_MEM_OVERHEAD;
- // Total resource requirements for the container (TaskTracker + map/red
+ // Minimum resource requirements for the container (TaskTracker + map/red
// tasks).
- double containerCpus = (mapSlots + reduceSlots) * slotCpus
- + TASKTRACKER_CPUS;
- double containerMem = (mapSlots + reduceSlots) * slotMem
- + TASKTRACKER_MEM;
- double containerDisk = (mapSlots + reduceSlots) * slotDisk;
-
- if (containerCpus > cpus || containerMem > mem || containerDisk > disk
- || ports.size() < 2) {
+ double containerCpus = TASKTRACKER_CPUS;
+ double containerMem = TASKTRACKER_MEM;
+ double containerDisk = 0;
+
+ // Determine how many slots we can allocate.
+ int slots = mapSlotsMax + reduceSlotsMax;
+ slots = (int)Math.min(slots, (cpus - containerCpus) / slotCpus);
+ slots = (int)Math.min(slots, (mem - containerMem) / slotMem);
+ slots = (int)Math.min(slots, (disk - containerDisk) / slotDisk);
+
+ // Is this offer too small for even the minimum slots?
+ if (slots < 1 || ports.size() < 2) {
LOG.info(join("\n", Arrays.asList(
- "Declining offer with insufficient resources for a TaskTracker: ",
- " cpus: offered " + cpus + " needed " + containerCpus,
- " mem : offered " + mem + " needed " + containerMem,
- " disk: offered " + disk + " needed " + containerDisk,
- " ports: " + (ports.size() < 2
- ? " less than 2 offered"
- : " at least 2 (sufficient)"),
- offer.getResourcesList().toString())));
+ "Declining offer with insufficient resources for a TaskTracker: ",
+ " cpus: offered " + cpus + " needed " + containerCpus,
+ " mem : offered " + mem + " needed " + containerMem,
+ " disk: offered " + disk + " needed " + containerDisk,
+ " ports: " + (ports.size() < 2
+ ? " less than 2 offered"
+ : " at least 2 (sufficient)"),
+ offer.getResourcesList().toString())));
driver.declineOffer(offer.getId());
continue;
}
+ // Is the number of slots we need sufficiently small? If so, we can
+ // allocate exactly the number we need.
+ if (slots >= neededMapSlots + neededReduceSlots && neededMapSlots <
+ mapSlotsMax && neededReduceSlots < reduceSlotsMax) {
+ mapSlots = neededMapSlots;
+ reduceSlots = neededReduceSlots;
+ } else {
+ // Allocate slots fairly for this resource offer.
+ double mapFactor = (double)neededMapSlots / (neededMapSlots + neededReduceSlots);
+ double reduceFactor = (double)neededReduceSlots / (neededMapSlots + neededReduceSlots);
+ // To avoid map/reduce slot starvation, don't allow more than 50%
+ // spread between map/reduce slots when we need both mappers and
+ // reducers.
+ if (neededMapSlots > 0 && neededReduceSlots > 0) {
+ if (mapFactor < 0.25) {
+ mapFactor = 0.25;
+ } else if (mapFactor > 0.75) {
+ mapFactor = 0.75;
+ }
+ if (reduceFactor < 0.25) {
+ reduceFactor = 0.25;
+ } else if (reduceFactor > 0.75) {
+ reduceFactor = 0.75;
+ }
+ }
+ mapSlots = Math.min(Math.min((long)(mapFactor * slots), mapSlotsMax), neededMapSlots);
+ // The remaining slots are allocated for reduces.
+ slots -= mapSlots;
+ reduceSlots = Math.min(Math.min(slots, reduceSlotsMax), neededReduceSlots);
+ }
+
Integer[] portArray = ports.toArray(new Integer[2]);
HttpHost httpAddress = new HttpHost(offer.getHostname(), portArray[0]);
HttpHost reportAddress = new HttpHost(offer.getHostname(), portArray[1]);
TaskID taskId = TaskID.newBuilder()
- .setValue("Task_Tracker_" + launchedTrackers++).build();
+ .setValue("Task_Tracker_" + launchedTrackers++).build();
LOG.info("Launching task " + taskId.getValue() + " on "
- + httpAddress.toString());
+ + httpAddress.toString() + " with mapSlots=" + mapSlots + " reduceSlots=" + reduceSlots);
// Add this tracker to Mesos tasks.
mesosTrackers.put(httpAddress, new MesosTracker(httpAddress, taskId,
@@ -469,37 +510,45 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
// TODO(vinod): Do not pass the mapred config options as environment
// variables.
Protos.Environment.Builder envBuilder = Protos.Environment
- .newBuilder()
- .addVariables(
- Protos.Environment.Variable
- .newBuilder()
- .setName("mapred.job.tracker")
- .setValue(jobTrackerAddress.getHostName() + ':'
- + jobTrackerAddress.getPort()))
- .addVariables(
- Protos.Environment.Variable
- .newBuilder()
- .setName("mapred.task.tracker.http.address")
- .setValue(
- httpAddress.getHostName() + ':' + httpAddress.getPort()))
- .addVariables(
- Protos.Environment.Variable
- .newBuilder()
- .setName("mapred.task.tracker.report.address")
- .setValue(reportAddress.getHostName() + ':'
- + reportAddress.getPort()))
- .addVariables(
- Protos.Environment.Variable.newBuilder()
- .setName("mapred.map.child.java.opts")
- .setValue("-Xmx" + slotJVMHeap + "m"))
- .addVariables(
- Protos.Environment.Variable.newBuilder()
- .setName("mapred.reduce.child.java.opts")
- .setValue("-Xmx" + slotJVMHeap + "m"))
- .addVariables(
- Protos.Environment.Variable.newBuilder()
- .setName("HADOOP_HEAPSIZE")
- .setValue("" + TASKTRACKER_JVM_HEAP));
+ .newBuilder()
+ .addVariables(
+ Protos.Environment.Variable
+ .newBuilder()
+ .setName("mapred.job.tracker")
+ .setValue(jobTrackerAddress.getHostName() + ':'
+ + jobTrackerAddress.getPort()))
+ .addVariables(
+ Protos.Environment.Variable
+ .newBuilder()
+ .setName("mapred.task.tracker.http.address")
+ .setValue(
+ httpAddress.getHostName() + ':' + httpAddress.getPort()))
+ .addVariables(
+ Protos.Environment.Variable
+ .newBuilder()
+ .setName("mapred.task.tracker.report.address")
+ .setValue(reportAddress.getHostName() + ':'
+ + reportAddress.getPort()))
+ .addVariables(
+ Protos.Environment.Variable.newBuilder()
+ .setName("mapred.map.child.java.opts")
+ .setValue("-Xmx" + slotJVMHeap + "m"))
+ .addVariables(
+ Protos.Environment.Variable.newBuilder()
+ .setName("mapred.reduce.child.java.opts")
+ .setValue("-Xmx" + slotJVMHeap + "m"))
+ .addVariables(
+ Protos.Environment.Variable.newBuilder()
+ .setName("HADOOP_HEAPSIZE")
+ .setValue("" + TASKTRACKER_JVM_HEAP))
+ .addVariables(
+ Protos.Environment.Variable.newBuilder()
+ .setName("mapred.tasktracker.map.tasks.maximum")
+ .setValue("" + mapSlots))
+ .addVariables(
+ Protos.Environment.Variable.newBuilder()
+ .setName("mapred.tasktracker.reduce.tasks.maximum")
+ .setValue("" + reduceSlots));
// Set java specific environment, appropriately.
Map<String, String> env = System.getenv();
@@ -695,15 +744,15 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
private class MesosTracker {
public HttpHost host;
public TaskID taskId;
- public int mapSlots;
- public int reduceSlots;
+ public long mapSlots;
+ public long reduceSlots;
public boolean active = false; // Set once tracked by the JobTracker.
// Tracks Hadoop tasks running on the tracker.
public Set<TaskAttemptID> hadoopTasks = new HashSet<TaskAttemptID>();
- public MesosTracker(HttpHost host, TaskID taskId, int mapSlots,
- int reduceSlots) {
+ public MesosTracker(HttpHost host, TaskID taskId, long mapSlots,
+ long reduceSlots) {
this.host = host;
this.taskId = taskId;
this.mapSlots = mapSlots;
[4/4] git commit: Allowed some JVM memory vars to be configured.
Posted by br...@apache.org.
Allowed some JVM memory vars to be configured.
Also modified the slot CPU/mem values to be more sane defaults.
Review: https://reviews.apache.org/r/11130
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0605e904
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0605e904
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0605e904
Branch: refs/heads/master
Commit: 0605e90445beb34f621b369063d9bc452ff8cd39
Parents: aac37e8
Author: Brenden Matthews <br...@airbnb.com>
Authored: Fri May 10 14:34:22 2013 -0700
Committer: Brenden Matthews <br...@airbnb.com>
Committed: Tue Jul 30 12:48:29 2013 -0700
----------------------------------------------------------------------
hadoop/mapred-site.xml.patch | 6 ++---
.../apache/hadoop/mapred/MesosScheduler.java | 27 ++++++++++++--------
2 files changed, 19 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/0605e904/hadoop/mapred-site.xml.patch
----------------------------------------------------------------------
diff --git a/hadoop/mapred-site.xml.patch b/hadoop/mapred-site.xml.patch
index 8b39979..d5a99f6 100644
--- a/hadoop/mapred-site.xml.patch
+++ b/hadoop/mapred-site.xml.patch
@@ -37,7 +37,7 @@ index 970c8fe..f9f272d 100644
+# that are allocated to a Hadoop slot (i.e., map/reduce task) by Mesos.
+ <property>
+ <name>mapred.mesos.slot.cpus</name>
-+ <value>0.2</value>
++ <value>1</value>
+ </property>
+ <property>
+ <name>mapred.mesos.slot.disk</name>
@@ -47,8 +47,8 @@ index 970c8fe..f9f272d 100644
+ <property>
+ <name>mapred.mesos.slot.mem</name>
+ <!-- Note that this is the total memory required for
-+ JVM overhead (256 MB) and the heap (-Xmx) of the task.
++ JVM overhead (10% of this value) and the heap (-Xmx) of the task.
+ The value is in MB. -->
-+ <value>512</value>
++ <value>1024</value>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/mesos/blob/0605e904/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java b/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
index e4fbb80..69d4655 100644
--- a/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
+++ b/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
@@ -50,7 +50,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
// This is the memory overhead for a jvm process. This needs to be added
// to a jvm process's resource requirement, in addition to its heap size.
- private static final int JVM_MEM_OVERHEAD = 256; // 256 MB.
+ private static final double JVM_MEM_OVERHEAD_PERCENT_DEFAULT = 0.1; // 10%.
// TODO(vinod): Consider parsing the slot memory from the configuration jvm
// heap options (e.g: mapred.child.java.opts).
@@ -62,12 +62,10 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
// 1 GB of disk space.
private static final double SLOT_CPUS_DEFAULT = 0.2; // 0.2 cores.
private static final int SLOT_DISK_DEFAULT = 1024; // 1 GB.
- private static final int SLOT_JVM_HEAP_DEFAULT = 256; // MB.
+ private static final int SLOT_JVM_HEAP_DEFAULT = 256; // 256MB.
private static final double TASKTRACKER_CPUS = 1.0; // 1 core.
- private static final int TASKTRACKER_JVM_HEAP = 1024; // 1 GB.
- private static final int TASKTRACKER_MEM =
- TASKTRACKER_JVM_HEAP + JVM_MEM_OVERHEAD;
+ private static final int TASKTRACKER_MEM_DEFAULT = 1024; // 1 GB.
// The default behavior in Hadoop is to use 4 slots per TaskTracker:
private static final int MAP_SLOTS_DEFAULT = 2;
@@ -445,14 +443,21 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
(float) SLOT_CPUS_DEFAULT);
double slotDisk = conf.getInt("mapred.mesos.slot.disk",
SLOT_DISK_DEFAULT);
- double slotMem = conf.getInt("mapred.mesos.slot.mem",
- SLOT_JVM_HEAP_DEFAULT + JVM_MEM_OVERHEAD);
- double slotJVMHeap = slotMem - JVM_MEM_OVERHEAD;
+
+ int slotMem = conf.getInt("mapred.mesos.slot.mem",
+ SLOT_JVM_HEAP_DEFAULT);
+ long slotJVMHeap = Math.round((double)slotMem -
+ (JVM_MEM_OVERHEAD_PERCENT_DEFAULT * slotMem));
+
+ int tasktrackerMem = conf.getInt("mapred.mesos.tasktracker.mem",
+ TASKTRACKER_MEM_DEFAULT);
+ long tasktrackerJVMHeap = Math.round((double)tasktrackerMem -
+ (JVM_MEM_OVERHEAD_PERCENT_DEFAULT * tasktrackerMem));
// Minimum resource requirements for the container (TaskTracker + map/red
// tasks).
double containerCpus = TASKTRACKER_CPUS;
- double containerMem = TASKTRACKER_MEM;
+ double containerMem = tasktrackerMem;
double containerDisk = 0;
// Determine how many slots we can allocate.
@@ -557,7 +562,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
.addVariables(
Protos.Environment.Variable.newBuilder()
.setName("HADOOP_HEAPSIZE")
- .setValue("" + TASKTRACKER_JVM_HEAP))
+ .setValue("" + tasktrackerJVMHeap))
.addVariables(
Protos.Environment.Variable.newBuilder()
.setName("mapred.tasktracker.map.tasks.maximum")
@@ -663,7 +668,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
.setName("mem")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(
- (TASKTRACKER_MEM)))).setCommand(commandInfo))
+ (tasktrackerMem)))).setCommand(commandInfo))
.build();
driver.launchTasks(offer.getId(), Arrays.asList(info));
[2/4] git commit: Don't prematurely kill TaskTrackers.
Posted by br...@apache.org.
Don't prematurely kill TaskTrackers.
We assign the JobID to the internal Mesos tracker (for the TaskTracker)
rather than the TaskID.
In the case where a TaskTracker has only map tasks assigned to it and
they have all completed, we mustn't terminate the tracker until the
entire job has finished, since the remaining (reduce) tasks may still
need the map output it holds.
Review: https://reviews.apache.org/r/11119
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5ef452cb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5ef452cb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5ef452cb
Branch: refs/heads/master
Commit: 5ef452cb4b1cef1df94592f7e637644140f9fda4
Parents: 9764361
Author: Brenden Matthews <br...@airbnb.com>
Authored: Mon May 13 16:13:16 2013 -0700
Committer: Brenden Matthews <br...@airbnb.com>
Committed: Tue Jul 30 12:47:37 2013 -0700
----------------------------------------------------------------------
.../apache/hadoop/mapred/MesosScheduler.java | 180 ++++++++++---------
1 file changed, 92 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/5ef452cb/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java b/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
index 11f972a..ca6106b 100644
--- a/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
+++ b/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
@@ -65,7 +65,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
private static final double TASKTRACKER_CPUS = 1.0; // 1 core.
private static final int TASKTRACKER_JVM_HEAP = 1024; // 1 GB.
private static final int TASKTRACKER_MEM =
- TASKTRACKER_JVM_HEAP + JVM_MEM_OVERHEAD;
+ TASKTRACKER_JVM_HEAP + JVM_MEM_OVERHEAD;
// The default behavior in Hadoop is to use 4 slots per TaskTracker:
private static final int MAP_SLOTS_DEFAULT = 2;
@@ -78,7 +78,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
// Used for tracking the slots of each TaskTracker and the corresponding
// Mesos TaskID.
private Map<HttpHost, MesosTracker> mesosTrackers =
- new HashMap<HttpHost, MesosTracker>();
+ new HashMap<HttpHost, MesosTracker>();
private JobInProgressListener jobListener = new JobInProgressListener() {
@Override
@@ -97,8 +97,9 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
JobInProgress job = event.getJobInProgress();
// If the job is complete, kill all the corresponding idle TaskTrackers.
- if (!job.isComplete())
+ if (!job.isComplete()) {
return;
+ }
LOG.info("Completed job : " + job.getJobID());
@@ -111,10 +112,12 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
completed.addAll(job.reportTasksInProgress(false, true));
for (TaskInProgress task : completed) {
- for (TaskStatus status : task.getTaskStatuses()) {
- LOG.info("Removing completed task : " + status.getTaskID()
- + " of tracker " + status.getTaskTracker());
+ // Check that this task actually belongs to this job
+ if (task.getJob().getJobID() != job.getJobID()) {
+ continue;
+ }
+ for (TaskStatus status : task.getTaskStatuses()) {
// Make a copy to iterate over keys and delete values.
Set<HttpHost> trackers = new HashSet<HttpHost>(
mesosTrackers.keySet());
@@ -129,12 +132,15 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
continue;
}
- mesosTracker.hadoopTasks.remove(status.getTaskID());
+ LOG.info("Removing completed task : " + status.getTaskID()
+ + " of tracker " + status.getTaskTracker());
- // If this TaskTracker doesn't have any running tasks, kill it.
- if (mesosTracker.hadoopTasks.isEmpty()) {
- LOG.info("Killing Mesos task: " + mesosTracker.taskId
- + " on host " + mesosTracker.host);
+ mesosTracker.hadoopJobs.remove(job.getJobID());
+
+ // If the TaskTracker doesn't have any running tasks, kill it.
+ if (mesosTracker.hadoopJobs.isEmpty()) {
+ LOG.info("Killing Mesos task: " + mesosTracker.taskId + " on host "
+ + mesosTracker.host);
driver.killTask(mesosTracker.taskId);
mesosTrackers.remove(tracker);
@@ -157,7 +163,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
try {
taskScheduler =
- (TaskScheduler) Class.forName(taskTrackerClass).newInstance();
+ (TaskScheduler) Class.forName(taskTrackerClass).newInstance();
taskScheduler.setConf(conf);
taskScheduler.setTaskTrackerManager(taskTrackerManager);
} catch (ClassNotFoundException e) {
@@ -181,10 +187,10 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
try {
FrameworkInfo frameworkInfo = FrameworkInfo
- .newBuilder()
- .setUser("")
- .setName("Hadoop: (RPC port: " + jobTracker.port + ","
- + " WebUI port: " + jobTracker.infoPort + ")").build();
+ .newBuilder()
+ .setUser("")
+ .setName("Hadoop: (RPC port: " + jobTracker.port + ","
+ + " WebUI port: " + jobTracker.infoPort + ")").build();
driver = new MesosSchedulerDriver(this, frameworkInfo, master);
driver.start();
@@ -212,7 +218,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
@Override
public synchronized List<Task> assignTasks(TaskTracker taskTracker)
- throws IOException {
+ throws IOException {
HttpHost tracker = new HttpHost(taskTracker.getStatus().getHost(),
taskTracker.getStatus().getHttpPort());
@@ -229,9 +235,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
if (tasks != null) {
// Keep track of which TaskTracker contains which tasks.
for (Task task : tasks) {
- LOG.info("Assigning task : " + task.getTaskID() + " to tracker "
- + tracker);
- mesosTrackers.get(tracker).hadoopTasks.add(task.getTaskID());
+ mesosTrackers.get(tracker).hadoopJobs.add(task.getJobID());
}
}
@@ -286,7 +290,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
for (int i = 0; i < totalTasks; ++i) {
TaskInProgress task = tasks[i];
if (task == null) {
- continue;
+ continue;
}
if (task.isComplete()) {
finishedTasks += 1;
@@ -571,9 +575,9 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
if (master.equals("local")) {
try {
commandInfo = CommandInfo.newBuilder()
- .setEnvironment(envBuilder)
- .setValue(new File("bin/mesos-executor").getCanonicalPath())
- .build();
+ .setEnvironment(envBuilder)
+ .setValue(new File("bin/mesos-executor").getCanonicalPath())
+ .build();
} catch (IOException e) {
LOG.fatal("Failed to find Mesos executor ", e);
System.exit(1);
@@ -581,73 +585,73 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
} else {
String uri = conf.get("mapred.mesos.executor");
commandInfo = CommandInfo.newBuilder()
- .setEnvironment(envBuilder)
- .setValue("cd hadoop-* && ./bin/mesos-executor")
- .addUris(CommandInfo.URI.newBuilder().setValue(uri)).build();
+ .setEnvironment(envBuilder)
+ .setValue("cd hadoop-* && ./bin/mesos-executor")
+ .addUris(CommandInfo.URI.newBuilder().setValue(uri)).build();
}
TaskInfo info = TaskInfo
- .newBuilder()
- .setName(taskId.getValue())
- .setTaskId(taskId)
- .setSlaveId(offer.getSlaveId())
- .addResources(
- Resource
- .newBuilder()
- .setName("cpus")
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(
- (mapSlots + reduceSlots) * slotCpus)))
- .addResources(
- Resource
- .newBuilder()
- .setName("mem")
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(
- (mapSlots + reduceSlots) * slotMem)))
- .addResources(
+ .newBuilder()
+ .setName(taskId.getValue())
+ .setTaskId(taskId)
+ .setSlaveId(offer.getSlaveId())
+ .addResources(
+ Resource
+ .newBuilder()
+ .setName("cpus")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder().setValue(
+ (mapSlots + reduceSlots) * slotCpus)))
+ .addResources(
+ Resource
+ .newBuilder()
+ .setName("mem")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder().setValue(
+ (mapSlots + reduceSlots) * slotMem)))
+ .addResources(
+ Resource
+ .newBuilder()
+ .setName("disk")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder().setValue(
+ (mapSlots + reduceSlots) * slotDisk)))
+ .addResources(
+ Resource
+ .newBuilder()
+ .setName("ports")
+ .setType(Value.Type.RANGES)
+ .setRanges(
+ Value.Ranges
+ .newBuilder()
+ .addRange(Value.Range.newBuilder()
+ .setBegin(httpAddress.getPort())
+ .setEnd(httpAddress.getPort()))
+ .addRange(Value.Range.newBuilder()
+ .setBegin(reportAddress.getPort())
+ .setEnd(reportAddress.getPort()))))
+ .setExecutor(
+ ExecutorInfo
+ .newBuilder()
+ .setExecutorId(ExecutorID.newBuilder().setValue(
+ "executor_" + taskId.getValue()))
+ .setName("Hadoop TaskTracker")
+ .setSource(taskId.getValue())
+ .addResources(
Resource
- .newBuilder()
- .setName("disk")
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(
- (mapSlots + reduceSlots) * slotDisk)))
- .addResources(
+ .newBuilder()
+ .setName("cpus")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder().setValue(
+ (TASKTRACKER_CPUS))))
+ .addResources(
Resource
- .newBuilder()
- .setName("ports")
- .setType(Value.Type.RANGES)
- .setRanges(
- Value.Ranges
- .newBuilder()
- .addRange(Value.Range.newBuilder()
- .setBegin(httpAddress.getPort())
- .setEnd(httpAddress.getPort()))
- .addRange(Value.Range.newBuilder()
- .setBegin(reportAddress.getPort())
- .setEnd(reportAddress.getPort()))))
- .setExecutor(
- ExecutorInfo
- .newBuilder()
- .setExecutorId(ExecutorID.newBuilder().setValue(
- "executor_" + taskId.getValue()))
- .setName("Hadoop TaskTracker")
- .setSource(taskId.getValue())
- .addResources(
- Resource
- .newBuilder()
- .setName("cpus")
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(
- (TASKTRACKER_CPUS))))
- .addResources(
- Resource
- .newBuilder()
- .setName("mem")
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(
- (TASKTRACKER_MEM)))).setCommand(commandInfo))
- .build();
+ .newBuilder()
+ .setName("mem")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder().setValue(
+ (TASKTRACKER_MEM)))).setCommand(commandInfo))
+ .build();
driver.launchTasks(offer.getId(), Arrays.asList(info));
@@ -748,8 +752,8 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
public long reduceSlots;
public boolean active = false; // Set once tracked by the JobTracker.
- // Tracks Hadoop tasks running on the tracker.
- public Set<TaskAttemptID> hadoopTasks = new HashSet<TaskAttemptID>();
+ // Tracks Hadoop job tasks running on the tracker.
+ public Set<JobID> hadoopJobs = new HashSet<JobID>();
public MesosTracker(HttpHost host, TaskID taskId, long mapSlots,
long reduceSlots) {
[3/4] git commit: Kill tasks that never properly launch.
Posted by br...@apache.org.
Kill tasks that never properly launch.
After trying to launch a task tracker, we'll wait up to 5 minutes before
giving up and killing the task.
Review: https://reviews.apache.org/r/11124
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/aac37e84
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/aac37e84
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/aac37e84
Branch: refs/heads/master
Commit: aac37e84013cbc15e14d211960f1414c0373c1cf
Parents: 5ef452c
Author: Brenden Matthews <br...@airbnb.com>
Authored: Mon May 13 16:16:12 2013 -0700
Committer: Brenden Matthews <br...@airbnb.com>
Committed: Tue Jul 30 12:48:05 2013 -0700
----------------------------------------------------------------------
.../apache/hadoop/mapred/MesosScheduler.java | 41 ++++++++++++++++++--
1 file changed, 37 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/aac37e84/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java b/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
index ca6106b..e4fbb80 100644
--- a/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
+++ b/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
@@ -10,6 +10,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import org.apache.commons.httpclient.HttpHost;
import org.apache.commons.logging.Log;
@@ -71,6 +73,10 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
private static final int MAP_SLOTS_DEFAULT = 2;
private static final int REDUCE_SLOTS_DEFAULT = 2;
+ // The amount of time to wait for task trackers to launch before
+ // giving up.
+ private static final long LAUNCH_TIMEOUT_MS = 300000; // 5 minutes
+
// Count of the launched trackers for TaskID generation.
private long launchedTrackers = 0;
@@ -143,6 +149,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
+ mesosTracker.host);
driver.killTask(mesosTracker.taskId);
+ tracker.timer.cancel();
mesosTrackers.remove(tracker);
}
}
@@ -276,6 +283,11 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
LOG.info("Re-registered with master " + masterInfo);
}
+ public synchronized void killTracker(MesosTracker tracker) {
+ driver.killTask(tracker.taskId);
+ mesosTrackers.remove(tracker.host);
+ }
+
// For some reason, pendingMaps() and pendingReduces() doesn't return the
// values we expect. We observed negative values, which may be related to
// https://issues.apache.org/jira/browse/MAPREDUCE-1238. Below is the
@@ -346,6 +358,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
HttpHost host = new HttpHost(status.getHost(), status.getHttpPort());
if (mesosTrackers.containsKey(host)) {
mesosTrackers.get(host).active = true;
+ mesosTrackers.get(host).timer.cancel();
idleMapSlots += status.getAvailableMapSlots();
idleReduceSlots += status.getAvailableReduceSlots();
}
@@ -507,7 +520,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
// Add this tracker to Mesos tasks.
mesosTrackers.put(httpAddress, new MesosTracker(httpAddress, taskId,
- mapSlots, reduceSlots));
+ mapSlots, reduceSlots, this));
// Create the environment depending on whether the executor is going to be
// run locally.
@@ -697,6 +710,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
for (HttpHost tracker : trackers) {
if (mesosTrackers.get(tracker).taskId.equals(taskStatus.getTaskId())) {
LOG.info("Removing terminated TaskTracker: " + tracker);
+ tracker.timer.cancel();
mesosTrackers.remove(tracker);
}
}
@@ -746,21 +760,40 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
* Used to track our launched TaskTrackers.
*/
private class MesosTracker {
- public HttpHost host;
+ public volatile HttpHost host;
public TaskID taskId;
public long mapSlots;
public long reduceSlots;
- public boolean active = false; // Set once tracked by the JobTracker.
+ public volatile boolean active = false; // Set once tracked by the JobTracker.
+ public Timer timer;
+ public volatile MesosScheduler scheduler;
// Tracks Hadoop job tasks running on the tracker.
public Set<JobID> hadoopJobs = new HashSet<JobID>();
public MesosTracker(HttpHost host, TaskID taskId, long mapSlots,
- long reduceSlots) {
+ long reduceSlots, MesosScheduler scheduler) {
this.host = host;
this.taskId = taskId;
this.mapSlots = mapSlots;
this.reduceSlots = reduceSlots;
+ this.scheduler = scheduler;
+
+ this.timer = new Timer();
+ timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ synchronized (MesosTracker.this.scheduler) {
+ // If the tracker activated while we were waiting to acquire the
+ // lock, return.
+ if (MesosTracker.this.active) return;
+
+ LOG.warn("Tracker " + MesosTracker.this.host + " failed to launch within " +
+ LAUNCH_TIMEOUT_MS / 1000 + " seconds, killing it");
+ MesosTracker.this.scheduler.killTracker(MesosTracker.this);
+ }
+ }
+ }, LAUNCH_TIMEOUT_MS);
}
}
}