You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by yh...@apache.org on 2009/10/13 15:44:14 UTC
svn commit: r824757 [3/3] - in /hadoop/mapreduce/branches/branch-0.21: ./
conf/ src/c++/ src/contrib/ src/contrib/capacity-scheduler/
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
src/contrib/capacity-scheduler/src/test/org/apache/h...
Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java?rev=824757&r1=824756&r2=824757&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java Tue Oct 13 13:44:13 2009
@@ -23,7 +23,6 @@
import java.util.HashMap;
import java.util.Map;
import java.io.IOException;
-
import static org.apache.hadoop.mapred.CapacityTestUtils.*;
public class TestContainerQueue extends TestCase {
@@ -120,8 +119,8 @@
}
- public void testMaxCapacity() throws IOException{
- this.setUp(4,1,1);
+ public void testMaxCapacity() throws IOException {
+ this.setUp(4, 1, 1);
taskTrackerManager.addJobInProgressListener(scheduler.jobQueuesManager);
AbstractQueue rt = QueueHierarchyBuilder.createRootAbstractQueue();
@@ -131,16 +130,16 @@
QueueSchedulingContext a2 = new QueueSchedulingContext(
"R.b", 25, 30, -1, -1, -1);
QueueSchedulingContext a3 = new QueueSchedulingContext(
- "R.c", 50, -1, -1, -1, -1);
+ "R.c", 50, -1, -1, -1, -1);
//Test for max capacity
AbstractQueue q = new JobQueue(rt, a1);
AbstractQueue q1 = new JobQueue(rt, a2);
AbstractQueue q2 = new JobQueue(rt, a3);
- scheduler.jobQueuesManager.addQueue((JobQueue)q);
- scheduler.jobQueuesManager.addQueue((JobQueue)q1);
- scheduler.jobQueuesManager.addQueue((JobQueue)q2);
+ scheduler.jobQueuesManager.addQueue((JobQueue) q);
+ scheduler.jobQueuesManager.addQueue((JobQueue) q1);
+ scheduler.jobQueuesManager.addQueue((JobQueue) q2);
scheduler.setRoot(rt);
rt.update(4, 4);
@@ -148,21 +147,37 @@
scheduler.updateContextInfoForTests();
// submit a job to the second queue
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 20, 0, "R.a", "u1");
+ taskTrackerManager.submitJobAndInit(JobStatus.PREP, 20, 20, "R.a", "u1");
- //Queue R.a should not more than 2 slots
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
+    //Queue R.a should not get more than 2 slots
+ Map<String, String> expectedStrings = new HashMap<String, String>();
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP,
"attempt_test_0001_m_000001_0 on tt1");
-
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_m_000002_0 on tt2");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE,
+ "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
    //Now the queue has already reached its max limit, no further tasks should
// be given.
- List<Task> l = scheduler.assignTasks(taskTrackerManager.getTaskTracker("tt3"));
- assertNull(l);
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP,
+ "attempt_test_0001_m_000002_0 on tt2");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE,
+ "attempt_test_0001_r_000002_0 on tt2");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ expectedStrings);
+
+ assertNull(scheduler.assignTasks(
+ taskTrackerManager.getTaskTracker(
+ "tt3")));
}
@@ -269,86 +284,160 @@
scheduler.updateContextInfoForTests();
// verify initial capacity distribution
- TaskSchedulingContext mapTsc
- = map.get("rt.gta").getQueueSchedulingContext().getMapTSC();
+ TaskSchedulingContext mapTsc
+ = map.get("rt.gta").getQueueSchedulingContext().getMapTSC();
assertEquals(mapTsc.getCapacity(), 3);
-
+
mapTsc = map.get("rt.sch").getQueueSchedulingContext().getMapTSC();
assertEquals(mapTsc.getCapacity(), 5);
-
+
mapTsc = map.get("rt.sch.prod").getQueueSchedulingContext().getMapTSC();
assertEquals(mapTsc.getCapacity(), 4);
mapTsc = map.get("rt.sch.misc").getQueueSchedulingContext().getMapTSC();
assertEquals(mapTsc.getCapacity(), 1);
- assertUsedCapacity(map,
- new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
- new int[] { 0, 0, 0, 0 });
-
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{0, 0, 0, 0});
+
    //Only allow job submission to leaf queue
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 0, "rt.sch.prod",
- "u1");
+ taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 4, 4, "rt.sch.prod",
+ "u1");
// submit a job to the second queue
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 0, "rt.sch.misc",
- "u1");
+ taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 4, 4, "rt.sch.misc",
+ "u1");
//submit a job in gta level queue
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 0, "rt.gta", "u1");
+ taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 4, "rt.gta", "u1");
int counter = 0;
+ Map<String, String> expectedStrings = new HashMap<String, String>();
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
- checkAssignment(taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
- assertUsedCapacity(map,
- new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
- new int[] { 0, 1, 1, 0 });
-
- checkAssignment(taskTrackerManager, scheduler, "tt2",
- "attempt_test_0003_m_000001_0 on tt2");
- assertUsedCapacity(map,
- new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
- new int[] { 1, 1, 1, 0 });
-
- checkAssignment(taskTrackerManager, scheduler, "tt3",
- "attempt_test_0002_m_000001_0 on tt3");
- assertUsedCapacity(map,
- new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
- new int[] { 1, 2, 1, 1 });
-
- checkAssignment(taskTrackerManager, scheduler, "tt4",
- "attempt_test_0003_m_000002_0 on tt4");
- assertUsedCapacity(map,
- new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
- new int[] { 2, 2, 1, 1 });
-
- checkAssignment(taskTrackerManager, scheduler, "tt5",
- "attempt_test_0001_m_000002_0 on tt5");
- assertUsedCapacity(map,
- new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
- new int[] { 2, 3, 2, 1 });
-
- checkAssignment(taskTrackerManager, scheduler, "tt6",
- "attempt_test_0001_m_000003_0 on tt6");
- assertUsedCapacity(map,
- new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
- new int[] { 2, 4, 3, 1 });
-
- checkAssignment(taskTrackerManager, scheduler, "tt7",
- "attempt_test_0003_m_000003_0 on tt7");
- assertUsedCapacity(map,
- new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
- new int[] { 3, 4, 3, 1 });
-
- checkAssignment(taskTrackerManager, scheduler, "tt8",
- "attempt_test_0001_m_000004_0 on tt8");
- assertUsedCapacity(map,
- new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
- new int[] { 3, 5, 4, 1 });
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{0, 1, 1, 0});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0003_m_000001_0 on tt2");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0003_r_000001_0 on tt2");
- checkAssignment(taskTrackerManager, scheduler, "tt9",
- "attempt_test_0002_m_000002_0 on tt9");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{1, 1, 1, 0});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt3");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt3");
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt3",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{1, 2, 1, 1});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0003_m_000002_0 on tt4");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0003_r_000002_0 on tt4");
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt4",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{2, 2, 1, 1});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0001_m_000002_0 on tt5");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0001_r_000002_0 on tt5");
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt5",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{2, 3, 2, 1});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0001_m_000003_0 on tt6");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0001_r_000003_0 on tt6");
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt6",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{2, 4, 3, 1});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0003_m_000003_0 on tt7");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0003_r_000003_0 on tt7");
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt7",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{3, 4, 3, 1});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0001_m_000004_0 on tt8");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0001_r_000004_0 on tt8");
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt8",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{3, 5, 4, 1});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0002_m_000002_0 on tt9");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0002_r_000002_0 on tt9");
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt9",
+ expectedStrings);
}
/**
@@ -373,75 +462,124 @@
scheduler.updateContextInfoForTests();
// verify capacities as per the setup.
- TaskSchedulingContext mapTSC
+ TaskSchedulingContext mapTSC
= map.get("rt.gta").getQueueSchedulingContext().getMapTSC();
assertEquals(mapTSC.getCapacity(), 2);
-
+
mapTSC = map.get("rt.sch").getQueueSchedulingContext().getMapTSC();
assertEquals(mapTSC.getCapacity(), 5);
-
+
mapTSC = map.get("rt.sch.prod").getQueueSchedulingContext().getMapTSC();
assertEquals(mapTSC.getCapacity(), 4);
-
+
mapTSC = map.get("rt.sch.misc").getQueueSchedulingContext().getMapTSC();
assertEquals(mapTSC.getCapacity(), 1);
-
- assertUsedCapacity(map,
- new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
- new int[] { 0, 0, 0, 0 });
+
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{0, 0, 0, 0});
// submit a job to the second queue
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 20, 0, "rt.sch.misc",
- "u1");
+ taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 20, 20, "rt.sch.misc",
+ "u1");
//submit a job in gta level queue
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 20, 0, "rt.gta", "u1");
+ taskTrackerManager.submitJobAndInit(JobStatus.PREP, 20, 20, "rt.gta", "u1");
int counter = 0;
+ Map<String, String> expectedStrings = new HashMap<String, String>();
+ expectedStrings.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000001_0 on tt1");
- checkAssignment(taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
- assertUsedCapacity(map,
- new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
- new int[] { 0, 1, 0, 1 });
-
- checkAssignment(taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_m_000001_0 on tt2");
- assertUsedCapacity(map,
- new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
- new int[] { 1, 1, 0, 1 });
-
- checkAssignment(taskTrackerManager, scheduler, "tt3",
- "attempt_test_0001_m_000002_0 on tt3");
- assertUsedCapacity(map,
- new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
- new int[] { 1, 2, 0, 2 });
-
- checkAssignment(taskTrackerManager, scheduler, "tt4",
- "attempt_test_0001_m_000003_0 on tt4");
- assertUsedCapacity(map,
- new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
- new int[] { 1, 3, 0, 3 });
-
- checkAssignment(taskTrackerManager, scheduler, "tt5",
- "attempt_test_0002_m_000002_0 on tt5");
- assertUsedCapacity(map,
- new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
- new int[] { 2, 3, 0, 3 });
-
- checkAssignment(taskTrackerManager, scheduler, "tt6",
- "attempt_test_0001_m_000004_0 on tt6");
- assertUsedCapacity(map,
- new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
- new int[] { 2, 4, 0, 4 });
-
- checkAssignment(taskTrackerManager, scheduler, "tt7",
- "attempt_test_0001_m_000005_0 on tt7");
- assertUsedCapacity(map,
- new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
- new int[] { 2, 5, 0, 5 });
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{0, 1, 0, 1});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0002_m_000001_0 on tt2");
+ expectedStrings.put(REDUCE, "attempt_test_0002_r_000001_0 on tt2");
- checkAssignment(taskTrackerManager, scheduler, "tt8",
- "attempt_test_0001_m_000006_0 on tt8");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{1, 1, 0, 1});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0001_m_000002_0 on tt3");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000002_0 on tt3");
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt3",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{1, 2, 0, 2});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0001_m_000003_0 on tt4");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000003_0 on tt4");
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt4",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{1, 3, 0, 3});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0002_m_000002_0 on tt5");
+ expectedStrings.put(REDUCE, "attempt_test_0002_r_000002_0 on tt5");
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt5",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{2, 3, 0, 3});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0001_m_000004_0 on tt6");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000004_0 on tt6");
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt6",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{2, 4, 0, 4});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0001_m_000005_0 on tt7");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000005_0 on tt7");
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt7",
+ expectedStrings);
+ assertUsedCapacity(
+ map,
+ new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+ new int[]{2, 5, 0, 5});
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0001_m_000006_0 on tt8");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000006_0 on tt8");
+
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt8",
+ expectedStrings);
}
// verify that the number of slots used for each queue
Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestRefreshOfQueues.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestRefreshOfQueues.java?rev=824757&r1=824756&r2=824757&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestRefreshOfQueues.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestRefreshOfQueues.java Tue Oct 13 13:44:13 2009
@@ -37,7 +37,7 @@
import org.apache.hadoop.mapred.CapacityTestUtils.ControlledInitializationPoller;
import org.apache.hadoop.mapred.CapacityTestUtils.FakeJobInProgress;
import org.apache.hadoop.mapred.CapacityTestUtils.FakeTaskTrackerManager;
-import static org.apache.hadoop.mapred.CapacityTestUtils.checkAssignment;
+import static org.apache.hadoop.mapred.CapacityTestUtils.*;
import org.junit.After;
import org.junit.Test;
@@ -201,65 +201,111 @@
JobQueueInfo[] queues = TestQueueManagerRefresh.getSimpleQueueHierarchy();
queues[0].getProperties().setProperty(
- CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(100));
+ CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(100));
queues[1].getProperties().setProperty(
- CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
+ CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
queues[2].getProperties().setProperty(
- CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
+ CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
// write the configuration file
QueueManagerTestUtils.writeQueueConfigurationFile(
- queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+ queueConfigFile.getAbsolutePath(), new JobQueueInfo[]{queues[0]});
setupAndStartSchedulerFramework(2, 2, 2);
FakeJobInProgress job1 =
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
- queues[1].getQueueName(), "user");
+ taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 2, 2,
+ queues[1].getQueueName(), "user");
FakeJobInProgress job2 =
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
- queues[2].getQueueName(), "user");
-
- checkAssignment(taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
- checkAssignment(taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_m_000001_0 on tt1");
- checkAssignment(taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_m_000002_0 on tt2");
- checkAssignment(taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_m_000002_0 on tt2");
+ taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 2, 2,
+ queues[2].getQueueName(), "user");
+
+ Map<String, String> expectedStrings = new HashMap<String, String>();
+ expectedStrings.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
+//===========================================
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0002_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE, "attempt_test_0002_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0002_m_000002_0 on tt2");
+ expectedStrings.put(REDUCE, "attempt_test_0002_r_000002_0 on tt2");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ expectedStrings);
+//============================================
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0001_m_000002_0 on tt2");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000002_0 on tt2");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ expectedStrings);
taskTrackerManager.killJob(job1.getJobID());
taskTrackerManager.killJob(job2.getJobID());
// change configuration
queues[1].getProperties().setProperty(
- CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(25));
+ CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(25));
queues[2].getProperties().setProperty(
- CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(75));
+ CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(75));
// Re-write the configuration file
QueueManagerTestUtils.writeQueueConfigurationFile(
- queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+ queueConfigFile.getAbsolutePath(), new JobQueueInfo[]{queues[0]});
- taskTrackerManager.getQueueManager().refreshQueues(null,
- scheduler.getQueueRefresher());
+ taskTrackerManager.getQueueManager().refreshQueues(
+ null,
+ scheduler.getQueueRefresher());
job1 =
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
- queues[1].getQueueName(), "user");
+ taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 2, 2,
+ queues[1].getQueueName(), "user");
job2 =
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 0,
- queues[2].getQueueName(), "user");
+ taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 4, 4,
+ queues[2].getQueueName(), "user");
+
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0003_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE, "attempt_test_0003_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
+
+
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0004_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE, "attempt_test_0004_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
+
+
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0004_m_000002_0 on tt2");
+ expectedStrings.put(REDUCE, "attempt_test_0004_r_000002_0 on tt2");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ expectedStrings);
+
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0004_m_000003_0 on tt2");
+ expectedStrings.put(REDUCE, "attempt_test_0004_r_000003_0 on tt2");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ expectedStrings);
- checkAssignment(taskTrackerManager, scheduler, "tt1",
- "attempt_test_0003_m_000001_0 on tt1");
- checkAssignment(taskTrackerManager, scheduler, "tt1",
- "attempt_test_0004_m_000001_0 on tt1");
- checkAssignment(taskTrackerManager, scheduler, "tt2",
- "attempt_test_0004_m_000002_0 on tt2");
- checkAssignment(taskTrackerManager, scheduler, "tt2",
- "attempt_test_0004_m_000003_0 on tt2");
}
/**
@@ -336,60 +382,84 @@
JobQueueInfo[] queues = TestQueueManagerRefresh.getSimpleQueueHierarchy();
queues[0].getProperties().setProperty(
- CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(100));
+ CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(100));
queues[1].getProperties().setProperty(
- CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
+ CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
queues[2].getProperties().setProperty(
- CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
+ CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
queues[2].getProperties().setProperty(
- CapacitySchedulerConf.MINIMUM_USER_LIMIT_PERCENT_PROPERTY,
- String.valueOf(100));
+ CapacitySchedulerConf.MINIMUM_USER_LIMIT_PERCENT_PROPERTY,
+ String.valueOf(100));
// write the configuration file
QueueManagerTestUtils.writeQueueConfigurationFile(
- queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+ queueConfigFile.getAbsolutePath(), new JobQueueInfo[]{queues[0]});
setupAndStartSchedulerFramework(1, 2, 2);
FakeJobInProgress job1 =
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
- queues[2].getQueueName(), "user1");
+ taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 2, 2,
+ queues[2].getQueueName(), "user1");
FakeJobInProgress job2 =
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
- queues[2].getQueueName(), "user2");
+ taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 2, 2,
+ queues[2].getQueueName(), "user2");
+
+ Map<String, String> expectedStrings = new HashMap<String, String>();
+ expectedStrings.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
+
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0001_m_000002_0 on tt1");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000002_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
- checkAssignment(taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
- checkAssignment(taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000002_0 on tt1");
assertNull(scheduler.assignTasks(taskTrackerManager.getTaskTracker("tt1")));
taskTrackerManager.killJob(job1.getJobID());
taskTrackerManager.killJob(job2.getJobID());
// change configuration
queues[2].getProperties().setProperty(
- CapacitySchedulerConf.MINIMUM_USER_LIMIT_PERCENT_PROPERTY,
- String.valueOf(50));
+ CapacitySchedulerConf.MINIMUM_USER_LIMIT_PERCENT_PROPERTY,
+ String.valueOf(50));
// Re-write the configuration file
QueueManagerTestUtils.writeQueueConfigurationFile(
- queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+ queueConfigFile.getAbsolutePath(), new JobQueueInfo[]{queues[0]});
- taskTrackerManager.getQueueManager().refreshQueues(null,
- scheduler.getQueueRefresher());
+ taskTrackerManager.getQueueManager().refreshQueues(
+ null,
+ scheduler.getQueueRefresher());
job1 =
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
- queues[1].getQueueName(), "user1");
+ taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 2, 2,
+ queues[1].getQueueName(), "user1");
job2 =
- taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
- queues[2].getQueueName(), "user2");
-
- checkAssignment(taskTrackerManager, scheduler, "tt1",
- "attempt_test_0003_m_000001_0 on tt1");
- checkAssignment(taskTrackerManager, scheduler, "tt1",
- "attempt_test_0004_m_000001_0 on tt1");
+ taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 2, 2,
+ queues[2].getQueueName(), "user2");
+
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0003_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE, "attempt_test_0003_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
+
+ expectedStrings.clear();
+ expectedStrings.put(MAP, "attempt_test_0004_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE, "attempt_test_0004_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
}
/**
Propchange: hadoop/mapreduce/branches/branch-0.21/src/contrib/data_join/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 13 13:44:13 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib/data_join:713112
/hadoop/core/trunk/src/contrib/data_join:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/data_join:818355,818918,818946
+/hadoop/mapreduce/trunk/src/contrib/data_join:818355,818918,818946,824750
Propchange: hadoop/mapreduce/branches/branch-0.21/src/contrib/dynamic-scheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 13 13:44:13 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/src/contrib/dynamic-scheduler:713112
/hadoop/core/trunk/src/contrib/dynamic-scheduler:784975-786373
-/hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler:818355,818918,818946
+/hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler:818355,818918,818946,824750
Propchange: hadoop/mapreduce/branches/branch-0.21/src/contrib/eclipse-plugin/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 13 13:44:13 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/core/src/contrib/eclipse-plugin:713112
/hadoop/core/trunk/src/contrib/eclipse-plugin:776175-784663
-/hadoop/mapreduce/trunk/src/contrib/eclipse-plugin:818355,818918,818946
+/hadoop/mapreduce/trunk/src/contrib/eclipse-plugin:818355,818918,818946,824750
Propchange: hadoop/mapreduce/branches/branch-0.21/src/contrib/fairscheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 13 13:44:13 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib/fairscheduler:713112
/hadoop/core/trunk/src/contrib/fairscheduler:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/fairscheduler:818355,818918,818946
+/hadoop/mapreduce/trunk/src/contrib/fairscheduler:818355,818918,818946,824750
Propchange: hadoop/mapreduce/branches/branch-0.21/src/contrib/index/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 13 13:44:13 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib/index:713112
/hadoop/core/trunk/src/contrib/index:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/index:818355,818918,818946
+/hadoop/mapreduce/trunk/src/contrib/index:818355,818918,818946,824750
Propchange: hadoop/mapreduce/branches/branch-0.21/src/contrib/mrunit/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 13 13:44:13 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib/mrunit:713112
/hadoop/core/trunk/src/contrib/mrunit:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/mrunit:818355,818918,818946
+/hadoop/mapreduce/trunk/src/contrib/mrunit:818355,818918,818946,824750
Propchange: hadoop/mapreduce/branches/branch-0.21/src/contrib/sqoop/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 13 13:44:13 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/src/contrib/sqoop:713112
/hadoop/core/trunk/src/contrib/sqoop:784975-786373
-/hadoop/mapreduce/trunk/src/contrib/sqoop:818355,818918,818946
+/hadoop/mapreduce/trunk/src/contrib/sqoop:818355,818918,818946,824750
Propchange: hadoop/mapreduce/branches/branch-0.21/src/contrib/streaming/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 13 13:44:13 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib/streaming:713112
/hadoop/core/trunk/src/contrib/streaming:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/streaming:818355,818918,818946
+/hadoop/mapreduce/trunk/src/contrib/streaming:818355,818918,818946,824750
Propchange: hadoop/mapreduce/branches/branch-0.21/src/contrib/vaidya/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 13 13:44:13 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib/vaidya:713112
/hadoop/core/trunk/src/contrib/vaidya:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/vaidya:818355,818918,818946
+/hadoop/mapreduce/trunk/src/contrib/vaidya:818355,818918,818946,824750
Propchange: hadoop/mapreduce/branches/branch-0.21/src/examples/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 13 13:44:13 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/examples:713112
/hadoop/core/trunk/src/examples:776175-784663
-/hadoop/mapreduce/trunk/src/examples:818355,818918,818946
+/hadoop/mapreduce/trunk/src/examples:818355,818918,818946,824750
Propchange: hadoop/mapreduce/branches/branch-0.21/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 13 13:44:13 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/java:713112
/hadoop/core/trunk/src/mapred:776175-785643
-/hadoop/mapreduce/trunk/src/java:818355,818918,818946
+/hadoop/mapreduce/trunk/src/java:818355,818918,818946,824750
Propchange: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 13 13:44:13 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/test/mapred:713112
/hadoop/core/trunk/src/test/mapred:776175-785643
-/hadoop/mapreduce/trunk/src/test/mapred:818355,818918,818946
+/hadoop/mapreduce/trunk/src/test/mapred:818355,818918,818946,824750
Propchange: hadoop/mapreduce/branches/branch-0.21/src/webapps/job/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 13 13:44:13 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/webapps/job:713112
/hadoop/core/trunk/src/webapps/job:776175-785643
-/hadoop/mapreduce/trunk/src/webapps/job:818355,818918,818946
+/hadoop/mapreduce/trunk/src/webapps/job:818355,818918,818946,824750