You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:29:56 UTC
svn commit: r1077008 - in
/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src:
java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Author: omalley
Date: Fri Mar 4 03:29:56 2011
New Revision: 1077008
URL: http://svn.apache.org/viewvc?rev=1077008&view=rev
Log:
commit f5aa877d4e40502138ff8f0687c239f4cc695462
Author: Arun C Murthy <ac...@apache.org>
Date: Mon Sep 28 13:59:32 2009 -0700
MAPREDUCE-1030. Fix capacity-scheduler to assign a map and a reduce task per-heartbeat. Contributed by Rahuk K Singh.
from: http://issues.apache.org/jira/secure/attachment/12420549/MAPREDUCE-1030-2.patch.txt
+++ b/YAHOO-CHANGES.txt
+59. MAPREDUCE-1030. Fix capacity-scheduler to assign a map and a reduce task
+ per-heartbeat. Contributed by Rahuk K Singh.
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1077008&r1=1077007&r2=1077008&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Mar 4 03:29:56 2011
@@ -57,7 +57,8 @@ import org.apache.hadoop.mapreduce.serve
*
*/
class CapacityTaskScheduler extends TaskScheduler {
-
+
+
/***********************************************************************
* Keeping track of scheduling information for queues
*
@@ -914,6 +915,7 @@ class CapacityTaskScheduler extends Task
private long memSizeForReduceSlotOnJT;
private long limitMaxMemForMapTasks;
private long limitMaxMemForReduceTasks;
+ private boolean assignMultipleTasks = true;
public CapacityTaskScheduler() {
this(new Clock());
@@ -1240,12 +1242,29 @@ class CapacityTaskScheduler extends Task
prevReduceClusterCapacity = reduceClusterCapacity;
}
+ /**
+ * Sets whether the scheduler can assign multiple tasks in a heartbeat
+ * or not.
+ *
+ * This method is used only for testing purposes.
+ *
+ * @param assignMultipleTasks true, to assign multiple tasks per heartbeat
+ */
+ void setAssignMultipleTasks(boolean assignMultipleTasks) {
+ this.assignMultipleTasks = assignMultipleTasks;
+ }
+
/*
- * The grand plan for assigning a task.
- * First, decide whether a Map or Reduce task should be given to a TT
+ * The grand plan for assigning a task.
+ *
+ * If multiple task assignment is enabled, it tries to get one map and
+ * one reduce slot depending on free slots on the TT.
+ *
+ * Otherwise, we decide whether a Map or Reduce task should be given to a TT
* (if the TT can accept either).
- * Next, pick a queue. We only look at queues that need a slot. Among these,
- * we first look at queues whose (# of running tasks)/capacity is the least.
+ * Either way, we first pick a queue. We only look at queues that need
+ * a slot. Among these, we first look at queues whose
+ * (# of running tasks)/capacity is the least.
* Next, pick a job in a queue. we pick the job at the front of the queue
* unless its user is over the user limit.
* Finally, given a job, pick a task from the job.
@@ -1253,17 +1272,8 @@ class CapacityTaskScheduler extends Task
*/
@Override
public synchronized List<Task> assignTasks(TaskTracker taskTracker)
- throws IOException {
-
- TaskLookupResult tlr;
+ throws IOException {
TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
-
- /*
- * If TT has Map and Reduce slot free, we need to figure out whether to
- * give it a Map or Reduce task.
- * Number of ways to do this. For now, base decision on how much is needed
- * versus how much is used (default to Map, if equal).
- */
ClusterStatus c = taskTrackerManager.getClusterStatus();
int mapClusterCapacity = c.getMaxMapTasks();
int reduceClusterCapacity = c.getMaxReduceTasks();
@@ -1285,60 +1295,64 @@ class CapacityTaskScheduler extends Task
* becomes expensive, do it once every few heartbeats only.
*/
updateQSIObjects(mapClusterCapacity, reduceClusterCapacity);
- // make sure we get our map or reduce scheduling object to update its
- // collection of QSI objects too.
+ List<Task> result = new ArrayList<Task>();
+ if (assignMultipleTasks) {
+ addReduceTask(taskTracker, result, maxReduceSlots, currentReduceSlots);
+ addMapTask(taskTracker, result, maxMapSlots, currentMapSlots);
+ } else {
+ /*
+ * If TT has Map and Reduce slot free, we need to figure out whether to
+ * give it a Map or Reduce task.
+ * Number of ways to do this. For now, base decision on how much is needed
+ * versus how much is used (default to Map, if equal).
+ */
+ if ((maxReduceSlots - currentReduceSlots)
+ > (maxMapSlots - currentMapSlots)) {
+ addReduceTask(taskTracker, result, maxReduceSlots, currentReduceSlots);
+ if (result.size() == 0) {
+ addMapTask(taskTracker, result, maxMapSlots, currentMapSlots);
+ }
+ } else {
+ addMapTask(taskTracker, result, maxMapSlots, currentMapSlots);
+ if (result.size() == 0) {
+ addReduceTask(taskTracker, result, maxReduceSlots, currentReduceSlots);
+ }
+ }
+ if (result.size() == 0) {
+ return null;
+ }
+ }
+ return result;
+ }
- if ((maxReduceSlots - currentReduceSlots) >
- (maxMapSlots - currentMapSlots)) {
- // get a reduce task first
+ // Pick a reduce task and add to the list of tasks, if there's space
+ // on the TT to run one.
+ private void addReduceTask(TaskTracker taskTracker, List<Task> tasks,
+ int maxReduceSlots, int currentReduceSlots)
+ throws IOException {
+ if (maxReduceSlots > currentReduceSlots) {
reduceScheduler.updateCollectionOfQSIs();
- tlr = reduceScheduler.assignTasks(taskTracker);
- if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
- tlr.getLookUpStatus()) {
- // found a task; return
- return Collections.singletonList(tlr.getTask());
- }
- // if we didn't get any, look at map tasks, if TT has space
- else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT
- == tlr.getLookUpStatus() ||
- TaskLookupResult.LookUpStatus.NO_TASK_FOUND
- == tlr.getLookUpStatus())
- && (maxMapSlots > currentMapSlots)) {
- mapScheduler.updateCollectionOfQSIs();
- tlr = mapScheduler.assignTasks(taskTracker);
- if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
- tlr.getLookUpStatus()) {
- return Collections.singletonList(tlr.getTask());
- }
+ TaskLookupResult tlr = reduceScheduler.assignTasks(taskTracker);
+ if (TaskLookupResult.LookUpStatus.TASK_FOUND == tlr.getLookUpStatus()) {
+ tasks.add(tlr.getTask());
}
}
- else {
- // get a map task first
+ }
+
+ // Pick a map task and add to the list of tasks, if there's space
+ // on the TT to run one.
+ private void addMapTask(TaskTracker taskTracker, List<Task> tasks,
+ int maxMapSlots, int currentMapSlots)
+ throws IOException {
+ if (maxMapSlots > currentMapSlots) {
mapScheduler.updateCollectionOfQSIs();
- tlr = mapScheduler.assignTasks(taskTracker);
- if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
- tlr.getLookUpStatus()) {
- // found a task; return
- return Collections.singletonList(tlr.getTask());
- }
- // if we didn't get any, look at reduce tasks, if TT has space
- else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT
- == tlr.getLookUpStatus()
- || TaskLookupResult.LookUpStatus.NO_TASK_FOUND
- == tlr.getLookUpStatus())
- && (maxReduceSlots > currentReduceSlots)) {
- reduceScheduler.updateCollectionOfQSIs();
- tlr = reduceScheduler.assignTasks(taskTracker);
- if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
- tlr.getLookUpStatus()) {
- return Collections.singletonList(tlr.getTask());
- }
+ TaskLookupResult tlr = mapScheduler.assignTasks(taskTracker);
+ if (TaskLookupResult.LookUpStatus.TASK_FOUND == tlr.getLookUpStatus()) {
+ tasks.add(tlr.getTask());
}
}
-
- return null;
}
-
+
// called when a job is added
synchronized void jobAdded(JobInProgress job) throws IOException {
QueueSchedulingInfo qsi =
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1077008&r1=1077007&r2=1077008&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Mar 4 03:29:56 2011
@@ -738,6 +738,7 @@ public class TestCapacityScheduler exten
numReduceTasksPerTracker);
clock = new FakeClock();
scheduler = new CapacityTaskScheduler(clock);
+ scheduler.setAssignMultipleTasks(false);
scheduler.setTaskTrackerManager(taskTrackerManager);
conf = new JobConf();
@@ -1055,7 +1056,171 @@ public class TestCapacityScheduler exten
// complete task
taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000003_0", j2);
}
-
+
+ /**
+ * Tests whether a map and reduce task are assigned when there's
+ * a single queue and multiple task assignment is enabled.
+ * @throws Exception
+ */
+ public void testMultiTaskAssignmentInSingleQueue() throws Exception {
+ try {
+ setUp(1, 6, 2);
+ // set up some queues
+ String[] qs = {"default"};
+ taskTrackerManager.addQueues(qs);
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
+ resConf.setFakeQueues(queues);
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+ scheduler.setAssignMultipleTasks(true);
+
+ //Submit the job with 6 maps and 2 reduces
+ FakeJobInProgress j1 = submitJobAndInit(
+ JobStatus.PREP, 6, 2, "default", "u1");
+
+ List<Task> tasks = scheduler.assignTasks(tracker("tt1"));
+ assertEquals(tasks.size(), 2);
+ for (Task task : tasks) {
+ if (task.toString().contains("_m_")) {
+ LOG.info(" map task assigned " + task.toString());
+ assertEquals(task.toString(), "attempt_test_0001_m_000001_0 on tt1");
+ } else if (task.toString().contains("_r_")) {
+ LOG.info(" reduce task assigned " + task.toString());
+ assertEquals(task.toString(), "attempt_test_0001_r_000001_0 on tt1");
+ } else {
+ fail(" should not have come here " + task.toString());
+ }
+ }
+
+ for (Task task : tasks) {
+ if (task.toString().equals("attempt_test_0001_m_000001_0 on tt1")) {
+ //Now finish the task
+ taskTrackerManager.finishTask(
+ "tt1", task.getTaskID().toString(),
+ j1);
+ }
+ }
+
+ tasks = scheduler.assignTasks(tracker("tt1"));
+ assertEquals(tasks.size(), 2);
+ for (Task task : tasks) {
+ if (task.toString().contains("_m_")) {
+ LOG.info(" map task assigned " + task.toString());
+ assertEquals(task.toString(), "attempt_test_0001_m_000002_0 on tt1");
+ } else if (task.toString().contains("_r_")) {
+ LOG.info(" reduce task assigned " + task.toString());
+ assertEquals(task.toString(), "attempt_test_0001_r_000002_0 on tt1");
+ } else {
+ fail(" should not have come here " + task.toString());
+ }
+ }
+
+ // Now both the reduce slots are being used, hence we should get
+ // only 1 map task in this assignTasks call.
+ tasks = scheduler.assignTasks(tracker("tt1"));
+ assertEquals(tasks.size(), 1);
+ for (Task task : tasks) {
+ if (task.toString().contains("_m_")) {
+ LOG.info(" map task assigned " + task.toString());
+ assertEquals(task.toString(), "attempt_test_0001_m_000003_0 on tt1");
+ } else if (task.toString().contains("_r_")) {
+ LOG.info(" reduce task assigned " + task.toString());
+ fail("should not give reduce task " + task.toString());
+ } else {
+ fail(" should not have come here " + task.toString());
+ }
+ }
+ } finally {
+ scheduler.setAssignMultipleTasks(false);
+ }
+ }
+
+ public void testMultiTaskAssignmentInMultipleQueues() throws Exception {
+ try {
+ setUp(1, 6, 2);
+ // set up some queues
+ String[] qs = {"default","q1"};
+ taskTrackerManager.addQueues(qs);
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
+ queues.add(new FakeQueueInfo("q1", 50.0f, true, 25));
+ resConf.setFakeQueues(queues);
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+ scheduler.setAssignMultipleTasks(true);
+
+ //Submit a job with 6 maps and 1 reduce
+ submitJobAndInit(
+ JobStatus.PREP, 6, 1, "default", "u1");
+
+ FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP,2,1,"q1","u2");
+
+ List<Task> tasks = scheduler.assignTasks(tracker("tt1"));
+ assertEquals(tasks.size(), 2);
+ for (Task task : tasks) {
+ if (task.toString().contains("_m_")) {
+ LOG.info(" map task assigned " + task.toString());
+ assertEquals(task.toString(), "attempt_test_0001_m_000001_0 on tt1");
+ } else if (task.toString().contains("_r_")) {
+ LOG.info(" reduce task assigned " + task.toString());
+ assertEquals(task.toString(), "attempt_test_0001_r_000001_0 on tt1");
+ } else {
+ fail(" should not have come here " + task.toString());
+ }
+ }
+
+ // next assignment will be for job in second queue.
+ tasks = scheduler.assignTasks(tracker("tt1"));
+ assertEquals(tasks.size(), 2);
+ for (Task task : tasks) {
+ if (task.toString().contains("_m_")) {
+ LOG.info(" map task assigned " + task.toString());
+ assertEquals(task.toString(), "attempt_test_0002_m_000001_0 on tt1");
+ } else if (task.toString().contains("_r_")) {
+ LOG.info(" reduce task assigned " + task.toString());
+ assertEquals(task.toString(), "attempt_test_0002_r_000001_0 on tt1");
+ } else {
+ fail(" should not have come here " + task.toString());
+ }
+ }
+
+ // Now both the reduce slots are being used, hence we should get only 1
+ // map task in this assignTasks call.
+ tasks = scheduler.assignTasks(tracker("tt1"));
+ assertEquals(tasks.size(), 1);
+ for (Task task : tasks) {
+ if (task.toString().contains("_m_")) {
+ LOG.info(" map task assigned " + task.toString());
+ // we get from job 2 because the queues are equal in capacity usage
+ // and sorting leaves order unchanged.
+ assertEquals(task.toString(), "attempt_test_0002_m_000002_0 on tt1");
+ } else if (task.toString().contains("_r_")) {
+ LOG.info(" reduce task assigned " + task.toString());
+ fail("should not give reduce task " + task.toString());
+ } else {
+ fail(" should not have come here " + task.toString());
+ }
+ }
+
+ tasks = scheduler.assignTasks(tracker("tt1"));
+ assertEquals(tasks.size(), 1);
+ for (Task task : tasks) {
+ if (task.toString().contains("_m_")) {
+ LOG.info(" map task assigned " + task.toString());
+ assertEquals(task.toString(), "attempt_test_0001_m_000002_0 on tt1");
+ } else if (task.toString().contains("_r_")) {
+ LOG.info(" reduce task assigned " + task.toString());
+ fail("should not give reduce task " + task.toString());
+ } else {
+ fail(" should not have come here " + task.toString());
+ }
+ }
+ } finally {
+ scheduler.setAssignMultipleTasks(false);
+ }
+ }
+
// basic tests, should be able to submit to queues
public void testSubmitToQueues() throws Exception {
// set up some queues
@@ -1947,14 +2112,24 @@ public class TestCapacityScheduler exten
}
/**
- * Test blocking of cluster for lack of memory.
+ * Tests that scheduler schedules normal jobs once high RAM jobs
+ * have been reserved to the limit.
+ *
+ * The test causes the scheduler to schedule a normal job on two
+ * trackers, and one task of the high RAM job on a third. Then it
+ * asserts that one of the first two trackers gets a reservation
+ * for the remaining task of the high RAM job. After this, it
+ * asserts that a normal job submitted later is allowed to run
+ * on a free slot, as all tasks of the high RAM job are either
+ * scheduled or reserved.
+ *
* @throws IOException
*/
public void testClusterBlockingForLackOfMemory()
throws IOException {
LOG.debug("Starting the scheduler.");
- taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
+ taskTrackerManager = new FakeTaskTrackerManager(3, 2, 2);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
@@ -1977,34 +2152,38 @@ public class TestCapacityScheduler exten
scheduler.start();
LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
- + "1 map, 1 reduce tasks.");
+ + "2 map, 2 reduce tasks.");
JobConf jConf = new JobConf(conf);
jConf.setMemoryForMapTask(1 * 1024);
jConf.setMemoryForReduceTask(1 * 1024);
- jConf.setNumMapTasks(1);
- jConf.setNumReduceTasks(1);
+ jConf.setNumMapTasks(2);
+ jConf.setNumReduceTasks(2);
jConf.setQueueName("default");
jConf.setUser("u1");
FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
- // Fill the second tt with this job.
- checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+ // Fill a tt with this job's tasks.
+ checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
// Total 1 map slot should be accounted for.
- checkOccupiedSlots("default", TaskType.MAP, 1, 1, 25.0f);
+ checkOccupiedSlots("default", TaskType.MAP, 1, 1, 16.7f);
assertEquals(String.format(
CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
1, 1, 0, 0, 0, 0),
(String) job1.getSchedulingInfo());
- checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
- checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
- // Total 1 map slot should be accounted for.
- checkOccupiedSlots("default", TaskType.REDUCE, 1, 1,
- 25.0f);
+ checkMemReservedForTasksOnTT("tt1", 1 * 1024L, 0L);
+
+ // same for reduces.
+ checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 16.7f);
assertEquals(String.format(
CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
1, 1, 0, 1, 1, 0),
(String) job1.getSchedulingInfo());
- checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
+ checkMemReservedForTasksOnTT("tt1", 1 * 1024L, 1 * 1024L);
+
+ // fill another TT with the rest of the tasks of the job
+ checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+ checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
LOG.debug("Submit one high memory(2GB maps/reduces) job of "
+ "2 map, 2 reduce tasks.");
@@ -2017,27 +2196,26 @@ public class TestCapacityScheduler exten
jConf.setUser("u1");
FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
- checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
- // Total 3 map slots should be accounted for.
- checkOccupiedSlots("default", TaskType.MAP, 1, 3, 75.0f);
+ // Have another TT run one task of each type of the high RAM
+ // job. This will fill up the TT.
+ checkAssignment("tt3", "attempt_test_0002_m_000001_0 on tt3");
+ checkOccupiedSlots("default", TaskType.MAP, 1, 4, 66.7f);
assertEquals(String.format(
CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
1, 2, 0, 0, 0, 0),
(String) job2.getSchedulingInfo());
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+ checkMemReservedForTasksOnTT("tt3", 2 * 1024L, 0L);
- checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
- // Total 3 reduce slots should be accounted for.
- checkOccupiedSlots("default", TaskType.REDUCE, 1, 3,
- 75.0f);
+ checkAssignment("tt3", "attempt_test_0002_r_000001_0 on tt3");
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 4, 66.7f);
assertEquals(String.format(
CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
1, 2, 0, 1, 2, 0),
(String) job2.getSchedulingInfo());
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
+ checkMemReservedForTasksOnTT("tt3", 2 * 1024L, 2 * 1024L);
LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
- + "1 map, 0 reduce tasks.");
+ + "1 map, 1 reduce tasks.");
jConf = new JobConf(conf);
jConf.setMemoryForMapTask(1 * 1024);
jConf.setMemoryForReduceTask(1 * 1024);
@@ -2047,29 +2225,27 @@ public class TestCapacityScheduler exten
jConf.setUser("u1");
FakeJobInProgress job3 = submitJobAndInit(JobStatus.PREP, jConf);
- // Job2 cannot fit on tt1. So tt1 is reserved for a map slot of job2
- assertNull(scheduler.assignTasks(tracker("tt1")));
+ // Send a TT with insufficient space for task assignment.
+ // This will cause a reservation for the high RAM job.
assertNull(scheduler.assignTasks(tracker("tt1")));
- // reserved tasktrackers contribute to occupied slots for maps.
- checkOccupiedSlots("default", TaskType.MAP, 1, 5, 125.0f);
- // occupied slots for reduces remain unchanged as tt1 is not reserved for
- // reduces.
- checkOccupiedSlots("default", TaskType.REDUCE, 1, 3, 75.0f);
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
- checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
+ // reserved tasktrackers contribute to occupied slots for maps and reduces
+ checkOccupiedSlots("default", TaskType.MAP, 1, 6, 100.0f);
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 6, 100.0f);
+ checkMemReservedForTasksOnTT("tt1", 1 * 1024L, 1 * 1024L);
LOG.info(job2.getSchedulingInfo());
assertEquals(String.format(
CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
- 1, 2, 2, 1, 2, 0),
+ 1, 2, 2, 1, 2, 2),
(String) job2.getSchedulingInfo());
assertEquals(String.format(
CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
0, 0, 0, 0, 0, 0),
(String) job3.getSchedulingInfo());
-
- // One reservation is already done for job2. So job3 should go ahead.
+
+ // Reservations are already done for job2. So job3 should go ahead.
checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
+ checkAssignment("tt2", "attempt_test_0003_r_000001_0 on tt2");
}
/**