You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2013/04/10 01:46:43 UTC
svn commit: r1466292 -
/incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
Author: bmahler
Date: Tue Apr 9 23:46:43 2013
New Revision: 1466292
URL: http://svn.apache.org/r1466292
Log:
Fixed a deadlock in the Hadoop MesosScheduler by unsynchronizing resourceOffers().
Review: https://reviews.apache.org/r/10352
Modified:
incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
Modified: incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java?rev=1466292&r1=1466291&r2=1466292&view=diff
==============================================================================
--- incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java (original)
+++ incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java Tue Apr 9 23:46:43 2013
@@ -145,9 +145,9 @@ public class MesosScheduler extends Task
}
};
- public MesosScheduler() {
- }
+ public MesosScheduler() {}
+ // TaskScheduler methods.
@Override
public synchronized void start() throws IOException {
conf = getConf();
@@ -209,7 +209,6 @@ public class MesosScheduler extends Task
taskScheduler.terminate();
}
- // TaskScheduler methods.
@Override
public synchronized List<Task> assignTasks(TaskTracker taskTracker)
throws IOException {
@@ -246,6 +245,16 @@ public class MesosScheduler extends Task
}
// Mesos Scheduler methods.
+ // These are synchronized, where possible. Some of these methods need to access the
+ // JobTracker, which can lead to a deadlock:
+ // See: https://issues.apache.org/jira/browse/MESOS-429
+ // The workaround employed here is to unsynchronize those methods needing access to
+ // the JobTracker state and use explicit synchronization instead as appropriate.
+ // TODO(bmahler): Provide a cleaner solution to this issue. One solution is to
+ // split up the Scheduler and TaskScheduler implementations in order to break the
+ // locking cycle. This would require a synchronized class to store the shared
+ // state across our Scheduler and TaskScheduler implementations, and provide
+ // atomic operations as needed.
@Override
public synchronized void registered(SchedulerDriver schedulerDriver,
FrameworkID frameworkID, MasterInfo masterInfo) {
@@ -259,297 +268,309 @@ public class MesosScheduler extends Task
LOG.info("Re-registered with master " + masterInfo);
}
+ // This method uses explicit synchronization in order to avoid deadlocks when
+ // accessing the JobTracker.
@Override
- public synchronized void resourceOffers(SchedulerDriver schedulerDriver,
+ public void resourceOffers(SchedulerDriver schedulerDriver,
List<Offer> offers) {
- // Compute the number of pending maps and reduces.
- int pendingMaps = 0;
- int pendingReduces = 0;
+ // Before synchronizing, we pull all needed information from the JobTracker.
+ final HttpHost jobTrackerAddress = new HttpHost(jobTracker.getHostname(),
+ jobTracker.getTrackerPort());
+
+ final Collection<TaskTrackerStatus> taskTrackers = jobTracker.taskTrackers();
+
+ final List<JobInProgress> jobsInProgress = new ArrayList<JobInProgress>();
for (JobStatus status : jobTracker.jobsToComplete()) {
- JobInProgress progress = jobTracker.getJob(status.getJobID());
- pendingMaps += progress.pendingMaps();
- pendingReduces += progress.pendingReduces();
+ jobsInProgress.add(jobTracker.getJob(status.getJobID()));
}
- // Mark active (heartbeated) TaskTrackers and compute idle slots.
- int idleMapSlots = 0;
- int idleReduceSlots = 0;
- for (TaskTrackerStatus status : jobTracker.taskTrackers()) {
- HttpHost host = new HttpHost(status.getHost(), status.getHttpPort());
- if (mesosTrackers.containsKey(host)) {
- mesosTrackers.get(host).active = true;
- idleMapSlots += status.getAvailableMapSlots();
- idleReduceSlots += status.getAvailableReduceSlots();
+ synchronized (this) {
+ // Compute the number of pending maps and reduces.
+ int pendingMaps = 0;
+ int pendingReduces = 0;
+ for (JobInProgress progress : jobsInProgress) {
+ pendingMaps += progress.pendingMaps();
+ pendingReduces += progress.pendingReduces();
}
- }
- // Consider the TaskTrackers that have yet to become active as being idle,
- // otherwise we will launch excessive TaskTrackers.
- int inactiveMapSlots = 0;
- int inactiveReduceSlots = 0;
- for (MesosTracker tracker : mesosTrackers.values()) {
- if (!tracker.active) {
- inactiveMapSlots += tracker.mapSlots;
- inactiveReduceSlots += tracker.reduceSlots;
+ // Mark active (heartbeated) TaskTrackers and compute idle slots.
+ int idleMapSlots = 0;
+ int idleReduceSlots = 0;
+ for (TaskTrackerStatus status : taskTrackers) {
+ HttpHost host = new HttpHost(status.getHost(), status.getHttpPort());
+ if (mesosTrackers.containsKey(host)) {
+ mesosTrackers.get(host).active = true;
+ idleMapSlots += status.getAvailableMapSlots();
+ idleReduceSlots += status.getAvailableReduceSlots();
+ }
}
- }
- // Compute how many slots we need to allocate.
- int neededMapSlots = Math.max(0, pendingMaps - idleMapSlots);
- int neededReduceSlots = Math.max(0, pendingReduces - idleReduceSlots);
-
- LOG.info(join("\n", Arrays.asList(
- "JobTracker Status",
- " Pending Map Tasks: " + pendingMaps,
- " Pending Reduce Tasks: " + pendingReduces,
- " Idle Map Slots: " + idleMapSlots,
- " Idle Reduce Slots: " + idleReduceSlots,
- " Inactive Map Slots: " + inactiveMapSlots
- + " (launched but no heartbeat yet)",
- " Inactive Reduce Slots: " + inactiveReduceSlots
- + " (launched but no heartbeat yet)",
- " Needed Map Slots: " + neededMapSlots,
- " Needed Reduce Slots: " + neededReduceSlots)));
-
- // Launch TaskTrackers to satisfy the slot requirements.
- // TODO(bmahler): Consider slotting intelligently.
- // Ex: If more map slots are needed, but no reduce slots are needed,
- // launch a map-only TaskTracker to better satisfy the slot needs.
- for (Offer offer : offers) {
- if (neededMapSlots <= 0 && neededReduceSlots <= 0) {
- driver.declineOffer(offer.getId());
- continue;
+ // Consider the TaskTrackers that have yet to become active as being idle,
+ // otherwise we will launch excessive TaskTrackers.
+ int inactiveMapSlots = 0;
+ int inactiveReduceSlots = 0;
+ for (MesosTracker tracker : mesosTrackers.values()) {
+ if (!tracker.active) {
+ inactiveMapSlots += tracker.mapSlots;
+ inactiveReduceSlots += tracker.reduceSlots;
+ }
}
- double cpus = -1.0;
- double mem = -1.0;
- double disk = -1.0;
- Set<Integer> ports = new HashSet<Integer>(2);
-
- // Pull out the cpus, memory, disk, and 2 ports from the offer.
- for (Resource resource : offer.getResourcesList()) {
- if (resource.getName().equals("cpus")
- && resource.getType() == Value.Type.SCALAR) {
- cpus = resource.getScalar().getValue();
- } else if (resource.getName().equals("mem")
- && resource.getType() == Value.Type.SCALAR) {
- mem = resource.getScalar().getValue();
- } else if (resource.getName().equals("disk")
- && resource.getType() == Value.Type.SCALAR) {
- disk = resource.getScalar().getValue();
- } else if (resource.getName().equals("ports")
- && resource.getType() == Value.Type.RANGES) {
- for (Value.Range range : resource.getRanges().getRangeList()) {
- if (ports.size() < 2)
- ports.add((int) range.getBegin());
- if (ports.size() < 2)
- ports.add((int) range.getEnd());
+ // Compute how many slots we need to allocate.
+ int neededMapSlots = Math.max(0, pendingMaps - idleMapSlots);
+ int neededReduceSlots = Math.max(0, pendingReduces - idleReduceSlots);
+
+ LOG.info(join("\n", Arrays.asList(
+ "JobTracker Status",
+ " Pending Map Tasks: " + pendingMaps,
+ " Pending Reduce Tasks: " + pendingReduces,
+ " Idle Map Slots: " + idleMapSlots,
+ " Idle Reduce Slots: " + idleReduceSlots,
+ " Inactive Map Slots: " + inactiveMapSlots
+ + " (launched but no heartbeat yet)",
+ " Inactive Reduce Slots: " + inactiveReduceSlots
+ + " (launched but no heartbeat yet)",
+ " Needed Map Slots: " + neededMapSlots,
+ " Needed Reduce Slots: " + neededReduceSlots)));
+
+ // Launch TaskTrackers to satisfy the slot requirements.
+ // TODO(bmahler): Consider slotting intelligently.
+ // Ex: If more map slots are needed, but no reduce slots are needed,
+ // launch a map-only TaskTracker to better satisfy the slot needs.
+ for (Offer offer : offers) {
+ if (neededMapSlots <= 0 && neededReduceSlots <= 0) {
+ driver.declineOffer(offer.getId());
+ continue;
+ }
+
+ double cpus = -1.0;
+ double mem = -1.0;
+ double disk = -1.0;
+ Set<Integer> ports = new HashSet<Integer>(2);
+
+ // Pull out the cpus, memory, disk, and 2 ports from the offer.
+ for (Resource resource : offer.getResourcesList()) {
+ if (resource.getName().equals("cpus")
+ && resource.getType() == Value.Type.SCALAR) {
+ cpus = resource.getScalar().getValue();
+ } else if (resource.getName().equals("mem")
+ && resource.getType() == Value.Type.SCALAR) {
+ mem = resource.getScalar().getValue();
+ } else if (resource.getName().equals("disk")
+ && resource.getType() == Value.Type.SCALAR) {
+ disk = resource.getScalar().getValue();
+ } else if (resource.getName().equals("ports")
+ && resource.getType() == Value.Type.RANGES) {
+ for (Value.Range range : resource.getRanges().getRangeList()) {
+ if (ports.size() < 2)
+ ports.add((int) range.getBegin());
+ if (ports.size() < 2)
+ ports.add((int) range.getEnd());
+ }
}
}
- }
- int mapSlots = conf.getInt("mapred.tasktracker.map.tasks.maximum",
- MAP_SLOTS_DEFAULT);
- int reduceSlots = conf.getInt("mapred.tasktracker.reduce.tasks.maximum",
- REDUCE_SLOTS_DEFAULT);
-
- double slotCpus = conf.getFloat("mapred.mesos.slot.cpus",
- (float) SLOT_CPUS_DEFAULT);
- double slotDisk = conf.getInt("mapred.mesos.slot.disk",
- SLOT_DISK_DEFAULT);
- double slotMem = conf.getInt("mapred.mesos.slot.mem",
- SLOT_JVM_HEAP_DEFAULT + JVM_MEM_OVERHEAD);
- double slotJVMHeap = slotMem - JVM_MEM_OVERHEAD;
-
- // Total resource requirements for the container (TaskTracker + map/red
- // tasks).
- double containerCpus = (mapSlots + reduceSlots) * slotCpus
- + TASKTRACKER_CPUS;
- double containerMem = (mapSlots + reduceSlots) * slotMem
- + TASKTRACKER_MEM;
- double containerDisk = (mapSlots + reduceSlots) * slotDisk;
-
- if (containerCpus > cpus || containerMem > mem || containerDisk > disk
- || ports.size() < 2) {
- LOG.info(join("\n", Arrays.asList(
- "Declining offer with insufficient resources for a TaskTracker: ",
- " cpus: offered " + cpus + " needed " + containerCpus,
- " mem : offered " + mem + " needed " + containerMem,
- " disk: offered " + disk + " needed " + containerDisk,
- " ports: " + (ports.size() < 2
- ? " less than 2 offered"
- : " at least 2 (sufficient)"),
- offer.getResourcesList().toString())));
+ int mapSlots = conf.getInt("mapred.tasktracker.map.tasks.maximum",
+ MAP_SLOTS_DEFAULT);
+ int reduceSlots = conf.getInt("mapred.tasktracker.reduce.tasks.maximum",
+ REDUCE_SLOTS_DEFAULT);
+
+ double slotCpus = conf.getFloat("mapred.mesos.slot.cpus",
+ (float) SLOT_CPUS_DEFAULT);
+ double slotDisk = conf.getInt("mapred.mesos.slot.disk",
+ SLOT_DISK_DEFAULT);
+ double slotMem = conf.getInt("mapred.mesos.slot.mem",
+ SLOT_JVM_HEAP_DEFAULT + JVM_MEM_OVERHEAD);
+ double slotJVMHeap = slotMem - JVM_MEM_OVERHEAD;
+
+ // Total resource requirements for the container (TaskTracker + map/red
+ // tasks).
+ double containerCpus = (mapSlots + reduceSlots) * slotCpus
+ + TASKTRACKER_CPUS;
+ double containerMem = (mapSlots + reduceSlots) * slotMem
+ + TASKTRACKER_MEM;
+ double containerDisk = (mapSlots + reduceSlots) * slotDisk;
+
+ if (containerCpus > cpus || containerMem > mem || containerDisk > disk
+ || ports.size() < 2) {
+ LOG.info(join("\n", Arrays.asList(
+ "Declining offer with insufficient resources for a TaskTracker: ",
+ " cpus: offered " + cpus + " needed " + containerCpus,
+ " mem : offered " + mem + " needed " + containerMem,
+ " disk: offered " + disk + " needed " + containerDisk,
+ " ports: " + (ports.size() < 2
+ ? " less than 2 offered"
+ : " at least 2 (sufficient)"),
+ offer.getResourcesList().toString())));
- driver.declineOffer(offer.getId());
- continue;
- }
-
- Integer[] portArray = ports.toArray(new Integer[2]);
- HttpHost httpAddress = new HttpHost(offer.getHostname(), portArray[0]);
- HttpHost reportAddress = new HttpHost(offer.getHostname(), portArray[1]);
- HttpHost jobTrackerAddress = new HttpHost(jobTracker.getHostname(),
- jobTracker.getTrackerPort());
-
- TaskID taskId = TaskID.newBuilder()
- .setValue("Task_Tracker_" + launchedTrackers++).build();
-
- LOG.info("Launching task " + taskId.getValue() + " on "
- + httpAddress.toString());
-
- // Add this tracker to Mesos tasks.
- mesosTrackers.put(httpAddress, new MesosTracker(httpAddress, taskId,
- mapSlots, reduceSlots));
-
- // Create the environment depending on whether the executor is going to be
- // run locally.
- // TODO(vinod): Do not pass the mapred config options as environment
- // variables.
- Protos.Environment.Builder envBuilder = Protos.Environment
- .newBuilder()
- .addVariables(
- Protos.Environment.Variable
- .newBuilder()
- .setName("mapred.job.tracker")
- .setValue(jobTrackerAddress.getHostName() + ':'
- + jobTrackerAddress.getPort()))
- .addVariables(
- Protos.Environment.Variable
- .newBuilder()
- .setName("mapred.task.tracker.http.address")
- .setValue(
- httpAddress.getHostName() + ':' + httpAddress.getPort()))
- .addVariables(
- Protos.Environment.Variable
- .newBuilder()
- .setName("mapred.task.tracker.report.address")
- .setValue(reportAddress.getHostName() + ':'
- + reportAddress.getPort()))
- .addVariables(
- Protos.Environment.Variable.newBuilder()
- .setName("mapred.map.child.java.opts")
- .setValue("-Xmx" + slotJVMHeap + "m"))
- .addVariables(
- Protos.Environment.Variable.newBuilder()
- .setName("mapred.reduce.child.java.opts")
- .setValue("-Xmx" + slotJVMHeap + "m"))
- .addVariables(
- Protos.Environment.Variable.newBuilder()
- .setName("HADOOP_HEAPSIZE")
- .setValue("" + TASKTRACKER_JVM_HEAP));
-
- // Set java specific environment, appropriately.
- Map<String, String> env = System.getenv();
- if (env.containsKey("JAVA_HOME")) {
- envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
- .setName("JAVA_HOME")
- .setValue(env.get("JAVA_HOME")));
- }
+ driver.declineOffer(offer.getId());
+ continue;
+ }
- if (env.containsKey("JAVA_LIBRARY_PATH")) {
- envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
- .setName("JAVA_LIBRARY_PATH")
- .setValue(env.get("JAVA_LIBRARY_PATH")));
- }
+ Integer[] portArray = ports.toArray(new Integer[2]);
+ HttpHost httpAddress = new HttpHost(offer.getHostname(), portArray[0]);
+ HttpHost reportAddress = new HttpHost(offer.getHostname(), portArray[1]);
+
+ TaskID taskId = TaskID.newBuilder()
+ .setValue("Task_Tracker_" + launchedTrackers++).build();
+
+ LOG.info("Launching task " + taskId.getValue() + " on "
+ + httpAddress.toString());
+
+ // Add this tracker to Mesos tasks.
+ mesosTrackers.put(httpAddress, new MesosTracker(httpAddress, taskId,
+ mapSlots, reduceSlots));
+
+ // Create the environment depending on whether the executor is going to be
+ // run locally.
+ // TODO(vinod): Do not pass the mapred config options as environment
+ // variables.
+ Protos.Environment.Builder envBuilder = Protos.Environment
+ .newBuilder()
+ .addVariables(
+ Protos.Environment.Variable
+ .newBuilder()
+ .setName("mapred.job.tracker")
+ .setValue(jobTrackerAddress.getHostName() + ':'
+ + jobTrackerAddress.getPort()))
+ .addVariables(
+ Protos.Environment.Variable
+ .newBuilder()
+ .setName("mapred.task.tracker.http.address")
+ .setValue(
+ httpAddress.getHostName() + ':' + httpAddress.getPort()))
+ .addVariables(
+ Protos.Environment.Variable
+ .newBuilder()
+ .setName("mapred.task.tracker.report.address")
+ .setValue(reportAddress.getHostName() + ':'
+ + reportAddress.getPort()))
+ .addVariables(
+ Protos.Environment.Variable.newBuilder()
+ .setName("mapred.map.child.java.opts")
+ .setValue("-Xmx" + slotJVMHeap + "m"))
+ .addVariables(
+ Protos.Environment.Variable.newBuilder()
+ .setName("mapred.reduce.child.java.opts")
+ .setValue("-Xmx" + slotJVMHeap + "m"))
+ .addVariables(
+ Protos.Environment.Variable.newBuilder()
+ .setName("HADOOP_HEAPSIZE")
+ .setValue("" + TASKTRACKER_JVM_HEAP));
+
+ // Set java specific environment, appropriately.
+ Map<String, String> env = System.getenv();
+ if (env.containsKey("JAVA_HOME")) {
+ envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
+ .setName("JAVA_HOME")
+ .setValue(env.get("JAVA_HOME")));
+ }
- // Command info differs when performing a local run.
- CommandInfo commandInfo = null;
- String master = conf.get("mapred.mesos.master", "local");
+ if (env.containsKey("JAVA_LIBRARY_PATH")) {
+ envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
+ .setName("JAVA_LIBRARY_PATH")
+ .setValue(env.get("JAVA_LIBRARY_PATH")));
+ }
- if (master.equals("local")) {
- try {
+ // Command info differs when performing a local run.
+ CommandInfo commandInfo = null;
+ String master = conf.get("mapred.mesos.master", "local");
+
+ if (master.equals("local")) {
+ try {
+ commandInfo = CommandInfo.newBuilder()
+ .setEnvironment(envBuilder)
+ .setValue(new File("bin/mesos-executor").getCanonicalPath())
+ .build();
+ } catch (IOException e) {
+ LOG.fatal("Failed to find Mesos executor ", e);
+ System.exit(1);
+ }
+ } else {
+ String uri = conf.get("mapred.mesos.executor");
commandInfo = CommandInfo.newBuilder()
.setEnvironment(envBuilder)
- .setValue(new File("bin/mesos-executor").getCanonicalPath())
- .build();
- } catch (IOException e) {
- LOG.fatal("Failed to find Mesos executor ", e);
- System.exit(1);
+ .setValue("cd hadoop && ./bin/mesos-executor")
+ .addUris(CommandInfo.URI.newBuilder().setValue(uri)).build();
}
- } else {
- String uri = conf.get("mapred.mesos.executor");
- commandInfo = CommandInfo.newBuilder()
- .setEnvironment(envBuilder)
- .setValue("cd hadoop && ./bin/mesos-executor")
- .addUris(CommandInfo.URI.newBuilder().setValue(uri)).build();
- }
- TaskInfo info = TaskInfo
- .newBuilder()
- .setName(taskId.getValue())
- .setTaskId(taskId)
- .setSlaveId(offer.getSlaveId())
- .addResources(
- Resource
- .newBuilder()
- .setName("cpus")
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(
- (mapSlots + reduceSlots) * slotCpus)))
- .addResources(
- Resource
- .newBuilder()
- .setName("mem")
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(
- (mapSlots + reduceSlots) * slotMem)))
- .addResources(
- Resource
- .newBuilder()
- .setName("disk")
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(
- (mapSlots + reduceSlots) * slotDisk)))
- .addResources(
- Resource
- .newBuilder()
- .setName("ports")
- .setType(Value.Type.RANGES)
- .setRanges(
- Value.Ranges
- .newBuilder()
- .addRange(Value.Range.newBuilder()
- .setBegin(httpAddress.getPort())
- .setEnd(httpAddress.getPort()))
- .addRange(Value.Range.newBuilder()
- .setBegin(reportAddress.getPort())
- .setEnd(reportAddress.getPort()))))
- .setExecutor(
- ExecutorInfo
- .newBuilder()
- .setExecutorId(ExecutorID.newBuilder().setValue(
- "executor_" + taskId.getValue()))
- .setName("Hadoop TaskTracker")
- .setSource(taskId.getValue())
- .addResources(
- Resource
- .newBuilder()
- .setName("cpus")
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(
- (TASKTRACKER_CPUS))))
- .addResources(
- Resource
- .newBuilder()
- .setName("mem")
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(
- (TASKTRACKER_MEM)))).setCommand(commandInfo))
- .build();
+ TaskInfo info = TaskInfo
+ .newBuilder()
+ .setName(taskId.getValue())
+ .setTaskId(taskId)
+ .setSlaveId(offer.getSlaveId())
+ .addResources(
+ Resource
+ .newBuilder()
+ .setName("cpus")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder().setValue(
+ (mapSlots + reduceSlots) * slotCpus)))
+ .addResources(
+ Resource
+ .newBuilder()
+ .setName("mem")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder().setValue(
+ (mapSlots + reduceSlots) * slotMem)))
+ .addResources(
+ Resource
+ .newBuilder()
+ .setName("disk")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder().setValue(
+ (mapSlots + reduceSlots) * slotDisk)))
+ .addResources(
+ Resource
+ .newBuilder()
+ .setName("ports")
+ .setType(Value.Type.RANGES)
+ .setRanges(
+ Value.Ranges
+ .newBuilder()
+ .addRange(Value.Range.newBuilder()
+ .setBegin(httpAddress.getPort())
+ .setEnd(httpAddress.getPort()))
+ .addRange(Value.Range.newBuilder()
+ .setBegin(reportAddress.getPort())
+ .setEnd(reportAddress.getPort()))))
+ .setExecutor(
+ ExecutorInfo
+ .newBuilder()
+ .setExecutorId(ExecutorID.newBuilder().setValue(
+ "executor_" + taskId.getValue()))
+ .setName("Hadoop TaskTracker")
+ .setSource(taskId.getValue())
+ .addResources(
+ Resource
+ .newBuilder()
+ .setName("cpus")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder().setValue(
+ (TASKTRACKER_CPUS))))
+ .addResources(
+ Resource
+ .newBuilder()
+ .setName("mem")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder().setValue(
+ (TASKTRACKER_MEM)))).setCommand(commandInfo))
+ .build();
- driver.launchTasks(offer.getId(), Arrays.asList(info));
+ driver.launchTasks(offer.getId(), Arrays.asList(info));
- neededMapSlots -= mapSlots;
- neededReduceSlots -= reduceSlots;
- }
+ neededMapSlots -= mapSlots;
+ neededReduceSlots -= reduceSlots;
+ }
- if (neededMapSlots <= 0 && neededReduceSlots <= 0) {
- LOG.info("Satisfied map and reduce slots needed.");
- } else {
- LOG.info("Unable to fully satisfy needed map/reduce slots: "
- + (neededMapSlots > 0 ? neededMapSlots + " map slots " : "")
- + (neededReduceSlots > 0 ? neededReduceSlots + " reduce slots " : "")
- + "remaining");
+ if (neededMapSlots <= 0 && neededReduceSlots <= 0) {
+ LOG.info("Satisfied map and reduce slots needed.");
+ } else {
+ LOG.info("Unable to fully satisfy needed map/reduce slots: "
+ + (neededMapSlots > 0 ? neededMapSlots + " map slots " : "")
+ + (neededReduceSlots > 0 ? neededReduceSlots + " reduce slots " : "")
+ + "remaining");
+ }
}
}
@@ -568,21 +589,29 @@ public class MesosScheduler extends Task
// Remove the TaskTracker if the corresponding Mesos task has reached a
// terminal state.
- TaskState state = taskStatus.getState();
-
- if (state == TaskState.TASK_FINISHED || state == TaskState.TASK_FAILED
- || state == TaskState.TASK_KILLED || state == TaskState.TASK_LOST) {
-
- // Make a copy to iterate over keys and delete values.
- Set<HttpHost> trackers = new HashSet<HttpHost>(mesosTrackers.keySet());
-
- // Remove the task from the map.
- for (HttpHost tracker : trackers) {
- if (mesosTrackers.get(tracker).taskId == taskStatus.getTaskId()) {
- LOG.info("Removing terminated TaskTracker: " + tracker);
- mesosTrackers.remove(tracker);
+ switch (taskStatus.getState()) {
+ case TASK_FINISHED:
+ case TASK_FAILED:
+ case TASK_KILLED:
+ case TASK_LOST:
+ // Make a copy to iterate over keys and delete values.
+ Set<HttpHost> trackers = new HashSet<HttpHost>(mesosTrackers.keySet());
+
+ // Remove the task from the map.
+ for (HttpHost tracker : trackers) {
+ if (mesosTrackers.get(tracker).taskId == taskStatus.getTaskId()) {
+ LOG.info("Removing terminated TaskTracker: " + tracker);
+ mesosTrackers.remove(tracker);
+ }
}
- }
+ break;
+ case TASK_STAGING:
+ case TASK_STARTING:
+ case TASK_RUNNING:
+ break;
+ default:
+ LOG.error("Unexpected TaskStatus: " + taskStatus.getState().name());
+ break;
}
}