You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2009/10/27 16:44:06 UTC
svn commit: r830230 [2/9] - in /hadoop/mapreduce/branches/HDFS-641: ./
.eclipse.templates/ conf/ ivy/ lib/ src/c++/ src/contrib/
src/contrib/capacity-scheduler/
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
src/contrib/capacity-sche...
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java Tue Oct 27 15:43:58 2009
@@ -20,13 +20,13 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -48,6 +48,8 @@
public class CapacityTestUtils {
static final Log LOG =
LogFactory.getLog(org.apache.hadoop.mapred.CapacityTestUtils.class);
+ static final String MAP = "map";
+ static final String REDUCE = "reduce";
/**
@@ -160,18 +162,116 @@
}
}
-
+ /**
+ * The method accepts an attempt string and checks the validity of the
+ * assignTasks result w.r.t. that attempt string.
+ *
+ * @param taskTrackerManager
+ * @param scheduler
+ * @param taskTrackerName
+ * @param expectedTaskString
+ * @return
+ * @throws IOException
+ */
static Task checkAssignment(
CapacityTestUtils.FakeTaskTrackerManager taskTrackerManager,
CapacityTaskScheduler scheduler, String taskTrackerName,
String expectedTaskString) throws IOException {
+ Map<String, String> expectedStrings = new HashMap<String, String>();
+ if (expectedTaskString.contains("_m_")) {
+ expectedStrings.put(MAP, expectedTaskString);
+ } else if (expectedTaskString.contains("_r_")) {
+ expectedStrings.put(REDUCE, expectedTaskString);
+ }
+ List<Task> tasks = checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, taskTrackerName, expectedStrings);
+ for (Task task : tasks) {
+ if (task.toString().equals(expectedTaskString)) {
+ return task;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Checks the validity of the tasks assigned by the scheduler's assignTasks
+ * method. According to JIRA:1030, every assignTasks call in CapacityScheduler
+ * would result in either a MAP task, a REDUCE task, or BOTH.
+ *
+ * This method accepts a Map<String,String>.
+ * The map should always have <= 2 entries (keyed by MAP and/or REDUCE).
+ *
+ * Sample calling code:
+ *
+ * Map<String, String> expectedStrings = new HashMap<String, String>();
+ * ......
+ * .......
+ * expectedStrings.clear();
+ * expectedStrings.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
+ * expectedStrings.put(REDUCE,"attempt_test_0001_r_000001_0 on tt1");
+ * checkMultipleTaskAssignment(
+ * taskTrackerManager, scheduler, "tt1",
+ * expectedStrings);
+ *
+ * @param taskTrackerManager
+ * @param scheduler
+ * @param taskTrackerName
+ * @param expectedTaskStrings
+ * @return
+ * @throws IOException
+ */
+ static List<Task> checkMultipleTaskAssignment(
+ CapacityTestUtils.FakeTaskTrackerManager taskTrackerManager,
+ CapacityTaskScheduler scheduler, String taskTrackerName,
+ Map<String,String> expectedTaskStrings) throws IOException {
+ //Call assign task
List<Task> tasks = scheduler.assignTasks(
taskTrackerManager.getTaskTracker(
taskTrackerName));
- assertNotNull(expectedTaskString, tasks);
- assertEquals(expectedTaskString, 1, tasks.size());
- assertEquals(expectedTaskString, tasks.get(0).toString());
- return tasks.get(0);
+
+ if (tasks==null || tasks.isEmpty()) {
+ if (expectedTaskStrings.size() > 0) {
+ fail("Expected some tasks to be assigned, but got none.");
+ } else {
+ return null;
+ }
+ }
+
+ if (expectedTaskStrings.size() > tasks.size()) {
+ StringBuffer sb = new StringBuffer();
+ sb.append("Expected strings different from actual strings.");
+ sb.append(" Expected string count=").append(expectedTaskStrings.size());
+ sb.append(" Actual string count=").append(tasks.size());
+ sb.append(" Expected strings=");
+ for (String expectedTask : expectedTaskStrings.values()) {
+ sb.append(expectedTask).append(",");
+ }
+ sb.append("Actual strings=");
+ for (Task actualTask : tasks) {
+ sb.append(actualTask.toString()).append(",");
+ }
+ fail(sb.toString());
+ }
+
+ for (Task task : tasks) {
+ LOG.info("tasks are : " + tasks.toString());
+ if (task.isMapTask()) {
+ //check if expected string is set for map or not.
+ if (expectedTaskStrings.get(MAP) != null) {
+ assertEquals(expectedTaskStrings.get(MAP), task.toString());
+ } else {
+ fail("No map task is expected, but got " + task.toString());
+ }
+ } else {
+ //check if expectedStrings is set for reduce or not.
+ if (expectedTaskStrings.get(REDUCE) != null) {
+ assertEquals(expectedTaskStrings.get(REDUCE), task.toString());
+ } else {
+ fail("No reduce task is expected, but got " + task.toString());
+ }
+ }
+ }
+ return tasks;
}
static void verifyCapacity(
@@ -207,8 +307,9 @@
public FakeJobInProgress(
JobID jId, JobConf jobConf,
- FakeTaskTrackerManager taskTrackerManager, String user) {
- super(jId, jobConf, null);
+ FakeTaskTrackerManager taskTrackerManager, String user,
+ JobTracker jt) throws IOException {
+ super(jId, jobConf, jt);
this.taskTrackerManager = taskTrackerManager;
this.startTime = System.currentTimeMillis();
this.status = new JobStatus(
@@ -372,8 +473,9 @@
public FakeFailingJobInProgress(
JobID id, JobConf jobConf,
- FakeTaskTrackerManager taskTrackerManager, String user) {
- super(id, jobConf, taskTrackerManager, user);
+ FakeTaskTrackerManager taskTrackerManager, String user,
+ JobTracker jt) throws IOException {
+ super(id, jobConf, taskTrackerManager, user, jt);
}
@Override
@@ -686,7 +788,7 @@
FakeJobInProgress job =
new FakeJobInProgress(new JobID("test", ++jobCounter),
(jobConf == null ? new JobConf(defaultJobConf) : jobConf), this,
- jobConf.getUser());
+ jobConf.getUser(), UtilsForTests.getJobTracker());
job.getStatus().setRunState(state);
this.submitJob(job);
return job;
@@ -855,6 +957,8 @@
Properties p = new Properties();
p.setProperty(CapacitySchedulerConf.CAPACITY_PROPERTY,
String.valueOf(q.capacity));
+ p.setProperty(CapacitySchedulerConf.MAX_CAPACITY_PROPERTY,
+ String.valueOf(q.maxCapacity));
p.setProperty(CapacitySchedulerConf.SUPPORTS_PRIORITY_PROPERTY,
String.valueOf(q.supportsPrio));
p.setProperty(
@@ -886,6 +990,7 @@
static class FakeQueueInfo {
String queueName;
float capacity;
+ float maxCapacity = -1.0f;
boolean supportsPrio;
int ulMin;