Posted to mapreduce-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:26:22 UTC

svn commit: r885145 [31/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/ src/benchmarks/gridmix2/ src/benchmarks/gridmix2/sr...

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java Sat Nov 28 20:26:01 2009
@@ -22,17 +22,16 @@
 import java.util.List;
 
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
+import org.apache.hadoop.mapred.FakeObjectUtilities.FakeTaskInProgress;
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import junit.extensions.TestSetup;
-import junit.framework.Test;
 import junit.framework.TestCase;
-import junit.framework.TestSuite;
 
 public class TestSetupTaskScheduling extends TestCase {
 
@@ -61,7 +60,7 @@
     @Override
     public synchronized void initTasks() throws IOException {
       super.initTasks();
-      JobClient.RawSplit emptySplit = new JobClient.RawSplit();
+      Job.RawSplit emptySplit = new Job.RawSplit();
       setup = new TaskInProgress[2];
       setup[0] = new TaskInProgress(getJobID(), "test",  emptySplit,
           jobtracker, getJobConf(), this, numMapTasks + 1, 1);
@@ -95,10 +94,55 @@
     }
   }
 
+  static class FakeJobWithTaskCleanupTask 
+  extends FakeObjectUtilities.FakeJobInProgress {
+
+    FakeJobWithTaskCleanupTask(JobConf jobConf, 
+        JobTracker tracker) throws IOException {
+      super(jobConf, tracker);
+    }
+
+    /**
+     * Initialize tasks (1 map and 1 reduce task, each needing 2 slots,
+     * similar to the tasks of a high RAM job).
+     */
+    @Override
+    public synchronized void initTasks() throws IOException {
+      super.initTasks();
+      Job.RawSplit emptySplit = new Job.RawSplit();
+      final int numSlotsPerTask = 2;
+      maps = new TaskInProgress[1];
+      reduces = new TaskInProgress[1];
+      
+      maps[0] = new FakeTaskInProgress(getJobID(), "test",  emptySplit,
+          jobtracker, getJobConf(), this, 0, numSlotsPerTask);
+      TaskAttemptID attemptId = new TaskAttemptID(maps[0].getTIPId(), 0);
+      
+      // make this task a taskCleanup task of a map task
+      mapCleanupTasks.add(attemptId);
+      TaskStatus stat = new MapTaskStatus(attemptId, 0.01f, 2,
+          TaskStatus.State.FAILED_UNCLEAN, "", "", trackers[0],
+          TaskStatus.Phase.MAP, new Counters());
+      maps[0].updateStatus(stat);
+      
+      // similarly for the reduce task's taskCleanup task
+      reduces[0] = new FakeTaskInProgress(getJobID(), "test", 1,
+          0, jobtracker, getJobConf(), this, numSlotsPerTask);
+      attemptId = new TaskAttemptID(reduces[0].getTIPId(), 0);
+      
+      // make this task a taskCleanup task of a reduce task
+      reduceCleanupTasks.add(attemptId);
+      stat = new ReduceTaskStatus(attemptId, 0.01f, 2,
+          TaskStatus.State.FAILED_UNCLEAN, "", "", trackers[0],
+          TaskStatus.Phase.REDUCE, new Counters());
+      reduces[0].updateStatus(stat);
+    }
+  }
+
   public void setUp() throws Exception {
     JobConf conf = new JobConf();
-    conf.set("mapred.job.tracker", "localhost:0");
-    conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
+    conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
+    conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
     jobTracker = new FakeJobTracker(conf, new Clock(), trackers);
     for (String tracker : trackers) {
       FakeObjectUtilities.establishFirstContact(jobTracker, tracker);
@@ -106,36 +150,47 @@
   }
 
   // create a job for testing setup tasks and reservations
-  FakeJobInProgress createJob(boolean withSetup) throws IOException {
+  FakeJobInProgress createJob(TaskType taskType) throws IOException {
     JobConf conf = new JobConf();
     conf.setSpeculativeExecution(false);
     conf.setNumMapTasks(2);
     conf.setNumReduceTasks(2);
-    conf.set("mapred.max.reduce.failures.percent", ".70");
-    conf.set("mapred.max.map.failures.percent", ".70");
+    conf.set(JobContext.REDUCE_FAILURES_MAXPERCENT, ".70");
+    conf.set(JobContext.MAP_FAILURES_MAX_PERCENT, ".70");
     FakeJobInProgress job = null;
-    if (withSetup) {
-      job = new FakeJobWithSetupTask(conf, jobTracker);
-    } else {
-      conf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
+    if (taskType == null) {
+      conf.setBoolean(JobContext.SETUP_CLEANUP_NEEDED, false);
       job = new FakeJobInProgress(conf, jobTracker);
+    } else if (taskType == TaskType.JOB_SETUP) {
+      job = new FakeJobWithSetupTask(conf, jobTracker);
+    } else if (taskType == TaskType.TASK_CLEANUP) {
+      job = new FakeJobWithTaskCleanupTask(conf, jobTracker);
     }
     job.setClusterSize(trackers.length);
     job.initTasks();
     return job;
   }
   
-  // create a new TaskStatus and add to a list of status objects
-  void addNewTaskStatus(FakeJobInProgress job,
-      boolean isMapTask, String tracker, List<TaskStatus> reports) 
+  // create a new TaskStatus and add to a list of status objects.
+  // useMapSlot param is needed only when taskType is TASK_CLEANUP.
+  void addNewTaskStatus(FakeJobInProgress job, TaskType taskType,
+        boolean useMapSlot, String tracker, List<TaskStatus> reports) 
         throws IOException {
     TaskAttemptID task = null;
     TaskStatus status = null;
-    if (isMapTask) {
+    if (taskType == TaskType.MAP) {
       task = job.findMapTask(tracker);
       status = new MapTaskStatus(task, 0.01f, 2,
             TaskStatus.State.RUNNING, "", "", tracker,
             TaskStatus.Phase.MAP, new Counters());
+    } else if (taskType == TaskType.TASK_CLEANUP) {
+      if (useMapSlot) {
+        status = job.maps[0].taskStatuses.get(
+          new TaskAttemptID(job.maps[0].getTIPId(), 0));
+      } else {
+        status = job.reduces[0].taskStatuses.get(
+              new TaskAttemptID(job.reduces[0].getTIPId(), 0));
+      }
     } else {
       task = job.findReduceTask(tracker);
       status = new ReduceTaskStatus(task, 0.01f, 2,
@@ -162,7 +217,7 @@
    */
   public void testSetupTaskReturnedForFreeMapSlots() throws IOException {
     // create a job with a setup task.
-    FakeJobInProgress job = createJob(true);
+    FakeJobInProgress job = createJob(TaskType.JOB_SETUP);
     jobTracker.jobs.put(job.getJobID(), job);
     
     // create a status simulating a free tasktracker
@@ -185,16 +240,16 @@
    */
   public void testMapSlotsCountedForSetup() throws IOException {
     // create a job with a setup task.
-    FakeJobInProgress job = createJob(true);
+    FakeJobInProgress job = createJob(TaskType.JOB_SETUP);
     jobTracker.jobs.put(job.getJobID(), job);
     
     // create another job for reservation
-    FakeJobInProgress job1 = createJob(false);
+    FakeJobInProgress job1 = createJob(null);
     jobTracker.jobs.put(job1.getJobID(), job1);
    
     // create TT status for testing getSetupAndCleanupTasks
     List<TaskStatus> taskStatuses = new ArrayList<TaskStatus>();
-    addNewTaskStatus(job, true, trackers[0], taskStatuses);
+    addNewTaskStatus(job, TaskType.MAP, true, trackers[0], taskStatuses);
     TaskTrackerStatus ttStatus 
       = createTaskTrackerStatus(trackers[0], taskStatuses);
     
@@ -213,19 +268,19 @@
    */
   public void testReduceSlotsCountedForSetup() throws IOException {
     // create a job with a setup task.
-    FakeJobInProgress job = createJob(true);
+    FakeJobInProgress job = createJob(TaskType.JOB_SETUP);
     jobTracker.jobs.put(job.getJobID(), job);
     
     // create another job for reservation
-    FakeJobInProgress job1 = createJob(false);
+    FakeJobInProgress job1 = createJob(null);
     jobTracker.jobs.put(job1.getJobID(), job1);
     
     // create TT status for testing getSetupAndCleanupTasks
     List<TaskStatus> reports = new ArrayList<TaskStatus>();
     // because free map slots are checked first in code,
     // we fill up map slots also.
-    addNewTaskStatus(job1, true, trackers[1], reports);
-    addNewTaskStatus(job1, false, trackers[1], reports);
+    addNewTaskStatus(job1, TaskType.MAP, true, trackers[1], reports);
+    addNewTaskStatus(job1, TaskType.REDUCE, false, trackers[1], reports);
     TaskTrackerStatus ttStatus 
       = createTaskTrackerStatus(trackers[1], reports);
 
@@ -235,4 +290,48 @@
     assertNull(tasks);
     jobTracker.jobs.clear();
   }
+
+  void validateNumSlotsUsedForTaskCleanup(TaskTrackerStatus ttStatus)
+       throws IOException {
+    List<Task> tasks = jobTracker.getSetupAndCleanupTasks(ttStatus);
+
+    assertEquals("Actual number of taskCleanup tasks is not same as expected", 1, tasks.size());
+    LOG.info("taskCleanup task is " + tasks.get(0));
+    assertTrue(tasks.get(0).isTaskCleanupTask());
+
+    // slots needed for a taskCleanup task should be 1 (even for high RAM jobs)
+    assertEquals("TaskCleanup task should not need more than 1 slot.",
+                 1, tasks.get(0).getNumSlotsRequired());
+  }
+  
+  /**
+   * Test to check that map slots are counted when returning
+   * a taskCleanup task.
+   * @throws IOException
+   */
+  public void testNumSlotsUsedForTaskCleanup() throws IOException {
+    // Create a high RAM job with a map task's cleanup task and a reduce task's
+    // cleanup task. Make this Fake job a high RAM job by setting the slots
+    // required for map/reduce task to 2.
+    FakeJobInProgress job = createJob(TaskType.TASK_CLEANUP);
+    jobTracker.jobs.put(job.getJobID(), job);
+   
+    // create TT status for testing getSetupAndCleanupTasks
+    List<TaskStatus> taskStatuses = new ArrayList<TaskStatus>();
+    TaskTrackerStatus ttStatus =
+      createTaskTrackerStatus(trackers[0], taskStatuses); // create dummy status
+    addNewTaskStatus(job, TaskType.TASK_CLEANUP, true, trackers[0],
+                     taskStatuses); // status of the map task's cleanup task
+    addNewTaskStatus(job, TaskType.TASK_CLEANUP, false, trackers[0],
+                     taskStatuses); // status of the reduce task's cleanup task
+    ttStatus = createTaskTrackerStatus(trackers[0], taskStatuses);
+    
+    // validate mapTaskCleanup task
+    validateNumSlotsUsedForTaskCleanup(ttStatus);
+    
+    // validate reduceTaskCleanup task
+    validateNumSlotsUsedForTaskCleanup(ttStatus);
+    
+    jobTracker.jobs.clear();
+  }
 }
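
A note on two patterns above that recur throughout this commit: bare
"mapred.*" configuration strings are replaced by named constants (JTConfig,
JobContext, MRConfig), and a taskCleanup attempt is expected to occupy exactly
one slot even when the job's own map/reduce tasks are high-RAM and need two.
A minimal sketch of the constant-based configuration with the old keys noted;
the imports are my assumption and the FakeJobTracker scaffolding from
FakeObjectUtilities is taken as given:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;

    JobConf conf = new JobConf();
    conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");  // was "mapred.job.tracker"
    conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");   // was "mapred.job.tracker.http.address"
    // was "mapred.max.reduce.failures.percent" and "mapred.max.map.failures.percent"
    conf.set(JobContext.REDUCE_FAILURES_MAXPERCENT, ".70");
    conf.set(JobContext.MAP_FAILURES_MAX_PERCENT, ".70");
    // was "mapred.committer.job.setup.cleanup.needed"
    conf.setBoolean(JobContext.SETUP_CLEANUP_NEEDED, false);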

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java Sat Nov 28 20:26:01 2009
@@ -44,6 +44,7 @@
 import org.apache.hadoop.mapred.UtilsForTests.KillMapper;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 
 /** 
@@ -100,7 +101,7 @@
         // it with the MiniMRCluster
 
         myListener = new MyListener();
-        conf.set("mapred.job.tracker.handler.count", "1");
+        conf.set(JTConfig.JT_IPC_HANDLER_COUNT, "1");
         mrCluster =   new MiniMRCluster(0, 0,
             numTT, dfs.getFileSystem().getUri().toString(), 
             1, null, null, MR_UGI, new JobConf());
@@ -151,7 +152,7 @@
 
   private void verifyOutput(FileSystem fs, Path outDir) throws IOException {
     Path[] outputFiles = FileUtil.stat2Paths(
-        fs.listStatus(outDir, new OutputLogFilter()));
+        fs.listStatus(outDir, new Utils.OutputFileUtils.OutputFilesFilter()));
     assertEquals(numReduces, outputFiles.length);
     InputStream is = fs.open(outputFiles[0]);
     BufferedReader reader = new BufferedReader(new InputStreamReader(is));
@@ -208,7 +209,8 @@
 
     conf.set("mapred.reducer.class", "testjar.ExternalIdentityReducer");
 
-    conf.setLong("mapred.min.split.size", 1024*1024);
+    conf.setLong(org.apache.hadoop.mapreduce.lib.input.
+      FileInputFormat.SPLIT_MINSIZE, 1024*1024);
 
     conf.setNumReduceTasks(numReduces);
     conf.setJobPriority(JobPriority.HIGH);
@@ -257,9 +259,8 @@
     // Check Task directories
     TaskAttemptID taskid = new TaskAttemptID(
         new TaskID(jobId, TaskType.MAP, 1),0);
-    TestMiniMRWithDFS.checkTaskDirectories(
-        mrCluster, new String[]{jobId.toString()}, 
-        new String[]{taskid.toString()});
+    TestMiniMRWithDFS.checkTaskDirectories(mrCluster, TEST1_UGI.getUserName(),
+        new String[] { jobId.toString() }, new String[] { taskid.toString() });
 
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     int exitCode = TestJobClient.runTool(conf, new JobClient(),
@@ -279,16 +280,19 @@
     verifyOutput(outDir.getFileSystem(conf), outDir);
 
     //TestJobHistory
-    TestJobHistory.validateJobHistoryFileFormat(jobId, conf, "SUCCESS", false);
+    TestJobHistory.validateJobHistoryFileFormat(
+        mrCluster.getJobTrackerRunner().getJobTracker().getJobHistory(),
+        jobId, conf, "SUCCEEDED", false);
+    
     TestJobHistory.validateJobHistoryFileContent(mrCluster, job, conf);
-    TestJobHistory.validateJobHistoryUserLogLocation(job.getID(), conf);
 
     // Since we keep setKeepTaskFilesPattern, these files should still be
     // present and will not be cleaned up.
     for(int i=0; i < numTT; ++i) {
-      String jobDirStr = mrCluster.getTaskTrackerLocalDir(i)+
-      "/taskTracker/jobcache";
-      boolean b = FileSystem.getLocal(conf).delete(new Path(jobDirStr), true);
+      Path jobDirPath =
+          new Path(mrCluster.getTaskTrackerLocalDir(i), TaskTracker
+              .getJobCacheSubdir(TEST1_UGI.getUserName()));
+      boolean b = FileSystem.getLocal(conf).delete(jobDirPath, true);
       assertTrue(b);
     }
   }
@@ -315,9 +319,9 @@
     conf.setOutputFormat(NullOutputFormat.class);
     conf.setJobPriority(JobPriority.HIGH);
 
-    conf.setLong("mapred.map.max.attempts", 1);
+    conf.setLong(JobContext.MAP_MAX_ATTEMPTS, 1);
 
-    conf.set("hadoop.job.history.user.location", "none");
+    conf.set(JobContext.HISTORY_LOCATION, "none");
 
     conf.setNumReduceTasks(0);
 
@@ -366,15 +370,13 @@
     conf.setOutputFormat(NullOutputFormat.class);
     conf.setNumReduceTasks(0);
 
-    conf.setLong("mapred.map.max.attempts", 2);
+    conf.setLong(JobContext.MAP_MAX_ATTEMPTS, 2);
 
     final Path inDir = new Path("./wc/input");
     final Path outDir = new Path("./wc/output");
     final Path histDir = new Path("./wc/history");
 
-    conf.set("hadoop.job.history.user.location", histDir.toString());
-
-    conf.setNumReduceTasks(numReduces);
+    conf.set(JobContext.HISTORY_LOCATION, histDir.toString());
 
     FileInputFormat.setInputPaths(conf, inDir);
     FileOutputFormat.setOutputPath(conf, outDir);
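
One change above is worth a closer look: OutputLogFilter is replaced by
Utils.OutputFileUtils.OutputFilesFilter when listing job output. A small
sketch of the new listing idiom, assuming an existing FileSystem fs and an
output directory outDir as in verifyOutput; as I read the filter, it keeps
the job's real output files and skips entries such as _logs:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.Utils;

    // list only the job's output files under outDir
    Path[] outputFiles = FileUtil.stat2Paths(
        fs.listStatus(outDir, new Utils.OutputFileUtils.OutputFilesFilter()));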

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java Sat Nov 28 20:26:01 2009
@@ -34,6 +34,7 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.util.Progressable;
 
 /**
@@ -66,7 +67,7 @@
 
     // use WordCount example
     FileSystem.setDefaultUri(conf, fileSys);
-    conf.set("mapred.job.tracker", jobTracker);
+    conf.set(JTConfig.JT_IPC_ADDRESS, jobTracker);
     conf.setJobName("foo");
 
     conf.setInputFormat(TextInputFormat.class);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java Sat Nov 28 20:26:01 2009
@@ -27,6 +27,7 @@
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 
 public class TestSpeculativeExecution extends TestCase {
 
@@ -49,8 +50,8 @@
       new TestSetup(new TestSuite(TestSpeculativeExecution.class)) {
       protected void setUp() throws Exception {
         JobConf conf = new JobConf();
-        conf.set("mapred.job.tracker", "localhost:0");
-        conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
+        conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
+        conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
         jobTracker = new FakeJobTracker(conf, (clock = new SpecFakeClock()),
             trackers);
         for (String tracker : trackers) {
@@ -109,7 +110,7 @@
     conf.setSpeculativeExecution(true);
     conf.setNumMapTasks(5);
     conf.setNumReduceTasks(5);
-    conf.setFloat("mapred.speculative.execution.slowTaskThreshold", 0.5f);
+    conf.setFloat(JobContext.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
     FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);    
     job.initTasks();
     //schedule maps
@@ -145,7 +146,7 @@
     conf.setSpeculativeExecution(true);
     conf.setNumMapTasks(5);
     conf.setNumReduceTasks(0);
-    conf.setFloat("mapred.speculative.execution.slowTaskThreshold", 0.5f);
+    conf.setFloat(JobContext.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
     FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
     job.initTasks();
 

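For context on the threshold set above: as I read the speculation heuristic
(a description, not part of this commit), SPECULATIVE_SLOWTASK_THRESHOLD is
the number of standard deviations below the mean progress rate at which a
running task becomes a candidate for speculative execution. A hypothetical
configuration sketch, imports assumed:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.JobContext;

    JobConf conf = new JobConf();
    conf.setSpeculativeExecution(true);
    // was "mapred.speculative.execution.slowTaskThreshold"
    conf.setFloat(JobContext.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
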
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java Sat Nov 28 20:26:01 2009
@@ -19,8 +19,10 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.util.ToolRunner;
 
 import junit.framework.TestCase;
@@ -46,12 +48,12 @@
       throws Exception {
     JobConf jtConf = new JobConf();
     jtConf
-        .setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024L);
-    jtConf.setLong(JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+        .setLong(MRConfig.MAPMEMORY_MB, 1 * 1024L);
+    jtConf.setLong(MRConfig.REDUCEMEMORY_MB,
         2 * 1024L);
-    jtConf.setLong(JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+    jtConf.setLong(JTConfig.JT_MAX_MAPMEMORY_MB,
         3 * 1024L);
-    jtConf.setLong(JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+    jtConf.setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB,
         4 * 1024L);
 
     miniMRCluster = new MiniMRCluster(0, "file:///", 0, null, null, jtConf);
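
The four memory keys above migrate together: as I read them,
MRConfig.MAPMEMORY_MB and MRConfig.REDUCEMEMORY_MB give the cluster-wide
memory per map/reduce slot, while JTConfig.JT_MAX_MAPMEMORY_MB and
JTConfig.JT_MAX_REDUCEMEMORY_MB cap what a single job may request. A compact
restatement of this hunk's configuration, imports assumed:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.MRConfig;
    import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;

    JobConf jtConf = new JobConf();
    jtConf.setLong(MRConfig.MAPMEMORY_MB, 1 * 1024L);    // per map slot
    jtConf.setLong(MRConfig.REDUCEMEMORY_MB, 2 * 1024L); // per reduce slot
    jtConf.setLong(JTConfig.JT_MAX_MAPMEMORY_MB, 3 * 1024L);    // per-job cap, maps
    jtConf.setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB, 4 * 1024L); // per-job cap, reduces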

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java Sat Nov 28 20:26:01 2009
@@ -22,10 +22,12 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.mapreduce.SleepJob;
 import org.apache.hadoop.util.LinuxMemoryCalculatorPlugin;
 import org.apache.hadoop.util.MemoryCalculatorPlugin;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 import junit.framework.TestCase;
@@ -157,10 +159,8 @@
         4 * 1024 * 1024 * 1024L);
     conf.setLong(DummyMemoryCalculatorPlugin.MAXPMEM_TESTING_PROPERTY,
         2 * 1024 * 1024 * 1024L);
-    conf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
-        512L);
-    conf.setLong(
-        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1024L);
+    conf.setLong(MRConfig.MAPMEMORY_MB, 512L);
+    conf.setLong(MRConfig.REDUCEMEMORY_MB, 1024L);
     
     try {
       setUpCluster(conf);
@@ -202,9 +202,9 @@
 
   private void setUpCluster(JobConf conf)
                                 throws Exception {
-    conf.setClass("mapred.jobtracker.taskScheduler",
+    conf.setClass(JTConfig.JT_TASK_SCHEDULER,
         TestTTMemoryReporting.FakeTaskScheduler.class, TaskScheduler.class);
-    conf.set("mapred.job.tracker.handler.count", "1");
+    conf.set(JTConfig.JT_IPC_HANDLER_COUNT, "1");
     miniMRCluster = new MiniMRCluster(1, "file:///", 3, null, null, conf);
   }
   

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java Sat Nov 28 20:26:01 2009
@@ -19,6 +19,7 @@
 
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 
 import junit.framework.TestCase;
 
@@ -30,6 +31,7 @@
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskType;
 
 public class TestTaskFail extends TestCase {
@@ -40,7 +42,7 @@
   implements Mapper<LongWritable, Text, Text, IntWritable> {
     String taskid;
     public void configure(JobConf job) {
-      taskid = job.get("mapred.task.id");
+      taskid = job.get(JobContext.TASK_ATTEMPT_ID);
     }
     public void map (LongWritable key, Text value, 
                      OutputCollector<Text, IntWritable> output, 
@@ -50,7 +52,9 @@
         throw new IOException();
       } else if (taskid.endsWith("_1")) {
         System.exit(-1);
-      } 
+      } else if (taskid.endsWith("_2")) {
+        throw new Error();
+      }
     }
   }
 
@@ -100,6 +104,7 @@
     conf.setNumReduceTasks(0);
     FileInputFormat.setInputPaths(conf, inDir);
     FileOutputFormat.setOutputPath(conf, outDir);
+    conf.setSpeculativeExecution(false);
     String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
                                     "/tmp")).toString().replace(' ', '+');
     conf.set("test.build.data", TEST_ROOT_DIR);
@@ -107,46 +112,92 @@
     return new JobClient(conf).submitJob(conf);
   }
   
+  private void validateAttempt(TaskInProgress tip, TaskAttemptID attemptId, 
+                               TaskStatus ts, boolean isCleanup) 
+  throws IOException {
+    assertEquals(isCleanup, tip.isCleanupAttempt(attemptId));
+    assertTrue(ts != null);
+    assertEquals(TaskStatus.State.FAILED, ts.getRunState());
+    // validate tasklogs for task attempt
+    String log = readTaskLog(
+                      TaskLog.LogName.STDERR, attemptId, false);
+    assertTrue(log.contains(taskLog));
+    if (!isCleanup) {
+      // validate task logs: tasklog should contain both task logs
+      // and cleanup logs
+      assertTrue(log.contains(cleanupLog));
+    } else {
+      // validate tasklogs for cleanup attempt
+      log = readTaskLog(
+                 TaskLog.LogName.STDERR, attemptId, true);
+      assertTrue(log.contains(cleanupLog));
+    }
+  }
+
+  /**
+   * Reads the task log and returns it as a trimmed string.
+   * @param filter Task log filter; can be STDOUT, STDERR,
+   *                SYSLOG, DEBUGOUT, DEBUGERR
+   * @param taskId The task attempt id for which the log is to be collected
+   * @param isCleanup whether the attempt is a cleanup attempt or not.
+   * @return task log as string
+   * @throws IOException
+   */
+  private String readTaskLog(TaskLog.LogName  filter, 
+                                   TaskAttemptID taskId, 
+                                   boolean isCleanup)
+  throws IOException {
+    // string buffer to store task log
+    StringBuffer result = new StringBuffer();
+    int res;
+
+    // read the whole task log through an input stream
+    InputStream taskLogReader =
+        new TaskLog.Reader(taskId, filter, 0, -1, isCleanup);
+    // construct the string log from the input stream.
+    byte[] b = new byte[65536];
+    while (true) {
+      res = taskLogReader.read(b);
+      if (res > 0) {
+        // append only the bytes actually read, not the whole buffer
+        result.append(new String(b, 0, res));
+      } else {
+        break;
+      }
+    }
+    taskLogReader.close();
+    
+    // trim the string and return it
+    String str = result.toString();
+    str = str.trim();
+    return str;
+  }
+  
   private void validateJob(RunningJob job, MiniMRCluster mr) 
   throws IOException {
     assertEquals(JobStatus.SUCCEEDED, job.getJobState());
 	    
     JobID jobId = job.getID();
     // construct the task id of first map task
+    // this should not be cleanup attempt since the first attempt 
+    // fails with an exception
     TaskAttemptID attemptId = 
       new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 0), 0);
     TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
                             getTip(attemptId.getTaskID());
-    // this should not be cleanup attempt since the first attempt 
-    // fails with an exception
-    assertTrue(!tip.isCleanupAttempt(attemptId));
     TaskStatus ts = 
       mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
-    assertTrue(ts != null);
-    assertEquals(TaskStatus.State.FAILED, ts.getRunState());
-    // validate task logs: tasklog should contain both task logs
-    // and cleanup logs
-    String log = TestMiniMRMapRedDebugScript.readTaskLog(
-                      TaskLog.LogName.STDERR, attemptId, false);
-    assertTrue(log.contains(taskLog));
-    assertTrue(log.contains(cleanupLog));
+    validateAttempt(tip, attemptId, ts, false);
     
     attemptId =  new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 0), 1);
     // this should be cleanup attempt since the second attempt fails
     // with System.exit
-    assertTrue(tip.isCleanupAttempt(attemptId));
     ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
-    assertTrue(ts != null);
-    assertEquals(TaskStatus.State.FAILED, ts.getRunState());
-    // validate tasklogs for task attempt
-    log = TestMiniMRMapRedDebugScript.readTaskLog(
-               TaskLog.LogName.STDERR, attemptId, false);
-    assertTrue(log.contains(taskLog));
-
-    // validate tasklogs for cleanup attempt
-    log = TestMiniMRMapRedDebugScript.readTaskLog(
-               TaskLog.LogName.STDERR, attemptId, true);
-    assertTrue(log.contains(cleanupLog));
+    validateAttempt(tip, attemptId, ts, true);
+    
+    attemptId =  new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 0), 2);
+    // this should be cleanup attempt since the third attempt fails
+    // with Error
+    ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
+    validateAttempt(tip, attemptId, ts, true);
   }
   
   public void testWithDFS() throws IOException {
@@ -165,6 +216,9 @@
       String input = "The quick brown fox\nhas many silly\nred fox sox\n";
       // launch job with fail tasks
       JobConf jobConf = mr.createJobConf();
+      // turn down the completion poll interval from the 5 second default
+      // for better test performance.
+      jobConf.set(Job.COMPLETION_POLL_INTERVAL_KEY, "50");
       jobConf.setOutputCommitter(CommitterWithLogs.class);
       RunningJob rJob = launchJob(jobConf, inDir, outDir, input);
       rJob.waitForCompletion();
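
Since readTaskLog above now drives TaskLog.Reader directly (instead of
delegating to TestMiniMRMapRedDebugScript), here is a self-contained sketch of
the corrected read loop; the essential point is appending only the bytes that
read() actually returned, so the log is not padded with stale buffer contents
(drain is a hypothetical helper name, not part of the commit):

    import java.io.IOException;
    import java.io.InputStream;

    static String drain(InputStream in) throws IOException {
      StringBuffer result = new StringBuffer();
      byte[] b = new byte[65536];
      int res;
      while ((res = in.read(b)) > 0) {
        result.append(new String(b, 0, res)); // only the bytes read this pass
      }
      in.close();
      return result.toString().trim();
    }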

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskLimits.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskLimits.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskLimits.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskLimits.java Sat Nov 28 20:26:01 2009
@@ -21,6 +21,8 @@
 import junit.framework.TestCase;
 import java.io.IOException;
 
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+
 /**
  * A JUnit test to test configured task limits.
  */
@@ -29,8 +31,8 @@
   static void runTest(int maxTasks, int numMaps, int numReds, 
                       boolean shouldFail) throws Exception {
     JobConf conf = new JobConf();
-    conf.setInt("mapred.jobtracker.maxtasks.per.job", maxTasks);
-    conf.set("mapred.job.tracker.handler.count", "1");
+    conf.setInt(JTConfig.JT_TASKS_PER_JOB, maxTasks);
+    conf.set(JTConfig.JT_IPC_HANDLER_COUNT, "1");
     MiniMRCluster mr = new MiniMRCluster(0, "file:///", 1, null, null, conf);
     JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
     JobConf jc = mr.createJobConf();

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java Sat Nov 28 20:26:01 2009
@@ -37,6 +37,7 @@
 import org.apache.hadoop.mapred.JobTracker.ReasonForBlackListing;
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 public class TestTaskTrackerBlacklisting extends TestCase {
@@ -144,9 +145,9 @@
       new TestSetup(new TestSuite(TestTaskTrackerBlacklisting.class)) {
       protected void setUp() throws Exception {
         JobConf conf = new JobConf();
-        conf.set("mapred.job.tracker", "localhost:0");
-        conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
-        conf.setInt("mapred.max.tracker.blacklists", 1);
+        conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
+        conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
+        conf.setInt(JTConfig.JT_MAX_TRACKER_BLACKLISTS, 1);
 
         jobTracker = 
           new FakeJobTracker(conf, (clock = new FakeJobTrackerClock()),
@@ -173,7 +174,7 @@
         healthStatus.setLastReported(status.getLastReported());
       }
       jobTracker.heartbeat(tts, false, initialContact, 
-                           false, (short) responseId);
+                           false, responseId);
     }
     responseId++;
   }
@@ -468,8 +469,8 @@
     conf.setSpeculativeExecution(false);
     conf.setNumMapTasks(0);
     conf.setNumReduceTasks(5);
-    conf.set("mapred.max.reduce.failures.percent", ".70");
-    conf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
+    conf.set(JobContext.REDUCE_FAILURES_MAXPERCENT, ".70");
+    conf.setBoolean(JobContext.SETUP_CLEANUP_NEEDED, false);
     conf.setMaxTaskFailuresPerTracker(1);
     FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
     job.setClusterSize(trackers.length);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Sat Nov 28 20:26:01 2009
@@ -18,22 +18,32 @@
 package org.apache.hadoop.mapred;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.jar.JarOutputStream;
 import java.util.zip.ZipEntry;
 
-import javax.security.auth.login.LoginException;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
+import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 
 import junit.framework.TestCase;
@@ -54,20 +64,53 @@
       LogFactory.getLog(TestTaskTrackerLocalization.class);
 
   protected TaskTracker tracker;
+  protected UserGroupInformation taskTrackerUGI;
+  protected TaskController taskController;
   protected JobConf trackerFConf;
+  private JobConf localizedJobConf;
   protected JobID jobId;
   protected TaskAttemptID taskId;
   protected Task task;
   protected String[] localDirs;
   protected static LocalDirAllocator lDirAlloc =
-      new LocalDirAllocator("mapred.local.dir");
+      new LocalDirAllocator(MRConfig.LOCAL_DIR);
+  protected Path attemptWorkDir;
+  protected File[] attemptLogFiles;
+  protected JobConf localizedTaskConf;
+
+  class InlineCleanupQueue extends CleanupQueue {
+    List<Path> stalePaths = new ArrayList<Path>();
+
+    public InlineCleanupQueue() {
+      // do nothing
+    }
+
+    @Override
+    public void addToQueue(FileSystem fs, Path... paths) {
+      // delete in-line
+      for (Path p : paths) {
+        try {
+          LOG.info("Trying to delete the path " + p);
+          if (!fs.delete(p, true)) {
+            LOG.warn("Stale path " + p.toUri().getPath());
+            stalePaths.add(p);
+          }
+        } catch (IOException e) {
+          LOG.warn("Caught exception while deleting path "
+              + p.toUri().getPath());
+          LOG.info(StringUtils.stringifyException(e));
+          stalePaths.add(p);
+        }
+      }
+    }
+  }
 
   @Override
   protected void setUp()
       throws Exception {
     TEST_ROOT_DIR =
-        new File(System.getProperty("test.build.data", "/tmp"),
-            "testTaskTrackerLocalization");
+        new File(System.getProperty("test.build.data", "/tmp"), getClass()
+            .getSimpleName());
     if (!TEST_ROOT_DIR.exists()) {
       TEST_ROOT_DIR.mkdirs();
     }
@@ -85,32 +128,29 @@
     for (int i = 0; i < numLocalDirs; i++) {
       localDirs[i] = new File(ROOT_MAPRED_LOCAL_DIR, "0_" + i).getPath();
     }
-    trackerFConf.setStrings("mapred.local.dir", localDirs);
+    trackerFConf.setStrings(MRConfig.LOCAL_DIR, localDirs);
 
-    // Create the job jar file
-    File jobJarFile = new File(TEST_ROOT_DIR, "jobjar-on-dfs.jar");
-    JarOutputStream jstream =
-        new JarOutputStream(new FileOutputStream(jobJarFile));
-    ZipEntry ze = new ZipEntry("lib/lib1.jar");
-    jstream.putNextEntry(ze);
-    jstream.closeEntry();
-    ze = new ZipEntry("lib/lib2.jar");
-    jstream.putNextEntry(ze);
-    jstream.closeEntry();
-    jstream.finish();
-    jstream.close();
-    trackerFConf.setJar(jobJarFile.toURI().toString());
+    // Create the job configuration file. Same as trackerFConf in this test.
+    Job job = new Job(trackerFConf);
 
-    // Create the job configuration file
-    File jobConfFile = new File(TEST_ROOT_DIR, "jobconf-on-dfs.xml");
-    FileOutputStream out = new FileOutputStream(jobConfFile);
-    trackerFConf.writeXml(out);
-    out.close();
+    job.setUGIAndUserGroupNames();
+
+    // JobClient uploads the job jar to the file system and sets it in the
+    // jobConf.
+    uploadJobJar(job);
 
-    // Set up the TaskTracker
+    // JobClient uploads the jobConf to the file system.
+    File jobConfFile = uploadJobConf(job.getConfiguration());
+    
+    // Set up the TaskTracker
     tracker = new TaskTracker();
     tracker.setConf(trackerFConf);
-    tracker.systemFS = FileSystem.getLocal(trackerFConf); // for test case
+
+    // for test case system FS is the local FS
+    tracker.localFs = tracker.systemFS = FileSystem.getLocal(trackerFConf);
+    tracker.systemDirectory = new Path(TEST_ROOT_DIR.getAbsolutePath());
+    
+    taskTrackerUGI = UserGroupInformation.login(trackerFConf);
 
     // Set up the task to be localized
     String jtIdentifier = "200907202331";
@@ -119,10 +159,75 @@
         new TaskAttemptID(jtIdentifier, jobId.getId(), TaskType.MAP, 1, 0);
     task =
         new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, null, 1);
+    task.setConf(job.getConfiguration()); // Set the conf; the user name in particular.
 
-    TaskController taskController = new DefaultTaskController();
+    // create jobTokens file
+    uploadJobTokensFile();
+
+    taskController = new DefaultTaskController();
     taskController.setConf(trackerFConf);
     taskController.setup();
+
+    tracker.setLocalizer(new Localizer(tracker.localFs, localDirs,
+        taskController));
+  }
+
+  /**
+   * Creates a job jar with a couple of lib entries and sets it on the job,
+   * the way JobClient would after uploading the jar to the file system.
+   * @param job the job whose jar is to be set
+   * @throws IOException
+   * @throws FileNotFoundException
+   */
+  private void uploadJobJar(Job job)
+      throws IOException,
+      FileNotFoundException {
+    File jobJarFile = new File(TEST_ROOT_DIR, "jobjar-on-dfs.jar");
+    JarOutputStream jstream =
+        new JarOutputStream(new FileOutputStream(jobJarFile));
+    ZipEntry ze = new ZipEntry("lib/lib1.jar");
+    jstream.putNextEntry(ze);
+    jstream.closeEntry();
+    ze = new ZipEntry("lib/lib2.jar");
+    jstream.putNextEntry(ze);
+    jstream.closeEntry();
+    jstream.finish();
+    jstream.close();
+    job.setJar(jobJarFile.toURI().toString());
+  }
+
+  /**
+   * Writes the given configuration to a local job.xml, the way JobClient
+   * would when uploading the jobConf to the file system.
+   * @param conf the configuration to serialize
+   * @return the local file holding the serialized configuration
+   * @throws FileNotFoundException
+   * @throws IOException
+   */
+  protected File uploadJobConf(Configuration conf)
+      throws FileNotFoundException,
+      IOException {
+    File jobConfFile = new File(TEST_ROOT_DIR, "jobconf-on-dfs.xml");
+    FileOutputStream out = new FileOutputStream(jobConfFile);
+    conf.writeXml(out);
+    out.close();
+    return jobConfFile;
+  }
+  
+  /**
+   * Creates a fake JobTokens file.
+   * @throws IOException
+   */
+  protected void uploadJobTokensFile() throws IOException {
+    
+    File dir = new File(TEST_ROOT_DIR, jobId.toString());
+    if (!dir.exists())
+      assertTrue("failed to create dir=" + dir.getAbsolutePath(), dir.mkdirs());
+    
+    File jobTokenFile = new File(dir, JobTokens.JOB_TOKEN_FILENAME);
+    FileOutputStream fos = new FileOutputStream(jobTokenFile);
+    java.io.DataOutputStream out = new java.io.DataOutputStream(fos);
+    JobTokens jt = new JobTokens();
+    jt.write(out); // writing an empty file; we don't need the keys for this test
+    out.close();
   }
 
   @Override
@@ -131,7 +236,7 @@
     FileUtil.fullyDelete(TEST_ROOT_DIR);
   }
 
-  private static String[] getFilePermissionAttrs(String path)
+  protected static String[] getFilePermissionAttrs(String path)
       throws IOException {
     String output = Shell.execCommand("stat", path, "-c", "%A:%U:%G");
     return output.split(":|\n");
@@ -146,106 +251,162 @@
     assertTrue("Path " + path + " has the permissions " + attrs[0]
         + " instead of the expected " + expectedPermissions, attrs[0]
         .equals(expectedPermissions));
-    assertTrue("Path " + path + " is not user owned not by "
-        + expectedOwnerUser + " but by " + attrs[1], attrs[1]
-        .equals(expectedOwnerUser));
-    assertTrue("Path " + path + " is not group owned not by "
-        + expectedOwnerGroup + " but by " + attrs[2], attrs[2]
-        .equals(expectedOwnerGroup));
+    assertTrue("Path " + path + " is user owned not by " + expectedOwnerUser
+        + " but by " + attrs[1], attrs[1].equals(expectedOwnerUser));
+    assertTrue("Path " + path + " is group owned not by " + expectedOwnerGroup
+        + " but by " + attrs[2], attrs[2].equals(expectedOwnerGroup));
   }
 
   /**
    * Verify the task-controller's setup functionality
    * 
    * @throws IOException
-   * @throws LoginException
    */
   public void testTaskControllerSetup()
-      throws IOException,
-      LoginException {
+      throws IOException {
     // Task-controller is already set up in the test's setup method. Now verify.
-    UserGroupInformation ugi = UserGroupInformation.login(new JobConf());
     for (String localDir : localDirs) {
 
       // Verify the local-dir itself.
       File lDir = new File(localDir);
       assertTrue("localDir " + lDir + " doesn't exists!", lDir.exists());
-      checkFilePermissions(lDir.getAbsolutePath(), "drwxr-xr-x", ugi
-          .getUserName(), ugi.getGroupNames()[0]);
+      checkFilePermissions(lDir.getAbsolutePath(), "drwxr-xr-x", task
+          .getUser(), taskTrackerUGI.getGroupNames()[0]);
+    }
+
+    // Verify the permissions on the userlogs dir
+    File taskLog = TaskLog.getUserLogDir();
+    checkFilePermissions(taskLog.getAbsolutePath(), "drwxr-xr-x", task
+        .getUser(), taskTrackerUGI.getGroupNames()[0]);
+  }
+
+  /**
+   * Test the localization of a user on the TT.
+   * 
+   * @throws IOException
+   */
+  public void testUserLocalization()
+      throws IOException {
+
+    // /////////// The main method being tested
+    tracker.getLocalizer().initializeUserDirs(task.getUser());
+    // ///////////
+
+    // Check the directory structure and permissions
+    checkUserLocalization();
+
+    // For the sake of testing re-entrancy of initializeUserDirs(), we remove
+    // the user directories now and make sure that further calls of the method
+    // don't create directories any more.
+    for (String dir : localDirs) {
+      File userDir = new File(dir, TaskTracker.getUserDir(task.getUser()));
+      FileUtil.fullyDelete(userDir);
+    }
+
+    // Now call the method again.
+    tracker.getLocalizer().initializeUserDirs(task.getUser());
+
+    // Files should not be created now and so shouldn't be there anymore.
+    for (String dir : localDirs) {
+      File userDir = new File(dir, TaskTracker.getUserDir(task.getUser()));
+      assertFalse("Unexpectedly, user-dir " + userDir.getAbsolutePath()
+          + " exists!", userDir.exists());
+    }
+  }
+
+  protected void checkUserLocalization()
+      throws IOException {
+    for (String dir : localDirs) {
+
+      File localDir = new File(dir);
+      assertTrue(MRConfig.LOCAL_DIR + localDir + " isn'task created!",
+          localDir.exists());
+
+      File taskTrackerSubDir = new File(localDir, TaskTracker.SUBDIR);
+      assertTrue("taskTracker sub-dir in the local-dir " + localDir
+          + "is not created!", taskTrackerSubDir.exists());
+
+      File userDir = new File(taskTrackerSubDir, task.getUser());
+      assertTrue("user-dir in taskTrackerSubdir " + taskTrackerSubDir
+          + "is not created!", userDir.exists());
+      checkFilePermissions(userDir.getAbsolutePath(), "drwx------", task
+          .getUser(), taskTrackerUGI.getGroupNames()[0]);
+
+      File jobCache = new File(userDir, TaskTracker.JOBCACHE);
+      assertTrue("jobcache in the userDir " + userDir + " isn't created!",
+          jobCache.exists());
+      checkFilePermissions(jobCache.getAbsolutePath(), "drwx------", task
+          .getUser(), taskTrackerUGI.getGroupNames()[0]);
 
       // Verify the distributed cache dir.
       File distributedCacheDir =
-          new File(localDir, TaskTracker.getDistributedCacheDir());
+          new File(localDir, TaskTracker
+              .getDistributedCacheDir(task.getUser()));
       assertTrue("distributed cache dir " + distributedCacheDir
           + " doesn't exists!", distributedCacheDir.exists());
       checkFilePermissions(distributedCacheDir.getAbsolutePath(),
-          "drwxr-xr-x", ugi.getUserName(), ugi.getGroupNames()[0]);
-
-      // Verify the job cache dir.
-      File jobCacheDir = new File(localDir, TaskTracker.getJobCacheSubdir());
-      assertTrue("jobCacheDir " + jobCacheDir + " doesn't exists!",
-          jobCacheDir.exists());
-      checkFilePermissions(jobCacheDir.getAbsolutePath(), "drwxr-xr-x", ugi
-          .getUserName(), ugi.getGroupNames()[0]);
+          "drwx------", task.getUser(), taskTrackerUGI.getGroupNames()[0]);
     }
-
-    // Verify the pemissions on the userlogs dir
-    File taskLog = TaskLog.getUserLogDir();
-    checkFilePermissions(taskLog.getAbsolutePath(), "drwxr-xr-x", ugi
-        .getUserName(), ugi.getGroupNames()[0]);
   }
 
   /**
    * Test job localization on a TT. Tests localization of job.xml, job.jar and
-   * corresponding setting of configuration.
+   * corresponding setting of configuration. Also test
+   * {@link TaskController#initializeJob(JobInitializationContext)}
    * 
    * @throws IOException
-   * @throws LoginException
    */
   public void testJobLocalization()
-      throws IOException,
-      LoginException {
+      throws IOException {
+
+    tracker.getLocalizer().initializeUserDirs(task.getUser());
 
     // /////////// The main method being tested
-    JobConf localizedJobConf = tracker.localizeJobFiles(task);
+    localizedJobConf = tracker.localizeJobFiles(task);
     // ///////////
 
+    // Now initialize the job via task-controller so as to set
+    // ownership/permissions of jars, job-work-dir
+    JobInitializationContext context = new JobInitializationContext();
+    context.jobid = jobId;
+    context.user = task.getUser();
+    context.workDir =
+        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
+
+    // /////////// The method being tested
+    taskController.initializeJob(context);
+    // ///////////
+
+    checkJobLocalization();
+  }
+
+  protected void checkJobLocalization()
+      throws IOException {
     // Check the directory structure
     for (String dir : localDirs) {
 
       File localDir = new File(dir);
-      assertTrue("mapred.local.dir " + localDir + " isn'task created!",
-          localDir.exists());
-
       File taskTrackerSubDir = new File(localDir, TaskTracker.SUBDIR);
-      assertTrue("taskTracker sub-dir in the local-dir " + localDir
-          + "is not created!", taskTrackerSubDir.exists());
-
-      File jobCache = new File(taskTrackerSubDir, TaskTracker.JOBCACHE);
-      assertTrue("jobcache in the taskTrackerSubdir " + taskTrackerSubDir
-          + " isn'task created!", jobCache.exists());
+      File userDir = new File(taskTrackerSubDir, task.getUser());
+      File jobCache = new File(userDir, TaskTracker.JOBCACHE);
 
       File jobDir = new File(jobCache, jobId.toString());
-      assertTrue("job-dir in " + jobCache + " isn'task created!", jobDir
-          .exists());
+      assertTrue("job-dir in " + jobCache + " isn't created!", jobDir.exists());
 
       // check the private permissions on the job directory
-      UserGroupInformation ugi = UserGroupInformation.login(localizedJobConf);
-      checkFilePermissions(jobDir.getAbsolutePath(), "drwx------", ugi
-          .getUserName(), ugi.getGroupNames()[0]);
+      checkFilePermissions(jobDir.getAbsolutePath(), "drwx------", task
+          .getUser(), taskTrackerUGI.getGroupNames()[0]);
     }
 
     // check the localization of job.xml
-    LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
-
     assertTrue("job.xml is not localized on this TaskTracker!!", lDirAlloc
-        .getLocalPathToRead(TaskTracker.getLocalJobConfFile(jobId.toString()),
-            trackerFConf) != null);
+        .getLocalPathToRead(TaskTracker.getLocalJobConfFile(task.getUser(),
+            jobId.toString()), trackerFConf) != null);
 
     // check the localization of job.jar
     Path jarFileLocalized =
-        lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(jobId
-            .toString()), trackerFConf);
+        lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(task.getUser(),
+            jobId.toString()), trackerFConf);
     assertTrue("job.jar is not localized on this TaskTracker!!",
         jarFileLocalized != null);
     assertTrue("lib/lib1.jar is not unjarred on this TaskTracker!!", new File(
@@ -257,22 +418,22 @@
 
     // check the creation of job work directory
     assertTrue("job-work dir is not created on this TaskTracker!!", lDirAlloc
-        .getLocalPathToRead(TaskTracker.getJobWorkDir(jobId.toString()),
-            trackerFConf) != null);
+        .getLocalPathToRead(TaskTracker.getJobWorkDir(task.getUser(), jobId
+            .toString()), trackerFConf) != null);
 
-    // Check the setting of job.local.dir and job.jar which will eventually be
+    // Check the setting of mapreduce.job.local.dir and job.jar which will eventually be
     // used by the user's task
     boolean jobLocalDirFlag = false, mapredJarFlag = false;
     String localizedJobLocalDir =
         localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR);
     String localizedJobJar = localizedJobConf.getJar();
-    for (String localDir : localizedJobConf.getStrings("mapred.local.dir")) {
+    for (String localDir : localizedJobConf.getStrings(MRConfig.LOCAL_DIR)) {
       if (localizedJobLocalDir.equals(localDir + Path.SEPARATOR
-          + TaskTracker.getJobWorkDir(jobId.toString()))) {
+          + TaskTracker.getJobWorkDir(task.getUser(), jobId.toString()))) {
         jobLocalDirFlag = true;
       }
       if (localizedJobJar.equals(localDir + Path.SEPARATOR
-          + TaskTracker.getJobJarFile(jobId.toString()))) {
+          + TaskTracker.getJobJarFile(task.getUser(), jobId.toString()))) {
         mapredJarFlag = true;
       }
     }
@@ -280,7 +441,7 @@
         + " is not set properly to the target users directory : "
         + localizedJobLocalDir, jobLocalDirFlag);
     assertTrue(
-        "mapred.jar is not set properly to the target users directory : "
+        "mapreduce.job.jar is not set properly to the target users directory : "
             + localizedJobJar, mapredJarFlag);
   }
 
@@ -288,13 +449,21 @@
    * Test task localization on a TT.
    * 
    * @throws IOException
-   * @throws LoginException
    */
   public void testTaskLocalization()
-      throws IOException,
-      LoginException {
+      throws IOException {
 
-    JobConf localizedJobConf = tracker.localizeJobFiles(task);
+    tracker.getLocalizer().initializeUserDirs(task.getUser());
+    localizedJobConf = tracker.localizeJobFiles(task);
+
+    // Now initialize the job via task-controller so as to set
+    // ownership/permissions of jars, job-work-dir
+    JobInitializationContext jobContext = new JobInitializationContext();
+    jobContext.jobid = jobId;
+    jobContext.user = task.getUser();
+    jobContext.workDir =
+        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
+    taskController.initializeJob(jobContext);
 
     TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
     tip.setJobConf(localizedJobConf);
@@ -304,78 +473,195 @@
     // //////////
 
     // check the functionality of localizeTask
-    for (String dir : trackerFConf.getStrings("mapred.local.dir")) {
-      assertTrue("attempt-dir in localDir " + dir + " is not created!!",
-          new File(dir, TaskTracker.getLocalTaskDir(jobId.toString(), taskId
-              .toString())).exists());
+    for (String dir : trackerFConf.getStrings(MRConfig.LOCAL_DIR)) {
+      File attemptDir =
+          new File(dir, TaskTracker.getLocalTaskDir(task.getUser(), jobId
+              .toString(), taskId.toString()));
+      assertTrue("attempt-dir " + attemptDir + " in localDir " + dir
+          + " is not created!!", attemptDir.exists());
     }
 
-    Path workDir =
-        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(task
-            .getJobID().toString(), task.getTaskID().toString(), task
-            .isTaskCleanupTask()), trackerFConf);
+    attemptWorkDir =
+        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
+            task.getUser(), task.getJobID().toString(), task.getTaskID()
+                .toString(), task.isTaskCleanupTask()), trackerFConf);
     assertTrue("atttempt work dir for " + taskId.toString()
-        + " is not created in any of the configured dirs!!", workDir != null);
+        + " is not created in any of the configured dirs!!",
+        attemptWorkDir != null);
 
     TaskRunner runner = task.createRunner(tracker, tip);
 
     // /////// Few more methods being tested
     runner.setupChildTaskConfiguration(lDirAlloc);
-    TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()),
+    TaskRunner.createChildTmpDir(new File(attemptWorkDir.toUri().getPath()),
         localizedJobConf);
-    File[] logFiles = TaskRunner.prepareLogFiles(task.getTaskID());
-    // ///////
+    attemptLogFiles = TaskRunner.prepareLogFiles(task.getTaskID());
 
     // Make sure the task-conf file is created
     Path localTaskFile =
         lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
-            .getJobID().toString(), task.getTaskID().toString(), task
-            .isTaskCleanupTask()), trackerFConf);
+            .getUser(), task.getJobID().toString(), task.getTaskID()
+            .toString(), task.isTaskCleanupTask()), trackerFConf);
     assertTrue("Task conf file " + localTaskFile.toString()
         + " is not created!!", new File(localTaskFile.toUri().getPath())
         .exists());
 
     // /////// One more method being tested. This happens in child space.
-    JobConf localizedTaskConf = new JobConf(localTaskFile);
+    localizedTaskConf = new JobConf(localTaskFile);
     TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf);
     // ///////
 
-    // Make sure that the mapred.local.dir is sandboxed
+    // Initialize task via TaskController
+    TaskControllerContext taskContext =
+        new TaskController.TaskControllerContext();
+    taskContext.env =
+        new JvmEnv(null, null, null, null, -1, new File(localizedJobConf
+            .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf);
+    taskContext.task = task;
+    // /////////// The method being tested
+    taskController.initializeTask(taskContext);
+    // ///////////
+
+    checkTaskLocalization();
+  }
+
+  protected void checkTaskLocalization()
+      throws IOException {
+    // Make sure that the mapreduce.cluster.local.dir is sandboxed
     for (String childMapredLocalDir : localizedTaskConf
-        .getStrings("mapred.local.dir")) {
+        .getStrings(MRConfig.LOCAL_DIR)) {
       assertTrue("Local dir " + childMapredLocalDir + " is not sandboxed !!",
-          childMapredLocalDir.endsWith(TaskTracker.getLocalTaskDir(jobId
-              .toString(), taskId.toString(), false)));
+          childMapredLocalDir.endsWith(TaskTracker.getLocalTaskDir(task
+              .getUser(), jobId.toString(), taskId.toString(), false)));
     }
 
     // Make sure task.getJobFile is changed and points correctly.
     assertTrue(task.getJobFile().endsWith(
-        TaskTracker
-            .getTaskConfFile(jobId.toString(), taskId.toString(), false)));
+        TaskTracker.getTaskConfFile(task.getUser(), jobId.toString(), taskId
+            .toString(), false)));
 
     // Make sure that the tmp directories are created
     assertTrue("tmp dir is not created in workDir "
-        + workDir.toUri().getPath(),
-        new File(workDir.toUri().getPath(), "tmp").exists());
+        + attemptWorkDir.toUri().getPath(), new File(attemptWorkDir.toUri()
+        .getPath(), "tmp").exists());
 
-    // Make sure that the log are setup properly
+    // Make sure that the logs are set up properly
     File logDir =
         new File(HADOOP_LOG_DIR, TaskLog.USERLOGS_DIR_NAME + Path.SEPARATOR
             + task.getTaskID().toString());
     assertTrue("task's log dir " + logDir.toString() + " doesn't exist!",
         logDir.exists());
-    UserGroupInformation ugi = UserGroupInformation.login(localizedJobConf);
-    checkFilePermissions(logDir.getAbsolutePath(), "drwx------", ugi
-        .getUserName(), ugi.getGroupNames()[0]);
+    checkFilePermissions(logDir.getAbsolutePath(), "drwx------", task
+        .getUser(), taskTrackerUGI.getGroupNames()[0]);
 
     File expectedStdout = new File(logDir, TaskLog.LogName.STDOUT.toString());
     assertTrue("stdout log file is improper. Expected : "
-        + expectedStdout.toString() + " Observed : " + logFiles[0].toString(),
-        expectedStdout.toString().equals(logFiles[0].toString()));
+        + expectedStdout.toString() + " Observed : "
+        + attemptLogFiles[0].toString(), expectedStdout.toString().equals(
+        attemptLogFiles[0].toString()));
     File expectedStderr =
         new File(logDir, Path.SEPARATOR + TaskLog.LogName.STDERR.toString());
     assertTrue("stderr log file is improper. Expected : "
-        + expectedStderr.toString() + " Observed : " + logFiles[1].toString(),
-        expectedStderr.toString().equals(logFiles[1].toString()));
+        + expectedStderr.toString() + " Observed : "
+        + attemptLogFiles[1].toString(), expectedStderr.toString().equals(
+        attemptLogFiles[1].toString()));
+  }
+
+  /**
+   * Test cleanup of task files, job files and userlogs on the TT.
+   *
+   * @throws IOException
+   */
+  public void testTaskCleanup()
+      throws IOException {
+
+    // Localize job and localize task.
+    tracker.getLocalizer().initializeUserDirs(task.getUser());
+    localizedJobConf = tracker.localizeJobFiles(task);
+    // Now initialize the job via task-controller so as to set
+    // ownership/permissions of jars, job-work-dir
+    JobInitializationContext jobContext = new JobInitializationContext();
+    jobContext.jobid = jobId;
+    jobContext.user = localizedJobConf.getUser();
+    jobContext.workDir =
+        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
+    taskController.initializeJob(jobContext);
+    TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
+    tip.setJobConf(localizedJobConf);
+    tip.localizeTask(task);
+    Path workDir =
+        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
+            task.getUser(), task.getJobID().toString(), task.getTaskID()
+                .toString(), task.isTaskCleanupTask()), trackerFConf);
+    TaskRunner runner = task.createRunner(tracker, tip);
+    tip.setTaskRunner(runner);
+    runner.setupChildTaskConfiguration(lDirAlloc);
+    TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()),
+        localizedJobConf);
+    TaskRunner.prepareLogFiles(task.getTaskID());
+    Path localTaskFile =
+        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
+            .getUser(), task.getJobID().toString(), task.getTaskID()
+            .toString(), task.isTaskCleanupTask()), trackerFConf);
+    JobConf localizedTaskConf = new JobConf(localTaskFile);
+    TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf);
+    TaskControllerContext taskContext =
+        new TaskController.TaskControllerContext();
+    taskContext.env =
+        new JvmEnv(null, null, null, null, -1, new File(localizedJobConf
+            .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf);
+    taskContext.task = task;
+    // /////////// The method being tested
+    taskController.initializeTask(taskContext);
+
+    // TODO: Let the task run and create files.
+
+    InlineCleanupQueue cleanupQueue = new InlineCleanupQueue();
+    tracker.directoryCleanupThread = cleanupQueue;
+
+    // ////////// The central methods being tested
+    tip.removeTaskFiles(true, taskId);
+    tracker.removeJobFiles(task.getUser(), jobId.toString());
+    // //////////
+
+    // TODO: make sure that all files intended to be deleted are deleted.
+
+    assertTrue("Some task files are not deleted!! Number of stale paths is "
+        + cleanupQueue.stalePaths.size(), cleanupQueue.stalePaths.size() == 0);
+
+    // Check that the empty $mapreduce.cluster.local.dir/taskTracker/$user dirs are still
+    // there.
+    for (String localDir : localDirs) {
+      Path userDir =
+          new Path(localDir, TaskTracker.getUserDir(task.getUser()));
+      assertTrue("User directory " + userDir + " is not present!!",
+          tracker.localFs.exists(userDir));
+    }
+
+    // Test userlogs cleanup.
+    verifyUserLogsCleanup();
+  }
+
+  /**
+   * Test userlogs cleanup.
+   * 
+   * @throws IOException
+   */
+  private void verifyUserLogsCleanup()
+      throws IOException {
+    Path logDir =
+        new Path(HADOOP_LOG_DIR.toURI().getPath(), TaskLog.USERLOGS_DIR_NAME
+            + Path.SEPARATOR + task.getTaskID().toString());
+
+    // Logs should be there before cleanup.
+    assertTrue("Userlogs dir " + logDir + " is not presen as expected!!",
+        tracker.localFs.exists(logDir));
+
+    // ////////// Another method being tested
+    TaskLog.cleanup(-1); // -1 moves purgeTimeStamp into the future and file
+    // modification times behind retainTimeStamp
+    // //////////
+
+    // Logs should be gone after cleanup.
+    assertFalse("Userlogs dir " + logDir + " is not deleted as expected!!",
+        tracker.localFs.exists(logDir));
   }
 }

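Note on the localization changes above: every path helper now takes the task's user, so job and task files land in a per-user sandbox instead of a shared jobcache. A minimal sketch of the layout shift implied by the new getLocalTaskDir/getUserDir assertions; the ids below are hypothetical and the exact concatenation is inferred from the assertions, not quoted from TaskTracker:

    public class LocalTaskDirSketch {
      public static void main(String[] args) {
        String user = "alice";                                  // hypothetical
        String jobId = "job_200911282026_0001";                 // hypothetical
        String taskId = "attempt_200911282026_0001_m_000000_0"; // hypothetical
        // Before: task dirs were shared across users under jobcache/.
        String oldDir = "taskTracker/jobcache/" + jobId + "/" + taskId;
        // After: everything hangs off taskTracker/<user>/, which is the
        // directory testTaskCleanup() checks is left behind (empty).
        String newDir = "taskTracker/" + user + "/jobcache/" + jobId + "/" + taskId;
        System.out.println(oldDir + " -> " + newDir);
      }
    }
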
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java Sat Nov 28 20:26:01 2009
@@ -28,13 +28,16 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.mapreduce.util.TestProcfsBasedProcessTree;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.TestProcfsBasedProcessTree;
 import org.apache.hadoop.util.ToolRunner;
 
 import junit.framework.TestCase;
@@ -57,10 +60,10 @@
 
   private void startCluster(JobConf conf)
       throws Exception {
-    conf.set("mapred.job.tracker.handler.count", "1");
-    conf.set("mapred.tasktracker.map.tasks.maximum", "1");
-    conf.set("mapred.tasktracker.reduce.tasks.maximum", "1");
-    conf.set("mapred.tasktracker.tasks.sleeptime-before-sigkill", "0");
+    conf.set(JTConfig.JT_IPC_HANDLER_COUNT, "1");
+    conf.set(TTConfig.TT_MAP_SLOTS, "1");
+    conf.set(TTConfig.TT_REDUCE_SLOTS, "1");
+    conf.set(TTConfig.TT_SLEEP_TIME_BEFORE_SIG_KILL, "0");
     miniMRCluster = new MiniMRCluster(1, "file:///", 1, null, null, conf);
   }
 
@@ -170,11 +173,8 @@
 
     // Start cluster with proper configuration.
     JobConf fConf = new JobConf();
-    fConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
-        2 * 1024L);
-    fConf.setLong(
-        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
-        2 * 1024L);
+    fConf.setLong(MRConfig.MAPMEMORY_MB, 2 * 1024L);
+    fConf.setLong(MRConfig.REDUCEMEMORY_MB, 2 * 1024L);
     startCluster(new JobConf());
 
     JobConf conf = new JobConf(miniMRCluster.createJobConf());
@@ -199,13 +199,10 @@
     // Start cluster with proper configuration.
     JobConf fConf = new JobConf();
     // very small value, so that no task escapes to successful completion.
-    fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
+    fConf.set(TTConfig.TT_MEMORY_MANAGER_MONITORING_INTERVAL,
         String.valueOf(300));
-    fConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
-        2 * 1024);
-    fConf.setLong(
-        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
-        2 * 1024);
+    fConf.setLong(MRConfig.MAPMEMORY_MB, 2 * 1024);
+    fConf.setLong(MRConfig.REDUCEMEMORY_MB, 2 * 1024);
     startCluster(fConf);
     runJobExceedingMemoryLimit();
   }
@@ -227,7 +224,7 @@
     // Start cluster with proper configuration.
     JobConf fConf = new JobConf();
     // very small value, so that no task escapes to successful completion.
-    fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
+    fConf.set(TTConfig.TT_MEMORY_MANAGER_MONITORING_INTERVAL,
         String.valueOf(300));
     //set old values, max vm property per task and upper limit on the tasks
     //vm
@@ -320,16 +317,16 @@
 
     // Start cluster with proper configuration.
     JobConf fConf = new JobConf();
-    fConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+    fConf.setLong(MRConfig.MAPMEMORY_MB,
         1L);
     fConf.setLong(
-        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1L);
+        MRConfig.REDUCEMEMORY_MB, 1L);
 
     // Because of the above, the total tt limit is 2mb
     long TASK_TRACKER_LIMIT = 2 * 1024 * 1024L;
 
     // very small value, so that no task escapes to successful completion.
-    fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
+    fConf.set(TTConfig.TT_MEMORY_MANAGER_MONITORING_INTERVAL,
         String.valueOf(300));
 
     startCluster(fConf);

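Note: the hunks above replace bare configuration strings with named constants from JTConfig, TTConfig and MRConfig. A small sketch of the resulting pattern, using only constants that appear in this diff (the class name is hypothetical; the literal values the constants resolve to are not assumed here):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.MRConfig;
    import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
    import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;

    public class MemoryConfSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Cluster-wide per-slot memory limits, in MB.
        conf.setLong(MRConfig.MAPMEMORY_MB, 2 * 1024L);
        conf.setLong(MRConfig.REDUCEMEMORY_MB, 2 * 1024L);
        // A single RPC handler and one slot of each kind on the mini cluster.
        conf.set(JTConfig.JT_IPC_HANDLER_COUNT, "1");
        conf.set(TTConfig.TT_MAP_SLOTS, "1");
        conf.set(TTConfig.TT_REDUCE_SLOTS, "1");
        // Poll task memory every 300 ms so no over-limit task slips through.
        conf.set(TTConfig.TT_MEMORY_MANAGER_MONITORING_INTERVAL, "300");
      }
    }
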
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTextInputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTextInputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTextInputFormat.java Sat Nov 28 20:26:01 2009
@@ -18,18 +18,34 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.*;
-import java.util.*;
-import junit.framework.TestCase;
-
-import org.apache.commons.logging.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.BitSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.*;
 import org.apache.hadoop.util.LineReader;
 import org.apache.hadoop.util.ReflectionUtils;
 
-public class TestTextInputFormat extends TestCase {
+import org.junit.Test;
+import static junit.framework.Assert.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class TestTextInputFormat {
   private static final Log LOG =
     LogFactory.getLog(TestTextInputFormat.class.getName());
 
@@ -39,17 +55,19 @@
   private static FileSystem localFs = null; 
   static {
     try {
+      defaultConf.set("fs.default.name", "file:///");
       localFs = FileSystem.getLocal(defaultConf);
     } catch (IOException e) {
       throw new RuntimeException("init failure", e);
     }
   }
-  private static Path workDir = 
-    new Path(new Path(System.getProperty("test.build.data", "."), "data"),
-             "TestTextInputFormat");
-  
+  private static Path workDir =
+    new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+             "TestTextInputFormat").makeQualified(localFs);
+
+  @Test
   public void testFormat() throws Exception {
-    JobConf job = new JobConf();
+    JobConf job = new JobConf(defaultConf);
     Path file = new Path(workDir, "test.txt");
 
     // A reporter that does nothing
@@ -127,6 +145,100 @@
     }
   }
 
+  @Test
+  public void testSplitableCodecs() throws IOException {
+    JobConf conf = new JobConf(defaultConf);
+    int seed = new Random().nextInt();
+    // Create the codec
+    CompressionCodec codec = null;
+    try {
+      codec = (CompressionCodec) ReflectionUtils.newInstance(
+          conf.getClassByName("org.apache.hadoop.io.compress.BZip2Codec"), conf);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException("Illegal codec!");
+    }
+    Path file = new Path(workDir, "test"+codec.getDefaultExtension());
+
+    // A reporter that does nothing
+    Reporter reporter = Reporter.NULL;
+    LOG.info("seed = "+seed);
+    Random random = new Random(seed);
+    FileSystem localFs = FileSystem.getLocal(conf);
+
+    localFs.delete(workDir, true);
+    FileInputFormat.setInputPaths(conf, workDir);
+
+    final int MAX_LENGTH = 500000;
+
+    // for a variety of lengths
+    for (int length = MAX_LENGTH / 2; length < MAX_LENGTH;
+        length += random.nextInt(MAX_LENGTH / 4)+1) {
+
+      LOG.info("creating; entries = " + length);
+
+
+      // create a file with length entries
+      Writer writer =
+        new OutputStreamWriter(codec.createOutputStream(localFs.create(file)));
+      try {
+        for (int i = 0; i < length; i++) {
+          writer.write(Integer.toString(i));
+          writer.write("\n");
+        }
+      } finally {
+        writer.close();
+      }
+
+      // try splitting the file in a variety of sizes
+      TextInputFormat format = new TextInputFormat();
+      format.configure(conf);
+      LongWritable key = new LongWritable();
+      Text value = new Text();
+      for (int i = 0; i < 3; i++) {
+        int numSplits = random.nextInt(MAX_LENGTH/2000)+1;
+        LOG.info("splitting: requesting = " + numSplits);
+        InputSplit[] splits = format.getSplits(conf, numSplits);
+        LOG.info("splitting: got =        " + splits.length);
+
+
+
+        // check each split
+        BitSet bits = new BitSet(length);
+        for (int j = 0; j < splits.length; j++) {
+          LOG.debug("split["+j+"]= " + splits[j]);
+          RecordReader<LongWritable, Text> reader =
+            format.getRecordReader(splits[j], conf, reporter);
+          try {
+            int counter = 0;
+            while (reader.next(key, value)) {
+              int v = Integer.parseInt(value.toString());
+              LOG.debug("read " + v);
+
+              if (bits.get(v)) {
+                LOG.warn("conflict with " + v +
+                    " in split " + j +
+                    " at position "+reader.getPos());
+              }
+              assertFalse("Key in multiple partitions.", bits.get(v));
+              bits.set(v);
+              counter++;
+            }
+            if (counter > 0) {
+              LOG.info("splits["+j+"]="+splits[j]+" count=" + counter);
+            } else {
+              LOG.debug("splits["+j+"]="+splits[j]+" count=" + counter);
+            }
+          } finally {
+            reader.close();
+          }
+        }
+        assertEquals("Some keys in no partition.", length, bits.cardinality());
+      }
+
+    }
+
+  }
+
   private static LineReader makeStream(String str) throws IOException {
     return new LineReader(new ByteArrayInputStream
                                              (str.getBytes("UTF-8")), 
@@ -137,7 +249,8 @@
                                              (str.getBytes("UTF-8")), 
                                            bufsz);
   }
-  
+
+  @Test
   public void testUTF8() throws Exception {
     LineReader in = makeStream("abcd\u20acbdcd\u20ac");
     Text line = new Text();
@@ -156,6 +269,7 @@
    *
    * @throws Exception
    */
+  @Test
   public void testNewLines() throws Exception {
     final String STR = "a\nbb\n\nccc\rdddd\r\r\r\n\r\neeeee";
     final int STRLENBYTES = STR.getBytes().length;
@@ -195,6 +309,7 @@
    *
    * @throws Exception
    */
+  @Test
   public void testMaxLineLength() throws Exception {
     final String STR = "a\nbb\n\nccc\rdddd\r\neeeee";
     final int STRLENBYTES = STR.getBytes().length;
@@ -219,6 +334,38 @@
     }
   }
 
+  @Test
+  public void testMRMaxLine() throws Exception {
+    final int MAXPOS = 1024 * 1024;
+    final int MAXLINE = 10 * 1024;
+    final int BUF = 64 * 1024;
+    final InputStream infNull = new InputStream() {
+      int position = 0;
+      final int MAXPOSBUF = 1024 * 1024 + BUF; // max LRR pos + LineReader buf
+      @Override
+      public int read() {
+        ++position;
+        return 0;
+      }
+      @Override
+      public int read(byte[] b) {
+        assertTrue("Read too many bytes from the stream", position < MAXPOSBUF);
+        Arrays.fill(b, (byte) 0);
+        position += b.length;
+        return b.length;
+      }
+    };
+    final LongWritable key = new LongWritable();
+    final Text val = new Text();
+    LOG.info("Reading a line from /dev/null");
+    final Configuration conf = new Configuration(false);
+    conf.setInt(org.apache.hadoop.mapreduce.lib.input.
+                LineRecordReader.MAX_LINE_LENGTH, MAXLINE);
+    conf.setInt("io.file.buffer.size", BUF); // used by LRR
+    final LineRecordReader lrr = new LineRecordReader(infNull, 0, MAXPOS, conf);
+    assertFalse("Read a line from null", lrr.next(key, val));
+  }
+
   private static void writeFile(FileSystem fs, Path name, 
                                 CompressionCodec codec,
                                 String contents) throws IOException {
@@ -244,7 +391,7 @@
     Text value = reader.createValue();
     while (reader.next(key, value)) {
       result.add(value);
-      value = (Text) reader.createValue();
+      value = reader.createValue();
     }
     reader.close();
     return result;
@@ -253,8 +400,9 @@
   /**
    * Test using the gzip codec for reading
    */
-  public static void testGzip() throws IOException {
-    JobConf job = new JobConf();
+  @Test
+  public void testGzip() throws IOException {
+    JobConf job = new JobConf(defaultConf);
     CompressionCodec gzip = new GzipCodec();
     ReflectionUtils.setConf(gzip, job);
     localFs.delete(workDir, true);
@@ -286,8 +434,9 @@
   /**
    * Test using the gzip codec and an empty input file
    */
-  public static void testGzipEmpty() throws IOException {
-    JobConf job = new JobConf();
+  @Test
+  public void testGzipEmpty() throws IOException {
+    JobConf job = new JobConf(defaultConf);
     CompressionCodec gzip = new GzipCodec();
     ReflectionUtils.setConf(gzip, job);
     localFs.delete(workDir, true);

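Note: two of the new tests above carry the interesting logic. testSplitableCodecs leans on BZip2 being block-splittable, so a single .bz2 file can legitimately yield several InputSplits; testMRMaxLine protects LineRecordReader from unbounded buffering by handing it a stream that never runs dry. A compact sketch of the max-line guard, assuming only the constructor and configuration keys the test itself uses:

    import java.io.InputStream;
    import java.util.Arrays;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.LineRecordReader;

    public class MaxLineSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(false);
        conf.setInt(org.apache.hadoop.mapreduce.lib.input.
                    LineRecordReader.MAX_LINE_LENGTH, 10 * 1024); // 10 KB cap
        conf.setInt("io.file.buffer.size", 64 * 1024);  // LineReader buffer
        InputStream zeros = new InputStream() {  // endless stream of '\0' bytes
          public int read() { return 0; }
          public int read(byte[] b) { Arrays.fill(b, (byte) 0); return b.length; }
        };
        LineRecordReader lrr = new LineRecordReader(zeros, 0, 1024 * 1024, conf);
        // With the cap set, this gives up instead of buffering forever.
        System.out.println(lrr.next(new LongWritable(), new Text()));
      }
    }
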
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTextOutputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTextOutputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTextOutputFormat.java Sat Nov 28 20:26:01 2009
@@ -47,7 +47,7 @@
   @SuppressWarnings("unchecked")
   public void testFormat() throws Exception {
     JobConf job = new JobConf();
-    job.set("mapred.task.id", attempt);
+    job.set(JobContext.TASK_ATTEMPT_ID, attempt);
     FileOutputFormat.setOutputPath(job, workDir.getParent().getParent());
     FileOutputFormat.setWorkOutputPath(job, workDir);
     FileSystem fs = workDir.getFileSystem(job);
@@ -99,8 +99,8 @@
   public void testFormatWithCustomSeparator() throws Exception {
     JobConf job = new JobConf();
     String separator = "\u0001";
-    job.set("mapred.textoutputformat.separator", separator);
-    job.set("mapred.task.id", attempt);
+    job.set("mapreduce.output.textoutputformat.separator", separator);
+    job.set(JobContext.TASK_ATTEMPT_ID, attempt);
     FileOutputFormat.setOutputPath(job, workDir.getParent().getParent());
     FileOutputFormat.setWorkOutputPath(job, workDir);
     FileSystem fs = workDir.getFileSystem(job);

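Note: with the key rename above, the custom-separator behaviour itself is unchanged: TextOutputFormat joins key and value with the configured separator (a tab when the key is unset). A minimal sketch, class name hypothetical:

    import org.apache.hadoop.mapred.JobConf;

    public class SeparatorSketch {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // Each record is written as key + separator + value + "\n".
        job.set("mapreduce.output.textoutputformat.separator", "\u0001");
      }
    }
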
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java Sat Nov 28 20:26:01 2009
@@ -25,6 +25,8 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 
 public class TestTrackerBlacklistAcrossJobs extends TestCase {
   private static final String hosts[] = new String[] {
@@ -36,7 +38,7 @@
     String hostname = "";
     
     public void configure(JobConf job) {
-      this.hostname = job.get("slave.host.name");
+      this.hostname = job.get(TTConfig.TT_HOST_NAME);
     }
     
     public void map(NullWritable key, NullWritable value,
@@ -57,7 +59,7 @@
     fileSys = FileSystem.get(conf);
     // start mr cluster
     JobConf jtConf = new JobConf();
-    jtConf.setInt("mapred.max.tracker.blacklists", 1);
+    jtConf.setInt(JTConfig.JT_MAX_TRACKER_BLACKLISTS, 1);
 
     mr = new MiniMRCluster(3, fileSys.getUri().toString(),
                            1, null, hosts, jtConf);
@@ -65,7 +67,7 @@
     // setup job configuration
     JobConf mrConf = mr.createJobConf();
     JobConf job = new JobConf(mrConf);
-    job.setInt("mapred.max.tracker.failures", 1);
+    job.setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 1);
     job.setNumMapTasks(6);
     job.setNumReduceTasks(0);
     job.setMapperClass(FailOnHostMapper.class);

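Note: the two knobs above operate at different levels. MAX_TASK_FAILURES_PER_TRACKER is per job: once a tracker accumulates that many failures for a job, the job stops scheduling on it. JT_MAX_TRACKER_BLACKLISTS is cluster-level: a tracker blacklisted by that many distinct jobs is blacklisted across jobs, which is exactly what this test provokes with both set to 1. A sketch of that configuration (the JobContext import location is assumed; the diff uses it unqualified):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;

    public class BlacklistConfSketch {
      public static void main(String[] args) {
        JobConf jtConf = new JobConf();
        // One per-job blacklisting is enough to blacklist the tracker
        // across jobs.
        jtConf.setInt(JTConfig.JT_MAX_TRACKER_BLACKLISTS, 1);

        JobConf job = new JobConf(jtConf);
        // A single task failure excludes the tracker from this job.
        job.setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 1);
      }
    }
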
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java Sat Nov 28 20:26:01 2009
@@ -23,7 +23,9 @@
 import javax.security.auth.login.LoginException;
 
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 import junit.extensions.TestSetup;
@@ -56,8 +58,8 @@
     TestSetup setup = new TestSetup(new TestSuite(TestTrackerReservation.class)) {
       protected void setUp() throws Exception {
         JobConf conf = new JobConf();
-        conf.set("mapred.job.tracker", "localhost:0");
-        conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
+        conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
+        conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
         jobTracker = new FakeJobTracker(conf, new Clock(), trackers);
         for (String tracker : trackers) {
           FakeObjectUtilities.establishFirstContact(jobTracker, tracker);
@@ -91,8 +93,7 @@
     conf.setNumReduceTasks(1);
     conf.setSpeculativeExecution(false);
     
-    conf.setBoolean(
-        "mapred.committer.job.setup.cleanup.needed", false);
+    conf.setBoolean(JobContext.SETUP_CLEANUP_NEEDED, false);
     
     //Set task tracker objects for reservation.
     TaskTracker tt1 = jobTracker.getTaskTracker(trackers[0]);
@@ -124,6 +125,11 @@
         2, fjob.getNumReservedTaskTrackersForMaps());
     assertEquals("Trackers not reserved for the job : reduces", 
         2, fjob.getNumReservedTaskTrackersForReduces());
+    ClusterMetrics metrics = jobTracker.getClusterMetrics();
+    assertEquals("reserved map slots do not match",
+          4, metrics.getReservedMapSlots());
+    assertEquals("reserved reduce slots do not match",
+          4, metrics.getReservedReduceSlots());
     
     TaskAttemptID mTid = fjob.findMapTask(trackers[1]);
     TaskAttemptID rTid = fjob.findReduceTask(trackers[1]);
@@ -138,6 +144,11 @@
         0, fjob.getNumReservedTaskTrackersForMaps());
     assertEquals("Reservation for the job not released : Reduces", 
         0, fjob.getNumReservedTaskTrackersForReduces());
+    metrics = jobTracker.getClusterMetrics();
+    assertEquals("reserved map slots do not match",
+        0, metrics.getReservedMapSlots());
+    assertEquals("reserved reduce slots do not match",
+        0, metrics.getReservedReduceSlots());
   }
   
   /**
@@ -165,6 +176,11 @@
         0, job.getNumReservedTaskTrackersForMaps());
     assertEquals("Reservation for the job not released : Reduces", 
         0, job.getNumReservedTaskTrackersForReduces());
+    ClusterMetrics metrics = jobTracker.getClusterMetrics();
+    assertEquals("reserved map slots do not match",
+        0, metrics.getReservedMapSlots());
+    assertEquals("reserved reduce slots do not match",
+        0, metrics.getReservedReduceSlots());
   }
   
   /**
@@ -179,9 +195,9 @@
     conf.setSpeculativeExecution(false);
     conf.setNumMapTasks(2);
     conf.setNumReduceTasks(2);
-    conf.set("mapred.max.reduce.failures.percent", ".70");
-    conf.set("mapred.max.map.failures.percent", ".70");
-    conf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
+    conf.set(JobContext.REDUCE_FAILURES_MAXPERCENT, ".70");
+    conf.set(JobContext.MAP_FAILURES_MAX_PERCENT, ".70");
+    conf.setBoolean(JobContext.SETUP_CLEANUP_NEEDED, false);
     conf.setMaxTaskFailuresPerTracker(1);
     FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
     job.setClusterSize(trackers.length);
@@ -212,6 +228,11 @@
         2, job.getNumReservedTaskTrackersForMaps());
     assertEquals("Trackers not reserved for the job : reduces", 
         2, job.getNumReservedTaskTrackersForReduces());
+    ClusterMetrics metrics = jobTracker.getClusterMetrics();
+    assertEquals("reserved map slots do not match",
+        4, metrics.getReservedMapSlots());
+    assertEquals("reserved reduce slots do not match",
+        4, metrics.getReservedReduceSlots());
   
     /*
      * FakeJobInProgress.findMapTask does not handle
@@ -230,6 +251,12 @@
         1, job.getNumReservedTaskTrackersForMaps());
     assertEquals("Extra Trackers reserved for the job : reduces", 
         1, job.getNumReservedTaskTrackersForReduces());
+    metrics = jobTracker.getClusterMetrics();
+    assertEquals("reserved map slots do not match",
+        2, metrics.getReservedMapSlots());
+    assertEquals("reserved reduce slots do not match",
+        2, metrics.getReservedReduceSlots());
+
     //Finish the map task on the tracker 1. Finishing it here to work
     //around bug in the FakeJobInProgress object
     job.finishTask(mTid);
@@ -245,7 +272,11 @@
         0, job.getNumReservedTaskTrackersForMaps());
     assertEquals("Trackers not unreserved for the job : reduces", 
         0, job.getNumReservedTaskTrackersForReduces());
-    
+    metrics = jobTracker.getClusterMetrics();
+    assertEquals("reserved map slots do not match",
+        0, metrics.getReservedMapSlots());
+    assertEquals("reserved reduce slots do not match",
+        0, metrics.getReservedReduceSlots());
   }
 }
   
\ No newline at end of file

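Note: the assertions added above read reservation totals from ClusterMetrics instead of only from the JobInProgress, and they count slots rather than trackers: two reserved trackers at two slots each show up as four reserved map and four reserved reduce slots. A sketch of the read side, using only the getters the assertions call (surrounding JobTracker setup elided):

    import org.apache.hadoop.mapreduce.ClusterMetrics;

    public class ReservationMetricsSketch {
      static void report(ClusterMetrics metrics) {
        System.out.println("reserved map slots    = "
            + metrics.getReservedMapSlots());
        System.out.println("reserved reduce slots = "
            + metrics.getReservedReduceSlots());
      }
    }
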
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestUserDefinedCounters.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestUserDefinedCounters.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestUserDefinedCounters.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestUserDefinedCounters.java Sat Nov 28 20:26:01 2009
@@ -101,7 +101,8 @@
     RunningJob runningJob = JobClient.runJob(conf);
 
     Path[] outputFiles = FileUtil.stat2Paths(
-        fs.listStatus(OUTPUT_DIR, new OutputLogFilter()));
+        fs.listStatus(OUTPUT_DIR, 
+                      new Utils.OutputFileUtils.OutputFilesFilter()));
     if (outputFiles.length > 0) {
       InputStream is = fs.open(outputFiles[0]);
       BufferedReader reader = new BufferedReader(new InputStreamReader(is));