You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by yh...@apache.org on 2009/10/13 15:28:30 UTC
svn commit: r824750 [1/2] - in /hadoop/mapreduce/trunk: ./
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
Author: yhemanth
Date: Tue Oct 13 13:28:29 2009
New Revision: 824750
URL: http://svn.apache.org/viewvc?rev=824750&view=rev
Log:
MAPREDUCE-1030. Modified scheduling algorithm to return a map and reduce task per heartbeat in the capacity scheduler. Contributed by Rahul Kumar Singh.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java
hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestRefreshOfQueues.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=824750&r1=824749&r2=824750&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Oct 13 13:28:29 2009
@@ -748,4 +748,8 @@
MAPREDUCE-979. Fixed JobConf APIs related to memory parameters to return
values of new configuration variables when deprecated variables are
disabled. (Sreekanth Ramakrishnan via yhemanth)
-
+
+ MAPREDUCE-1030. Modified scheduling algorithm to return a map and reduce
+ task per heartbeat in the capacity scheduler.
+ (Rahul Kumar Singh via yhemanth)
+
Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=824750&r1=824749&r2=824750&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Tue Oct 13 13:28:29 2009
@@ -906,8 +906,10 @@
/*
* The grand plan for assigning a task.
- * First, decide whether a Map or Reduce task should be given to a TT
- * (if the TT can accept either).
+ * Always assigns 1 reduce and 1 map, if sufficient slots are
+ * available for each type.
+ * If not, then whichever type of slot is available, that type of task is
+ * assigned.
* Next, pick a queue. We only look at queues that need a slot. Among these,
* we first look at queues whose (# of running tasks)/capacity is the least.
* Next, pick a job in a queue. we pick the job at the front of the queue
@@ -921,12 +923,12 @@
TaskLookupResult tlr;
TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
+ List<Task> result = new ArrayList<Task>();
/*
- * If TT has Map and Reduce slot free, we need to figure out whether to
- * give it a Map or Reduce task.
- * Number of ways to do this. For now, base decision on how much is needed
- * versus how much is used (default to Map, if equal).
+ * If TT has Map and Reduce slot free, we assign 1 map and 1 reduce
+ * We base decision on how much is needed
+ * versus how much is used
*/
ClusterStatus c = taskTrackerManager.getClusterStatus();
int mapClusterCapacity = c.getMaxMapTasks();
@@ -953,51 +955,26 @@
// make sure we get our map or reduce scheduling object to update its
// collection of QSC objects too.
- if ((maxReduceSlots - currentReduceSlots) >
- (maxMapSlots - currentMapSlots)) {
- // get a reduce task first
+ if (maxReduceSlots > currentReduceSlots) {
+ //reduce slot available , try to get a
+ //reduce task
tlr = reduceScheduler.assignTasks(taskTracker);
if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
tlr.getLookUpStatus()) {
- // found a task; return
- return Collections.singletonList(tlr.getTask());
- }
- // if we didn't get any, look at map tasks, if TT has space
- else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT
- == tlr.getLookUpStatus() ||
- TaskLookupResult.LookUpStatus.NO_TASK_FOUND
- == tlr.getLookUpStatus())
- && (maxMapSlots > currentMapSlots)) {
- tlr = mapScheduler.assignTasks(taskTracker);
- if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
- tlr.getLookUpStatus()) {
- return Collections.singletonList(tlr.getTask());
- }
+ result.add(tlr.getTask());
}
}
- else {
- // get a map task first
+
+ if(maxMapSlots > currentMapSlots) {
+ //map slot available , try to get a map task
tlr = mapScheduler.assignTasks(taskTracker);
if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
tlr.getLookUpStatus()) {
- // found a task; return
- return Collections.singletonList(tlr.getTask());
- }
- // if we didn't get any, look at reduce tasks, if TT has space
- else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT
- == tlr.getLookUpStatus()
- || TaskLookupResult.LookUpStatus.NO_TASK_FOUND
- == tlr.getLookUpStatus())
- && (maxReduceSlots > currentReduceSlots)) {
- tlr = reduceScheduler.assignTasks(taskTracker);
- if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
- tlr.getLookUpStatus()) {
- return Collections.singletonList(tlr.getTask());
- }
+ result.add(tlr.getTask());
}
}
-
- return null;
+
+ return (result.isEmpty()) ? null : result;
}
Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java?rev=824750&r1=824749&r2=824750&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java Tue Oct 13 13:28:29 2009
@@ -20,13 +20,13 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -48,6 +48,8 @@
public class CapacityTestUtils {
static final Log LOG =
LogFactory.getLog(org.apache.hadoop.mapred.CapacityTestUtils.class);
+ static final String MAP = "map";
+ static final String REDUCE = "reduce";
/**
@@ -160,18 +162,116 @@
}
}
-
+ /**
+ * The method accepts an attempt string and checks for validity of
+ * assignTask w.r.t attempt string.
+ *
+ * @param taskTrackerManager
+ * @param scheduler
+ * @param taskTrackerName
+ * @param expectedTaskString
+ * @return
+ * @throws IOException
+ */
static Task checkAssignment(
CapacityTestUtils.FakeTaskTrackerManager taskTrackerManager,
CapacityTaskScheduler scheduler, String taskTrackerName,
String expectedTaskString) throws IOException {
+ Map<String, String> expectedStrings = new HashMap<String, String>();
+ if (expectedTaskString.contains("_m_")) {
+ expectedStrings.put(MAP, expectedTaskString);
+ } else if (expectedTaskString.contains("_r_")) {
+ expectedStrings.put(REDUCE, expectedTaskString);
+ }
+ List<Task> tasks = checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, taskTrackerName, expectedStrings);
+ for (Task task : tasks) {
+ if (task.toString().equals(expectedTaskString)) {
+ return task;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Checks the validity of tasks assigned by scheduler's assignTasks method
+ * According to JIRA:1030 every assignTasks call in CapacityScheduler
+ * would result in either MAP or REDUCE or BOTH.
+ *
+ * This method accepts a Map<String,String>.
+ * The map should always have <=2 entries in hashMap.
+ *
+ * sample calling code .
+ *
+ * Map<String, String> expectedStrings = new HashMap<String, String>();
+ * ......
+ * .......
+ * expectedStrings.clear();
+ * expectedStrings.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
+ * expectedStrings.put(REDUCE,"attempt_test_0001_r_000001_0 on tt1");
+ * checkMultipleTaskAssignment(
+ * taskTrackerManager, scheduler, "tt1",
+ * expectedStrings);
+ *
+ * @param taskTrackerManager
+ * @param scheduler
+ * @param taskTrackerName
+ * @param expectedTaskStrings
+ * @return
+ * @throws IOException
+ */
+ static List<Task> checkMultipleTaskAssignment(
+ CapacityTestUtils.FakeTaskTrackerManager taskTrackerManager,
+ CapacityTaskScheduler scheduler, String taskTrackerName,
+ Map<String,String> expectedTaskStrings) throws IOException {
+ //Call assign task
List<Task> tasks = scheduler.assignTasks(
taskTrackerManager.getTaskTracker(
taskTrackerName));
- assertNotNull(expectedTaskString, tasks);
- assertEquals(expectedTaskString, 1, tasks.size());
- assertEquals(expectedTaskString, tasks.get(0).toString());
- return tasks.get(0);
+
+ if (tasks==null) {
+ if (expectedTaskStrings.size() > 0) {
+ fail("Expected some tasks to be assigned, but got none.");
+ } else {
+ return null;
+ }
+ }
+
+ if (expectedTaskStrings.size() > tasks.size()) {
+ StringBuffer sb = new StringBuffer();
+ sb.append("Expected strings different from actual strings.");
+ sb.append(" Expected string count=").append(expectedTaskStrings.size());
+ sb.append(" Actual string count=").append(tasks.size());
+ sb.append(" Expected strings=");
+ for (String expectedTask : expectedTaskStrings.values()) {
+ sb.append(expectedTask).append(",");
+ }
+ sb.append("Actual strings=");
+ for (Task actualTask : tasks) {
+ sb.append(actualTask.toString()).append(",");
+ }
+ fail(sb.toString());
+ }
+
+ for (Task task : tasks) {
+ LOG.info("tasks are : " + tasks.toString());
+ if (task.isMapTask()) {
+ //check if expected string is set for map or not.
+ if (expectedTaskStrings.get(MAP) != null) {
+ assertEquals(expectedTaskStrings.get(MAP), task.toString());
+ } else {
+ fail("No map task is expected, but got " + task.toString());
+ }
+ } else {
+ //check if expectedStrings is set for reduce or not.
+ if (expectedTaskStrings.get(REDUCE) != null) {
+ assertEquals(expectedTaskStrings.get(REDUCE), task.toString());
+ } else {
+ fail("No reduce task is expected, but got " + task.toString());
+ }
+ }
+ }
+ return tasks;
}
static void verifyCapacity(
Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=824750&r1=824749&r2=824750&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Tue Oct 13 13:28:29 2009
@@ -108,21 +108,21 @@
taskTrackerManager.initJob(fjob1);
+ //1 map and 1 reduce assigned
List<Task> task1 = scheduler.assignTasks(tracker("tt1"));
+ //2 map are assigned reached the maxlimit
List<Task> task2 = scheduler.assignTasks(tracker("tt2"));
- //Once the 2 tasks are running the third assigment should be reduce.
- checkAssignment(
- taskTrackerManager, scheduler, "tt3",
- "attempt_test_0001_r_000001_0 on tt3");
- //This should fail.
- List<Task> task4 = scheduler.assignTasks(tracker("tt4"));
- assertNull(task4);
+ //task3 is null as maxlimit is reached.
+ List<Task> task3 = scheduler.assignTasks(tracker("tt3"));
+ assertNull(task3);
//Now complete the task 1.
// complete the job
+ for(Task task: task1) {
taskTrackerManager.finishTask(
- task1.get(0).getTaskID().toString(),
+ task.getTaskID().toString(),
fjob1);
+ }
//We have completed the tt1 task which was a map task so we expect one map
//task to be picked up
checkAssignment(
@@ -154,23 +154,29 @@
taskTrackerManager.initJob(fjob1);
+ //1 map and 1 reduce
List<Task> task1 = scheduler.assignTasks(tracker("tt1"));
+
+ // 1 reduce assigned
List<Task> task2 = scheduler.assignTasks(tracker("tt2"));
+
+ // No tasks should be assigned, as we have reached the max cap.
List<Task> task3 = scheduler.assignTasks(tracker("tt3"));
+ assertNull(task3);
- //This should fail. 1 map, 2 reduces , we have reached the limit.
- List<Task> task4 = scheduler.assignTasks(tracker("tt4"));
- assertNull(task4);
//Now complete the task 1 i.e map task.
- // complete the job
- taskTrackerManager.finishTask(
- task1.get(0).getTaskID().toString(),
- fjob1);
-
- //This should still fail as only map task is done
- task4 = scheduler.assignTasks(tracker("tt4"));
- assertNull(task4);
+ for(Task task: task1) {
+ if (task.isMapTask()) {
+ taskTrackerManager.finishTask(
+ task.getTaskID().toString(),
+ fjob1);
+ }
+ }
+ //Still no slots available for reduce hence no tasks
+ //assigned
+ assertNull(scheduler.assignTasks(tracker("tt1")));
+
//Complete the reduce task
taskTrackerManager.finishTask(
task2.get(0).getTaskID().toString(), fjob1);
@@ -403,19 +409,27 @@
// submit a job with no queue specified. It should be accepted
// and given to the default queue.
- JobInProgress j = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
+ JobInProgress j = taskTrackerManager.submitJobAndInit(JobStatus.PREP,
+ 10, 10, null, "u1");
+ // when we ask for tasks, we should get them for the job submitted
+ Map<String, String> expectedTaskStrings = new HashMap<String, String>();
+ expectedTaskStrings.put(CapacityTestUtils.MAP,
+ "attempt_test_0001_m_000001_0 on tt1");
+ expectedTaskStrings.put(CapacityTestUtils.REDUCE,
+ "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt1", expectedTaskStrings);
- // when we ask for a task, we should get one, from the job submitted
- Task t;
- t = checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
// submit another job, to a different queue
j = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
- // now when we get a task, it should be from the second job
- t = checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_m_000001_0 on tt2");
+ // now when we get tasks, it should be from the second job
+ expectedTaskStrings.clear();
+ expectedTaskStrings.put(CapacityTestUtils.MAP,
+ "attempt_test_0002_m_000001_0 on tt2");
+ expectedTaskStrings.put(CapacityTestUtils.REDUCE,
+ "attempt_test_0002_r_000001_0 on tt2");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt2", expectedTaskStrings);
}
public void testGetJobs() throws Exception {
@@ -544,19 +558,30 @@
// submit a job
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// for queue 'q2', the capacity for maps is 2. Since we're the only user,
- // we should get a task
- checkAssignment(
+ // we should get a task
+ Map<String,String> expectedStrings = new HashMap<String,String>();
+ expectedStrings.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE,"attempt_test_0001_r_000001_0 on tt1");
+
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
- // I should get another map task.
+ expectedStrings);
+
+ // I should get another map task.
+ //No reduces as there is 1 slot only for reduce on TT
checkAssignment(
taskTrackerManager, scheduler, "tt1",
"attempt_test_0001_m_000002_0 on tt1");
+
// Now we're at full capacity for maps. If I ask for another map task,
- // I should get a map task from the default queue's capacity.
- checkAssignment(
+ // I should get a map task from the default queue's capacity.
+ //same with reduce
+ expectedStrings.put(MAP,"attempt_test_0001_m_000003_0 on tt2");
+ expectedStrings.put(REDUCE,"attempt_test_0001_r_000002_0 on tt2");
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_m_000003_0 on tt2");
+ expectedStrings);
+
// and another
checkAssignment(
taskTrackerManager, scheduler, "tt2",
@@ -608,7 +633,8 @@
jConf.setNumReduceTasks(0);
jConf.setQueueName("defaultXYZM");
jConf.setUser("u1");
- FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
LOG.debug(
"Submit another regular memory(1GB vmem maps/reduces) job of "
@@ -620,22 +646,18 @@
jConf.setNumReduceTasks(2);
jConf.setQueueName("defaultXYZM");
jConf.setUser("u1");
- FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
// first, a map from j1 will run this is a high memory job so it would
- // occupy the 2 slots
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
-
- checkOccupiedSlots("defaultXYZM", TaskType.MAP, 1, 2, 100.0f, 1, 1);
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
-
- // at this point, the scheduler tries to schedule another map from j1.
- // there isn't enough space. The second job's reduce should be scheduled.
- checkAssignment(
+ // occupy the 2 slots and it would try to assign the reduce task from
+ //job2.
+ Map<String, String> expectedStrings = new HashMap<String, String>();
+ expectedStrings.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE, "attempt_test_0002_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_r_000001_0 on tt1");
+ expectedStrings);
checkOccupiedSlots("defaultXYZM", TaskType.MAP, 1, 2, 100.0f, 1, 1);
checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
@@ -644,10 +666,11 @@
//another task tracker.
// This should not happen as all the map slots are taken
//by the first task itself.hence reduce task from the second job is given
-
- checkAssignment(
+ expectedStrings.clear();
+ expectedStrings.put(REDUCE, "attempt_test_0002_r_000002_0 on tt2");
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_r_000002_0 on tt2");
+ expectedStrings);
}
/**
@@ -664,60 +687,71 @@
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
- scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext().getMapTSC().setMaxTaskLimit(2);
- scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext().getReduceTSC().setMaxTaskLimit(2);
+ scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext()
+ .getMapTSC().setMaxTaskLimit(2);
+ scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext()
+ .getReduceTSC().setMaxTaskLimit(2);
// submit a job
FakeJobInProgress fjob1 =
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u1");
+ taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 10, 10, "default", "u1");
FakeJobInProgress fjob2 =
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u2");
+ taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 10, 10, "default", "u2");
// for queue 'default', the capacity for maps is 2.
// But the max map limit is 2
// hence user should be getting not more than 1 as it is the 50%.
- Task t1 = checkAssignment(
+ //same with reduce
+ Map<String, String> expectedStrings = new HashMap<String, String>();
+ populateExpectedStrings(expectedStrings,
+ "attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1");
+ List<Task> t1 = checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings);
//Now we should get the task from the other job. As the
//first user has reached his max map limit.
+ //same with reduce
+ populateExpectedStrings(expectedStrings,
+ "attempt_test_0002_m_000001_0 on tt2",
+ "attempt_test_0002_r_000001_0 on tt2");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ expectedStrings);
+
+ //Now we are done with map and reduce limit ,
+ // now if we ask for task we should
+ // get null.
+ List<Task> t3 = scheduler.assignTasks(tracker("tt3"));
+ assertNull(t3);
+
+ //We completed 1 map and 1 reduce in here
+ for (Task task : t1) {
+ taskTrackerManager.finishTask(
+ task.getTaskID().toString(),
+ fjob1);
+ }
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_m_000001_0 on tt2");
-
- //Now we are done with map limit , now if we ask for task we should
- // get reduce from 1st job
- checkAssignment(
- taskTrackerManager, scheduler, "tt3",
- "attempt_test_0001_r_000001_0 on tt3");
- // Now we're at full capacity for maps. 1 done with reduces for job 1 so
- // now we should get 1 reduces for job 2
- Task t4 = checkAssignment(
- taskTrackerManager, scheduler, "tt4",
- "attempt_test_0002_r_000001_0 on tt4");
-
- taskTrackerManager.finishTask(
- t1.getTaskID().toString(),
- fjob1);
-
- //tt1 completed the task so we have 1 map slot for u1
- // we are assigning the 2nd map task from fjob1
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000002_0 on tt1");
-
- taskTrackerManager.finishTask(
- t4.getTaskID().toString(),
- fjob2);
- //tt4 completed the task , so we have 1 reduce slot for u2
- //we are assigning the 2nd reduce from fjob2
- checkAssignment(
- taskTrackerManager, scheduler, "tt4",
- "attempt_test_0002_r_000002_0 on tt4");
-
+ //again we would assign 1 map and 1 reduce
+ populateExpectedStrings(expectedStrings,
+ "attempt_test_0001_m_000002_0 on tt1",
+ "attempt_test_0001_r_000002_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
+ }
+
+ // Utility method to construct a map of expected strings
+ // with exactly one map task and one reduce task.
+ private void populateExpectedStrings(Map<String, String> expectedTaskStrings,
+ String mapTask, String reduceTask) {
+ expectedTaskStrings.clear();
+ expectedTaskStrings.put(CapacityTestUtils.MAP, mapTask);
+ expectedTaskStrings.put(CapacityTestUtils.REDUCE, reduceTask);
}
@@ -736,26 +770,31 @@
// submit a job
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
- // for queue 'q2', the capacity for maps is 2. Since we're the only user,
- // we should get a task
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
+ // for queue 'q2', the capacity is 2 for maps and 1 for reduce.
+ // Since we're the only user, we should get tasks
+ Map<String, String> expectedTaskStrings = new HashMap<String, String>();
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt1", expectedTaskStrings);
+
// Submit another job, from a different user
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
- // Now if I ask for a map task, it should come from the second job
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_m_000001_0 on tt1");
- // Now we're at full capacity for maps. If I ask for another map task,
- // I should get a map task from the default queue's capacity.
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_m_000002_0 on tt2");
+ // Now if I ask for a task, it should come from the second job
+ checkAssignment(taskTrackerManager, scheduler,
+ "tt1", "attempt_test_0002_m_000001_0 on tt1");
+
+ // Now we're at full capacity. If I ask for another task,
+ // I should get tasks from the default queue's capacity.
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0001_m_000002_0 on tt2",
+ "attempt_test_0002_r_000001_0 on tt2");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt2", expectedTaskStrings);
// and another
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_m_000002_0 on tt2");
+ checkAssignment(taskTrackerManager, scheduler,
+ "tt2", "attempt_test_0002_m_000002_0 on tt2");
}
// test user limits when a 2nd job is submitted much after first job
@@ -773,21 +812,28 @@
// submit a job
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
- // for queue 'q2', the capacity for maps is 2. Since we're the only user,
- // we should get a task
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
+ // for queue 'q2', the capacity for maps is 2 and reduce is 1.
+ // Since we're the only user, we should get tasks
+ Map<String, String> expectedTaskStrings = new HashMap<String, String>();
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt1", expectedTaskStrings);
+
// since we're the only job, we get another map
checkAssignment(
taskTrackerManager, scheduler, "tt1",
"attempt_test_0001_m_000002_0 on tt1");
+
// Submit another job, from a different user
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
- // Now if I ask for a map task, it should come from the second job
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_m_000001_0 on tt2");
+ // Now if I ask for a task, it should come from the second job
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0002_m_000001_0 on tt2",
+ "attempt_test_0002_r_000001_0 on tt2");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt2", expectedTaskStrings);
// and another
checkAssignment(
taskTrackerManager, scheduler, "tt2",
@@ -810,41 +856,56 @@
// submit a job
FakeJobInProgress j1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
- // for queue 'q2', the capacity for maps is 2. Since we're the only user,
- // we should get a task
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
+ // for queue 'q2', the capacity for maps is 2 and reduces is 1.
+ // Since we're the only user, we should get a task
+ Map<String, String> expectedTaskStrings = new HashMap<String, String>();
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt1", expectedTaskStrings);
// since we're the only job, we get another map
checkAssignment(
taskTrackerManager, scheduler, "tt1",
"attempt_test_0001_m_000002_0 on tt1");
- // we get two more maps from 'default queue'
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_m_000003_0 on tt2");
+ // we get more tasks from 'default queue'
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0001_m_000003_0 on tt2",
+ "attempt_test_0001_r_000002_0 on tt2");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt2", expectedTaskStrings);
checkAssignment(
taskTrackerManager, scheduler, "tt2",
"attempt_test_0001_m_000004_0 on tt2");
+
// Submit another job, from a different user
FakeJobInProgress j2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
- // one of the task finishes
+ // one of the task finishes of each type
taskTrackerManager.finishTask("attempt_test_0001_m_000001_0", j1);
- // Now if I ask for a map task, it should come from the second job
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_m_000001_0 on tt1");
+ taskTrackerManager.finishTask("attempt_test_0001_r_000001_0", j1);
+
+ // Now if I ask for a task, it should come from the second job
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0002_m_000001_0 on tt1",
+ "attempt_test_0002_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt1", expectedTaskStrings);
+
// another task from job1 finishes, another new task to job2
taskTrackerManager.finishTask("attempt_test_0001_m_000002_0", j1);
checkAssignment(
taskTrackerManager, scheduler, "tt1",
"attempt_test_0002_m_000002_0 on tt1");
+
// now we have equal number of tasks from each job. Whichever job's
// task finishes, that job gets a new task
taskTrackerManager.finishTask("attempt_test_0001_m_000003_0", j1);
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_m_000005_0 on tt2");
+ taskTrackerManager.finishTask("attempt_test_0001_r_000002_0", j1);
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0001_m_000005_0 on tt2",
+ "attempt_test_0001_r_000003_0 on tt2");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt2", expectedTaskStrings);
taskTrackerManager.finishTask("attempt_test_0002_m_000001_0", j2);
checkAssignment(
taskTrackerManager, scheduler, "tt1",
@@ -853,7 +914,7 @@
// test user limits with many users, more slots
public void testUserLimits4() throws Exception {
- // set up one queue, with 10 slots
+ // set up one queue, with 10 map slots and 5 reduce slots
String[] qs = {"default"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -870,42 +931,31 @@
// u1 submits job
FakeJobInProgress j1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
// it gets the first 5 slots
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000002_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_m_000003_0 on tt2");
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_m_000004_0 on tt2");
- checkAssignment(
- taskTrackerManager, scheduler, "tt3",
- "attempt_test_0001_m_000005_0 on tt3");
+ Map<String, String> expectedTaskStrings = new HashMap<String, String>();
+ for (int i=0; i<5; i++) {
+ String ttName = "tt"+(i+1);
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0001_m_00000"+(i+1)+"_0 on " + ttName,
+ "attempt_test_0001_r_00000"+(i+1)+"_0 on " + ttName);
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ ttName, expectedTaskStrings);
+ }
+
// u2 submits job with 4 slots
FakeJobInProgress j2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 4, null, "u2");
// u2 should get next 4 slots
- checkAssignment(
- taskTrackerManager, scheduler, "tt3",
- "attempt_test_0002_m_000001_0 on tt3");
- checkAssignment(
- taskTrackerManager, scheduler, "tt4",
- "attempt_test_0002_m_000002_0 on tt4");
- checkAssignment(
- taskTrackerManager, scheduler, "tt4",
- "attempt_test_0002_m_000003_0 on tt4");
- checkAssignment(
- taskTrackerManager, scheduler, "tt5",
- "attempt_test_0002_m_000004_0 on tt5");
+ for (int i=0; i<4; i++) {
+ String ttName = "tt"+(i+1);
+ checkAssignment(taskTrackerManager, scheduler, ttName,
+ "attempt_test_0002_m_00000"+(i+1)+"_0 on " + ttName);
+ }
// last slot should go to u1, since u2 has no more tasks
checkAssignment(
taskTrackerManager, scheduler, "tt5",
"attempt_test_0001_m_000006_0 on tt5");
- // u1 finishes a task
+ // u1 finishes tasks
taskTrackerManager.finishTask("attempt_test_0001_m_000006_0", j1);
+ taskTrackerManager.finishTask("attempt_test_0001_r_000005_0", j1);
// u1 submits a few more jobs
// All the jobs are inited when submitted
// because of addition of Eager Job Initializer all jobs in this
@@ -917,23 +967,26 @@
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, null, "u2");
// now u3 submits a job
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 2, null, "u3");
- // next slot should go to u3, even though u2 has an earlier job, since
+ // next map slot should go to u3, even though u2 has an earlier job, since
// user limits have changed and u1/u2 are over limits
- checkAssignment(
- taskTrackerManager, scheduler, "tt5",
- "attempt_test_0007_m_000001_0 on tt5");
+ // reduce slot will go to job 2, as it is still under limit.
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0007_m_000001_0 on tt5",
+ "attempt_test_0002_r_000001_0 on tt5");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt5", expectedTaskStrings);
// some other task finishes and u3 gets it
taskTrackerManager.finishTask("attempt_test_0002_m_000004_0", j1);
checkAssignment(
- taskTrackerManager, scheduler, "tt5",
- "attempt_test_0007_m_000002_0 on tt5");
+ taskTrackerManager, scheduler, "tt4",
+ "attempt_test_0007_m_000002_0 on tt4");
// now, u2 finishes a task
taskTrackerManager.finishTask("attempt_test_0002_m_000002_0", j1);
// next slot will go to u1, since u3 has nothing to run and u1's job is
// first in the queue
checkAssignment(
- taskTrackerManager, scheduler, "tt4",
- "attempt_test_0001_m_000007_0 on tt4");
+ taskTrackerManager, scheduler, "tt2",
+ "attempt_test_0001_m_000007_0 on tt2");
}
/**
@@ -955,13 +1008,13 @@
// enabled memory-based scheduling
// Normal job in the cluster would be 1GB maps/reduces
scheduler.getConf().setLong(
- JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024);
+ JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024);
scheduler.getConf().setLong(
- MRConfig.MAPMEMORY_MB, 1 * 1024);
+ MRConfig.MAPMEMORY_MB, 1 * 1024);
scheduler.getConf().setLong(
- JTConfig.JT_MAX_REDUCEMEMORY_MB, 2 * 1024);
+ JTConfig.JT_MAX_REDUCEMEMORY_MB, 2 * 1024);
scheduler.getConf().setLong(
- MRConfig.REDUCEMEMORY_MB, 1 * 1024);
+ MRConfig.REDUCEMEMORY_MB, 1 * 1024);
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
@@ -973,7 +1026,8 @@
jConf.setNumReduceTasks(6);
jConf.setUser("u1");
jConf.setQueueName("default");
- FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
LOG.debug(
"Submit one high memory(2GB maps, 2GB reduces) job of "
@@ -985,60 +1039,41 @@
jConf.setNumReduceTasks(6);
jConf.setQueueName("default");
jConf.setUser("u2");
- FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
- // Verify that normal job takes 3 task assignments to hit user limits
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000001_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000002_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000002_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000003_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000003_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000004_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000004_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000005_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000005_0 on tt1");
+ // Verify that normal job takes 5 task assignments to hit user limits
+ Map<String, String> expectedStrings = new HashMap<String, String>();
+ for (int i = 0; i < 5; i++) {
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP,
+ "attempt_test_0001_m_00000" + (i + 1) + "_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE,
+ "attempt_test_0001_r_00000" + (i + 1) + "_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
+ }
// u1 has 5 map slots and 5 reduce slots. u2 has none. So u1's user limits
// are hit. So u2 should get slots
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_m_000001_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_r_000001_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_m_000002_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_r_000002_0 on tt1");
-
- // u1 has 5 map slots and 5 reduce slots. u2 has 4 map slots and 4 reduce
+ for (int i = 0; i < 2; i++) {
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP,
+ "attempt_test_0002_m_00000" + (i + 1) + "_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE,
+ "attempt_test_0002_r_00000" + (i + 1) + "_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
+ } // u1 has 5 map slots and 5 reduce slots. u2 has 4 map slots and 4 reduce
// slots. Because of high memory tasks, giving u2 another task would
// overflow limits. So, no more tasks should be given to anyone.
assertNull(scheduler.assignTasks(tracker("tt1")));
- assertNull(scheduler.assignTasks(tracker("tt1")));
}
/*
@@ -1051,9 +1086,7 @@
* - Then run initializationPoller()
* - Check once again the waiting queue, it should be 5 jobs again.
* - Then raise status change events.
- * - Assign one task to a task tracker. (Map)
- * - Check waiting job count, it should be 4 now and used map (%) = 100
- * - Assign another one task (Reduce)
+ * - Assign tasks to a task tracker.
* - Check waiting job count, it should be 4 now and used map (%) = 100
* and used reduce (%) = 100
* - finish the job and then check the used percentage it should go
@@ -1066,9 +1099,9 @@
* - Check the count, the waiting job count should be 2.
* - Now raise status change events to move the initialized jobs which
* should be two in count to running queue.
- * - Then schedule a map of the job in running queue.
+ * - Then schedule a map and reduce of the job in running queue.
* - Run the poller because the poller is responsible for waiting
- * jobs count. Check the count, it should be using 100% map and one
+ * jobs count. Check the count, it should be using 100% map, reduce and one
* waiting job
* - fail the running job.
* - Check the count, it should be now one waiting job and zero running
@@ -1161,9 +1194,12 @@
raiseStatusChangeEvents(scheduler.jobQueuesManager);
raiseStatusChangeEvents(scheduler.jobQueuesManager, "q2");
//assign one job
- Task t1 = checkAssignment(
+ Map<String, String> strs = new HashMap<String, String>();
+ strs.put(CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+ strs.put(CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+ List<Task> t1 = checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
+ strs);
//Initalize extra job.
controlledInitializationPoller.selectJobsToInitialize();
@@ -1174,24 +1210,22 @@
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 20);
+
+ assertEquals(infoStrings.length, 22);
assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
assertEquals(infoStrings[8], "Running tasks: 1");
assertEquals(infoStrings[9], "Active users:");
assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
- assertEquals(infoStrings[14], "Used capacity: 0 (0.0% of Capacity)");
- assertEquals(infoStrings[15], "Running tasks: 0");
- assertEquals(infoStrings[18], "Number of Waiting Jobs: 4");
+ assertEquals(infoStrings[14], "Used capacity: 1 (100.0% of Capacity)");
+ assertEquals(infoStrings[15], "Running tasks: 1");
+ assertEquals(infoStrings[20], "Number of Waiting Jobs: 4");
- //assign a reduce task
- Task t2 = checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000001_0 on tt1");
// make sure we update our stats
scheduler.updateContextInfoForTests();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
+
assertEquals(infoStrings.length, 22);
assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
assertEquals(infoStrings[8], "Running tasks: 1");
@@ -1205,8 +1239,9 @@
//Complete the job and check the running tasks count
FakeJobInProgress u1j1 = userJobs.get(0);
- taskTrackerManager.finishTask(t1.getTaskID().toString(), u1j1);
- taskTrackerManager.finishTask(t2.getTaskID().toString(), u1j1);
+ for (Task task : t1) {
+ taskTrackerManager.finishTask(task.getTaskID().toString(), u1j1);
+ }
taskTrackerManager.finalizeJob(u1j1);
// make sure we update our stats
@@ -1214,6 +1249,7 @@
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
+
assertEquals(infoStrings.length, 18);
assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
assertEquals(infoStrings[8], "Running tasks: 0");
@@ -1274,9 +1310,12 @@
//Now schedule a map should be job3 of the user as job1 succeeded job2
//failed and now job3 is running
- t1 = checkAssignment(
+ strs.clear();
+ strs.put(CapacityTestUtils.MAP, "attempt_test_0003_m_000001_0 on tt1");
+ strs.put(CapacityTestUtils.REDUCE, "attempt_test_0003_r_000001_0 on tt1");
+ t1 = checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0003_m_000001_0 on tt1");
+ strs);
FakeJobInProgress u1j3 = userJobs.get(2);
assertTrue(
"User Job 3 not running ",
@@ -1290,12 +1329,16 @@
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 20);
+ assertEquals(infoStrings.length, 22);
assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
assertEquals(infoStrings[8], "Running tasks: 1");
assertEquals(infoStrings[9], "Active users:");
assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
- assertEquals(infoStrings[18], "Number of Waiting Jobs: 1");
+ assertEquals(infoStrings[14], "Used capacity: 1 (100.0% of Capacity)");
+ assertEquals(infoStrings[15], "Running tasks: 1");
+ assertEquals(infoStrings[16], "Active users:");
+ assertEquals(infoStrings[17], "User 'u1': 1 (100.0% of used capacity)");
+ assertEquals(infoStrings[20], "Number of Waiting Jobs: 1");
//Fail the executing job
taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED);
@@ -1347,13 +1390,14 @@
// assert that all tasks are launched even though they transgress the
// scheduling limits.
-
- checkAssignment(
+ Map<String, String> expectedStrings = new HashMap<String, String>();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000001_0 on tt1");
+ expectedStrings);
}
/**
@@ -1366,7 +1410,7 @@
throws IOException {
// 2 map and 1 reduce slots
- taskTrackerManager = new FakeTaskTrackerManager(1, 2, 1);
+ taskTrackerManager = new FakeTaskTrackerManager(1, 2, 2);
taskTrackerManager.addQueues(new String[]{"default"});
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -1399,7 +1443,8 @@
jConf.setNumReduceTasks(0);
jConf.setQueueName("default");
jConf.setUser("u1");
- FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
LOG.debug(
"Submit another regular memory(1GB vmem maps/reduces) job of "
@@ -1411,170 +1456,188 @@
jConf.setNumReduceTasks(2);
jConf.setQueueName("default");
jConf.setUser("u1");
- FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
- // first, a map from j1 will run
- checkAssignment(
+ // first, a map from j1 and a reduce from other job j2
+ Map<String,String> strs = new HashMap<String,String>();
+ strs.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
+ strs.put(REDUCE,"attempt_test_0002_r_000001_0 on tt1");
+
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
+ strs);
// Total 2 map slots should be accounted for.
checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 50.0f);
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
- // at this point, the scheduler tries to schedule another map from j1.
- // there isn't enough space. The second job's reduce should be scheduled.
+ //TT has 2 slots for reduces, hence this call should get a second reduce
+ //task from the other job (job 2)
checkAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_r_000001_0 on tt1");
- // Total 1 reduce slot should be accounted for.
- checkOccupiedSlots(
- "default", TaskType.REDUCE, 1, 1,
- 100.0f);
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
+ "attempt_test_0002_r_000002_0 on tt1");
+ checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 2, 100.0f);
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
+
+ //now that all the slots are occupied, no more tasks should be
+ //assigned.
+ assertNull(scheduler.assignTasks(tracker("tt1")));
}
- /**
- * Test blocking of cluster for lack of memory.
+ /**
+ * Tests that scheduler schedules normal jobs once high RAM jobs
+ * have been reserved to the limit.
+ *
+ * The test causes the scheduler to schedule a normal job on two
+ * trackers, and one task of the high RAM job on a third. Then it
+ * asserts that one of the first two trackers gets a reservation
+ * for the remaining task of the high RAM job. After this, it
+ * asserts that a normal job submitted later is allowed to run
+ * on a free slot, as all tasks of the high RAM job are either
+ * scheduled or reserved.
*
* @throws IOException
*/
public void testClusterBlockingForLackOfMemory()
- throws IOException {
-
- LOG.debug("Starting the scheduler.");
- taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
-
- ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
- taskTrackerManager.addQueues(new String[]{"default"});
-
-
- scheduler.setTaskTrackerManager(taskTrackerManager);
- // enabled memory-based scheduling
- // Normal jobs 1GB maps/reduces. 2GB limit on maps/reduces
- scheduler.getConf().setLong(JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024);
- scheduler.getConf().setLong(MRConfig.MAPMEMORY_MB, 1 * 1024);
- scheduler.getConf().setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB, 2 * 1024);
- scheduler.getConf().setLong(MRConfig.REDUCEMEMORY_MB, 1 * 1024);
- taskTrackerManager.setFakeQueues(queues);
- scheduler.start();
-
- LOG.debug(
- "Submit one normal memory(1GB maps/reduces) job of "
- + "1 map, 1 reduce tasks.");
- JobConf jConf = new JobConf(conf);
- jConf.setMemoryForMapTask(1 * 1024);
- jConf.setMemoryForReduceTask(1 * 1024);
- jConf.setNumMapTasks(1);
- jConf.setNumReduceTasks(1);
- jConf.setQueueName("default");
- jConf.setUser("u1");
- FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+ throws IOException {
- // Fill the second tt with this job.
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_m_000001_0 on tt2");
- // Total 1 map slot should be accounted for.
- checkOccupiedSlots("default", TaskType.MAP, 1, 1, 25.0f);
- assertEquals(
- String.format(
- TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
- 1, 1, 0, 0, 0, 0),
- (String) job1.getSchedulingInfo());
- checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_r_000001_0 on tt2");
- // Total 1 map slot should be accounted for.
- checkOccupiedSlots(
- "default", TaskType.REDUCE, 1, 1,
- 25.0f);
- assertEquals(
- String.format(
- TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
- 1, 1, 0, 1, 1, 0),
- (String) job1.getSchedulingInfo());
- checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
-
- LOG.debug(
- "Submit one high memory(2GB maps/reduces) job of "
- + "2 map, 2 reduce tasks.");
- jConf = new JobConf(conf);
- jConf.setMemoryForMapTask(2 * 1024);
- jConf.setMemoryForReduceTask(2 * 1024);
- jConf.setNumMapTasks(2);
- jConf.setNumReduceTasks(2);
- jConf.setQueueName("default");
- jConf.setUser("u1");
- FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
-
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_m_000001_0 on tt1");
- // Total 3 map slots should be accounted for.
- checkOccupiedSlots("default", TaskType.MAP, 1, 3, 75.0f);
- assertEquals(
- String.format(
- TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
- 1, 2, 0, 0, 0, 0),
- (String) job2.getSchedulingInfo());
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+ LOG.debug("Starting the scheduler.");
+ taskTrackerManager = new FakeTaskTrackerManager(3, 2, 2);
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_r_000001_0 on tt1");
- // Total 3 reduce slots should be accounted for.
- checkOccupiedSlots(
- "default", TaskType.REDUCE, 1, 3,
- 75.0f);
- assertEquals(
- String.format(
- TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
- 1, 2, 0, 1, 2, 0),
- (String) job2.getSchedulingInfo());
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
-
- LOG.debug(
- "Submit one normal memory(1GB maps/reduces) job of "
- + "1 map, 0 reduce tasks.");
- jConf = new JobConf(conf);
- jConf.setMemoryForMapTask(1 * 1024);
- jConf.setMemoryForReduceTask(1 * 1024);
- jConf.setNumMapTasks(1);
- jConf.setNumReduceTasks(1);
- jConf.setQueueName("default");
- jConf.setUser("u1");
- FakeJobInProgress job3 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
-
- // Job2 cannot fit on tt1. So tt1 is reserved for a map slot of job2
- assertNull(scheduler.assignTasks(tracker("tt1")));
- assertNull(scheduler.assignTasks(tracker("tt1")));
-
- // reserved tasktrackers contribute to occupied slots for maps.
- checkOccupiedSlots("default", TaskType.MAP, 1, 5, 125.0f);
- // occupied slots for reduces remain unchanged as tt1 is not reserved for
- // reduces.
- checkOccupiedSlots("default", TaskType.REDUCE, 1, 3, 75.0f);
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
- checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
- LOG.info(job2.getSchedulingInfo());
- assertEquals(
- String.format(
- TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
- 1, 2, 2, 1, 2, 0),
- (String) job2.getSchedulingInfo());
- assertEquals(
- String.format(
- TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
- 0, 0, 0, 0, 0, 0),
- (String) job3.getSchedulingInfo());
-
- // One reservation is already done for job2. So job3 should go ahead.
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0003_m_000001_0 on tt2");
- }
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
+ taskTrackerManager.addQueues(new String[]{"default"});
+ scheduler.setTaskTrackerManager(taskTrackerManager);
+ // enabled memory-based scheduling
+ // Normal jobs 1GB maps/reduces. 2GB limit on maps/reduces
+ scheduler.getConf().setLong(JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024);
+ scheduler.getConf().setLong(MRConfig.MAPMEMORY_MB, 1 * 1024);
+ scheduler.getConf().setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB, 2 * 1024);
+ scheduler.getConf().setLong(MRConfig.REDUCEMEMORY_MB, 1 * 1024);
+ taskTrackerManager.setFakeQueues(queues);
+ scheduler.start();
+
+ LOG.debug(
+ "Submit one normal memory(1GB maps/reduces) job of "
+ + "2 map, 2 reduce tasks.");
+ JobConf jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(1 * 1024);
+ jConf.setMemoryForReduceTask(1 * 1024);
+ jConf.setNumMapTasks(2);
+ jConf.setNumReduceTasks(2);
+ jConf.setQueueName("default");
+ jConf.setUser("u1");
+ FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
+
+ // Fill a tt with this job's tasks.
+ Map<String, String> expectedStrings = new HashMap<String, String>();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
+ // Total 1 map slot and 1 reduce slot should be accounted for.
+ checkOccupiedSlots("default", TaskType.MAP, 1, 1, 16.7f);
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 16.7f);
+ assertEquals(
+ String.format(
+ TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
+ 1, 1, 0, 1, 1, 0),
+ (String) job1.getSchedulingInfo());
+ checkMemReservedForTasksOnTT("tt1", 1 * 1024L, 1 * 1024L);
+
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0001_m_000002_0 on tt2");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0001_r_000002_0 on tt2");
+
+ // fill another TT with the rest of the tasks of the job
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ expectedStrings);
+
+ LOG.debug(
+ "Submit one high memory(2GB maps/reduces) job of "
+ + "2 map, 2 reduce tasks.");
+ jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(2 * 1024);
+ jConf.setMemoryForReduceTask(2 * 1024);
+ jConf.setNumMapTasks(2);
+ jConf.setNumReduceTasks(2);
+ jConf.setQueueName("default");
+ jConf.setUser("u1");
+ FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
+
+ // Have another TT run one task of each type of the high RAM
+ // job. This will fill up the TT.
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt3");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt3");
+
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt3", expectedStrings);
+ checkOccupiedSlots("default", TaskType.MAP, 1, 4, 66.7f);
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 4, 66.7f);
+ assertEquals(
+ String.format(
+ TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
+ 1, 2, 0, 1, 2, 0),
+ (String) job2.getSchedulingInfo());
+ checkMemReservedForTasksOnTT("tt3", 2 * 1024L, 2 * 1024L);
+
+ LOG.debug(
+ "Submit one normal memory(1GB maps/reduces) job of "
+ + "1 map, 1 reduce tasks.");
+ jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(1 * 1024);
+ jConf.setMemoryForReduceTask(1 * 1024);
+ jConf.setNumMapTasks(1);
+ jConf.setNumReduceTasks(1);
+ jConf.setQueueName("default");
+ jConf.setUser("u1");
+ FakeJobInProgress job3 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
+
+ // Send a TT with insufficient space for task assignment,
+ // This will cause a reservation for the high RAM job.
+ assertNull(scheduler.assignTasks(tracker("tt1")));
+
+ // reserved tasktrackers contribute to occupied slots for maps and reduces
+ checkOccupiedSlots("default", TaskType.MAP, 1, 6, 100.0f);
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 6, 100.0f);
+ checkMemReservedForTasksOnTT("tt1", 1 * 1024L, 1 * 1024L);
+ LOG.info(job2.getSchedulingInfo());
+ assertEquals(
+ String.format(
+ TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
+ 1, 2, 2, 1, 2, 2),
+ (String) job2.getSchedulingInfo());
+ assertEquals(
+ String.format(
+ TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
+ 0, 0, 0, 0, 0, 0),
+ (String) job3.getSchedulingInfo());
+
+ // Reservations are already done for job2. So job3 should go ahead.
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0003_m_000001_0 on tt2");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0003_r_000001_0 on tt2");
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ expectedStrings);
+ }
/**
* Testcase to verify fix for a NPE (HADOOP-5641), when memory based
@@ -1613,24 +1676,21 @@
jConf.setMemoryForReduceTask(512);
jConf.setQueueName("default");
jConf.setUser("u1");
- FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
- // 1st cycle - 1 map gets assigned.
- Task t = checkAssignment(
+ // 1st cycle - 1 map and 1 reduce get assigned.
+ Map<String, String> expectedStrings = new HashMap<String, String>();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+ List<Task> t = checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
- // Total 1 map slot should be accounted for.
+ expectedStrings);
+ // Total 1 map slot and 1 reduce slot should be accounted for.
checkOccupiedSlots("default", TaskType.MAP, 1, 1, 50.0f);
- checkMemReservedForTasksOnTT("tt1", 512L, 0L);
-
- // 1st cycle of reduces - 1 reduce gets assigned.
- Task t1 = checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000001_0 on tt1");
- // Total 1 reduce slot should be accounted for.
- checkOccupiedSlots(
- "default", TaskType.REDUCE, 1, 1,
- 50.0f);
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 50.0f);
checkMemReservedForTasksOnTT("tt1", 512L, 512L);
// kill this job !
@@ -1653,20 +1713,21 @@
jConf.setMemoryForReduceTask(512);
jConf.setQueueName("default");
jConf.setUser("u1");
- FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
// since with HADOOP-5964, we don't rely on a job conf to get
// the memory occupied, scheduling should be able to work correctly.
- t1 = checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_m_000001_0 on tt1");
- checkOccupiedSlots("default", TaskType.MAP, 1, 1, 50);
- checkMemReservedForTasksOnTT("tt1", 1024L, 512L);
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt1");
- // assign a reduce now.
- t1 = checkAssignment(
+ List<Task> t1 = checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_r_000001_0 on tt1");
+ expectedStrings);
+ checkOccupiedSlots("default", TaskType.MAP, 1, 1, 50);
checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 50);
checkMemReservedForTasksOnTT("tt1", 1024L, 1024L);
@@ -1674,22 +1735,22 @@
assertNull(scheduler.assignTasks(tracker("tt1")));
// finish the tasks on the tracker.
- taskTrackerManager.finishTask(t.getTaskID().toString(), job1);
- taskTrackerManager.finishTask(t1.getTaskID().toString(), job1);
+ for (Task task : t) {
+ taskTrackerManager.finishTask(task.getTaskID().toString(), job1);
+ }
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0002_m_000002_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0002_r_000002_0 on tt1");
// now a new task can be assigned.
- t = checkAssignment(
+ t = checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_m_000002_0 on tt1");
+ expectedStrings);
checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
- // memory used will change because of the finished task above.
- checkMemReservedForTasksOnTT("tt1", 1024L, 512L);
-
- // reduce can be assigned.
- t = checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_r_000002_0 on tt1");
checkOccupiedSlots("default", TaskType.REDUCE, 1, 2, 100.0f);
+ // memory used will change because of the finished task above.
checkMemReservedForTasksOnTT("tt1", 1024L, 1024L);
}
@@ -1721,9 +1782,10 @@
JobInitializationPoller initPoller = scheduler.getInitializationPoller();
// submit 4 jobs each for 3 users.
- HashMap<String, ArrayList<FakeJobInProgress>> userJobs = taskTrackerManager.submitJobs(
- 3,
- 4, "default");
+ HashMap<String, ArrayList<FakeJobInProgress>> userJobs =
+ taskTrackerManager.submitJobs(
+ 3,
+ 4, "default");
// get the jobs submitted.
ArrayList<FakeJobInProgress> u1Jobs = userJobs.get("u1");
@@ -1782,31 +1844,37 @@
raiseStatusChangeEvents(mgr);
// get some tasks assigned.
- Task t1 = checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
- Task t2 = checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000001_0 on tt1");
- Task t3 = checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_m_000001_0 on tt2");
- Task t4 = checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_r_000001_0 on tt2");
- taskTrackerManager.finishTask(
- t1.getTaskID().toString(), u1Jobs.get(
- 0));
- taskTrackerManager.finishTask(
- t2.getTaskID().toString(), u1Jobs.get(
- 0));
- taskTrackerManager.finishTask(
- t3.getTaskID().toString(), u1Jobs.get(
- 1));
- taskTrackerManager.finishTask(
- t4.getTaskID().toString(), u1Jobs.get(
- 1));
+ Map<String, String> expectedStrings = new HashMap<String, String>();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+
+ List<Task> t1 = checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
+
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt2");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt2");
+
+ List<Task> t2 = checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ expectedStrings);
+
+ for (Task task : t1) {
+ taskTrackerManager.finishTask(
+ task.getTaskID().toString(), u1Jobs.get(
+ 0));
+ }
+ for (Task task : t2) {
+ taskTrackerManager.finishTask(
+ task.getTaskID().toString(), u1Jobs.get(
+ 0));
+ }
// as some jobs have running tasks, the poller will now
// pick up new jobs to initialize.
controlledInitializationPoller.selectJobsToInitialize();
@@ -1827,19 +1895,21 @@
"Initialized jobs contains the user1 job 2",
initializedJobs.contains(u1Jobs.get(1).getJobID()));
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0003_m_000001_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0003_r_000001_0 on tt1");
+
// finish one more job
- t1 = checkAssignment(
+ t1 = checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0003_m_000001_0 on tt1");
- t2 = checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0003_r_000001_0 on tt1");
- taskTrackerManager.finishTask(
- t1.getTaskID().toString(), u1Jobs.get(
- 2));
- taskTrackerManager.finishTask(
- t2.getTaskID().toString(), u1Jobs.get(
- 2));
+ expectedStrings);
+ for (Task task : t1) {
+ taskTrackerManager.finishTask(
+ task.getTaskID().toString(), u1Jobs.get(
+ 2));
+ }
// no new jobs should be picked up, because max user limit
// is still 3.
@@ -1847,19 +1917,21 @@
assertEquals(initializedJobs.size(), 5);
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0004_m_000001_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0004_r_000001_0 on tt1");
+
// run 1 more jobs..
- t1 = checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0004_m_000001_0 on tt1");
- t1 = checkAssignment(
+ t1 = checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0004_r_000001_0 on tt1");
- taskTrackerManager.finishTask(
- t1.getTaskID().toString(), u1Jobs.get(
- 3));
- taskTrackerManager.finishTask(
- t2.getTaskID().toString(), u1Jobs.get(
- 3));
+ expectedStrings);
+ for (Task task : t1) {
+ taskTrackerManager.finishTask(
+ task.getTaskID().toString(), u1Jobs.get(
+ 3));
+ }
// Now initialised jobs should contain user 4's job, as
// user 1's jobs are all done and the number of users is
@@ -1968,12 +2040,13 @@
taskTrackerManager.submitJob(JobStatus.PREP, 1, 1, "q1", "u1");
controlledInitializationPoller.selectJobsToInitialize();
raiseStatusChangeEvents(scheduler.jobQueuesManager, "q1");
- Task t = checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
- t = checkAssignment(
+ Map<String,String> strs = new HashMap<String,String>();
+ strs.put(CapacityTestUtils.MAP,"attempt_test_0001_m_000001_0 on tt1");
+ strs.put(CapacityTestUtils.REDUCE,"attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000001_0 on tt1");
+ strs);
+
}
public void testFailedJobInitalizations() throws Exception {
@@ -2049,37 +2122,41 @@
conf.setReduceSpeculativeExecution(true);
//Submit a job which would have one speculative map and one speculative
//reduce.
- FakeJobInProgress fjob1 = taskTrackerManager.submitJob(JobStatus.PREP, conf);
+ FakeJobInProgress fjob1 = taskTrackerManager.submitJob(
+ JobStatus.PREP, conf);
conf = new JobConf();
conf.setNumMapTasks(1);
conf.setNumReduceTasks(1);
//Submit a job which has no speculative map or reduce.
- FakeJobInProgress fjob2 = taskTrackerManager.submitJob(JobStatus.PREP, conf);
+ FakeJobInProgress fjob2 = taskTrackerManager.submitJob(
+ JobStatus.PREP, conf);
//Ask the poller to initalize all the submitted job and raise status
//change event.
controlledInitializationPoller.selectJobsToInitialize();
raiseStatusChangeEvents(mgr);
-
- checkAssignment(
+ Map<String, String> strs = new HashMap<String, String>();
+ strs.put(CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+ strs.put(CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
+ strs);
assertTrue(
"Pending maps of job1 greater than zero",
(fjob1.pendingMaps() == 0));
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_m_000001_1 on tt2");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000001_0 on tt1");
+
assertTrue(
- "Pending reduces of job2 greater than zero",
+ "Pending reduces of job1 greater than zero",
(fjob1.pendingReduces() == 0));
- checkAssignment(
+
+ Map<String, String> str = new HashMap<String, String>();
+ str.put(CapacityTestUtils.MAP, "attempt_test_0001_m_000001_1 on tt2");
+ str.put(CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_1 on tt2");
+
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_r_000001_1 on tt2");
+ str);
taskTrackerManager.finishTask("attempt_test_0001_m_000001_0", fjob1);
taskTrackerManager.finishTask("attempt_test_0001_m_000001_1", fjob1);
@@ -2087,12 +2164,13 @@
taskTrackerManager.finishTask("attempt_test_0001_r_000001_1", fjob1);
taskTrackerManager.finalizeJob(fjob1);
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_m_000001_0 on tt1");
- checkAssignment(
+ str.clear();
+ str.put(CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt1");
+ str.put(CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt1");
+
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_r_000001_0 on tt1");
+ str);
taskTrackerManager.finishTask("attempt_test_0002_m_000001_0", fjob2);
taskTrackerManager.finishTask("attempt_test_0002_r_000001_0", fjob2);
taskTrackerManager.finalizeJob(fjob2);
@@ -2267,7 +2345,8 @@
jConf.setNumReduceTasks(6);
jConf.setQueueName("default");
jConf.setUser("u1");
- FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
// Submit a normal job to the other queue.
jConf = new JobConf(conf);
@@ -2277,108 +2356,178 @@
jConf.setNumReduceTasks(6);
jConf.setUser("u1");
jConf.setQueueName("q1");
- FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
- // Map 1 of high memory job
- checkAssignment(
+ // Map and reduce of high memory job should be assigned
+ HashMap<String, String> expectedStrings = new HashMap<String, String>();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings);
+
checkQueuesOrder(
qs, scheduler
.getOrderedQueues(TaskType.MAP));
- // Reduce 1 of high memory job
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000001_0 on tt1");
checkQueuesOrder(
qs, scheduler
.getOrderedQueues(TaskType.REDUCE));
- // Map 1 of normal job
- checkAssignment(
+ // 1st map and reduce of normal job should be assigned
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_m_000001_0 on tt1");
+ expectedStrings);
+
checkQueuesOrder(
reversedQs, scheduler
.getOrderedQueues(TaskType.MAP));
-
- // Reduce 1 of normal job
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_r_000001_0 on tt1");
checkQueuesOrder(
reversedQs, scheduler
.getOrderedQueues(TaskType.REDUCE));
- // Map 2 of normal job
- checkAssignment(
+ // 2nd map and reduce of normal job should be assigned
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0002_m_000002_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0002_r_000002_0 on tt1");
+
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_m_000002_0 on tt1");
+ expectedStrings);
checkQueuesOrder(
reversedQs, scheduler
.getOrderedQueues(TaskType.MAP));
-
- // Reduce 2 of normal job
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_r_000002_0 on tt1");
checkQueuesOrder(
reversedQs, scheduler
.getOrderedQueues(TaskType.REDUCE));
// Now both the queues are equally served. But the comparator doesn't change
// the order if queues are equally served.
+ // Hence, 3rd map and reduce of normal job should be assigned
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0002_m_000003_0 on tt2");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0002_r_000003_0 on tt2");
- // Map 3 of normal job
- checkAssignment(
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_m_000003_0 on tt2");
+ expectedStrings);
+
checkQueuesOrder(
reversedQs, scheduler
.getOrderedQueues(TaskType.MAP));
- // Reduce 3 of normal job
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_r_000003_0 on tt2");
checkQueuesOrder(
reversedQs, scheduler
.getOrderedQueues(TaskType.REDUCE));
- // Map 2 of high memory job
- checkAssignment(
+ // 2nd map and reduce of high memory job should be assigned
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0001_m_000002_0 on tt2");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0001_r_000002_0 on tt2");
+
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_m_000002_0 on tt2");
+ expectedStrings);
checkQueuesOrder(
qs, scheduler
.getOrderedQueues(TaskType.MAP));
- // Reduce 2 of high memory job
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_r_000002_0 on tt2");
checkQueuesOrder(
qs, scheduler
.getOrderedQueues(TaskType.REDUCE));
- // Map 4 of normal job
- checkAssignment(
+ // 4th map and reduce of normal job should be assigned.
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0002_m_000004_0 on tt2");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0002_r_000004_0 on tt2");
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_m_000004_0 on tt2");
+ expectedStrings);
checkQueuesOrder(
reversedQs, scheduler
.getOrderedQueues(TaskType.MAP));
- // Reduce 4 of normal job
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_r_000004_0 on tt2");
checkQueuesOrder(
reversedQs, scheduler
.getOrderedQueues(TaskType.REDUCE));
}
+ /**
+ * Tests whether 1 map and 1 reduce are assigned even if reduces span across
+ * multiple jobs or multiple queues.
+ *
+ * Creates a cluster with 6 map slots and 2 reduce slots.
+ * Submits 2 jobs:
+ * job1, with 6 maps and 1 reduce
+ * job2, with 2 maps and 1 reduce
+ *
+ *
+ * check that first assignment assigns a map and a reduce.
+ * check that second assignment assigns a map and a reduce
+ * (both from other job and other queue)
+ *
+ * the last 2 calls just check to make sure that we don't get further reduces
+ *
+ * @throws Exception
+ */
+ public void testMultiTaskAssignmentInMultipleQueues() throws Exception {
+ setUp(1, 6, 2);
+ // set up some queues
+ String[] qs = {"default", "q1"};
+ taskTrackerManager.addQueues(qs);
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
+ queues.add(new FakeQueueInfo("q1", 50.0f, true, 25));
+ taskTrackerManager.setFakeQueues(queues);
+ scheduler.start();
+
+ //Submit the job with 6 maps and 1 reduce
+ taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 6, 1, "default", "u1");
+
+ FakeJobInProgress j2 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 2, 1, "q1", "u2");
+
+ Map<String, String> str = new HashMap<String, String>();
+ str.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
+ str.put(REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt1", str);
+
+ // next assignment will be for job in second queue.
+ str.clear();
+ str.put(MAP, "attempt_test_0002_m_000001_0 on tt1");
+ str.put(REDUCE, "attempt_test_0002_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt1", str);
+
+ //now both the reduce slots are being used, hence we should get only 1
+ //map task in this assignTasks call.
+ str.clear();
+ str.put(MAP, "attempt_test_0002_m_000002_0 on tt1");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt1", str);
+
+ str.clear();
+ str.put(MAP, "attempt_test_0001_m_000002_0 on tt1");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt1", str);
+ }
+
+
private void checkRunningJobMovementAndCompletion() throws IOException {
JobQueuesManager mgr = scheduler.jobQueuesManager;
@@ -2402,12 +2551,13 @@
mgr.getJobQueue("default").getRunningJobs().contains(job));
// assign a task
- Task t = checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
- t = checkAssignment(
+ Map<String,String> strs = new HashMap<String,String>();
+ strs.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
+ strs.put(REDUCE,"attempt_test_0001_r_000001_0 on tt1");
+
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000001_0 on tt1");
+ strs);
controlledInitializationPoller.selectJobsToInitialize();
@@ -2551,14 +2701,14 @@
tracker(taskTracker).getStatus(),
TaskType.REDUCE);
if (expectedMemForMapsOnTT == null) {
- assertTrue(observedMemForMapsOnTT == null);
+ assertEquals(observedMemForMapsOnTT,null);
} else {
- assertTrue(observedMemForMapsOnTT.equals(expectedMemForMapsOnTT));
+ assertEquals(observedMemForMapsOnTT,expectedMemForMapsOnTT);
}
if (expectedMemForReducesOnTT == null) {
- assertTrue(observedMemForReducesOnTT == null);
+ assertEquals(observedMemForReducesOnTT,null);
} else {
- assertTrue(observedMemForReducesOnTT.equals(expectedMemForReducesOnTT));
+ assertEquals(observedMemForReducesOnTT,expectedMemForReducesOnTT);
}
}