Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2009/10/27 16:44:06 UTC

svn commit: r830230 [8/9] - in /hadoop/mapreduce/branches/HDFS-641: ./ .eclipse.templates/ conf/ ivy/ lib/ src/c++/ src/contrib/ src/contrib/capacity-scheduler/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-sche...

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJob.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJob.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJob.java Tue Oct 27 15:43:58 2009
@@ -50,11 +50,11 @@
 
   MiniMRCluster mr = null;
 
-  /** Committer with cleanup waiting on a signal
+  /** Committer with commit waiting on a signal
    */
-  static class CommitterWithDelayCleanup extends FileOutputCommitter {
+  static class CommitterWithDelayCommit extends FileOutputCommitter {
     @Override
-    public void cleanupJob(JobContext context) throws IOException {
+    public void commitJob(JobContext context) throws IOException {
       Configuration conf = context.getConfiguration();
       Path share = new Path(conf.get("share"));
       FileSystem fs = FileSystem.get(conf);
@@ -66,7 +66,7 @@
         }
         UtilsForTests.waitFor(100);
       }
-      super.cleanupJob(context);
+      super.commitJob(context);
     }
   }
 
@@ -103,7 +103,7 @@
     conf.setJobName("empty");
     // use an InputFormat which returns no split
     conf.setInputFormat(EmptyInputFormat.class);
-    conf.setOutputCommitter(CommitterWithDelayCleanup.class);
+    conf.setOutputCommitter(CommitterWithDelayCommit.class);
     conf.setOutputKeyClass(Text.class);
     conf.setOutputValueClass(IntWritable.class);
     conf.setMapperClass(IdentityMapper.class);
@@ -197,7 +197,8 @@
         + " and not 1.0", runningJob.cleanupProgress() == 1.0);
 
    assertTrue("Job output directory doesn't exist!", fs.exists(outDir));
-    FileStatus[] list = fs.listStatus(outDir, new OutputLogFilter());
+    FileStatus[] list = fs.listStatus(outDir, 
+                          new Utils.OutputFileUtils.OutputFilesFilter());
     assertTrue("Number of part-files is " + list.length + " and not "
         + numReduces, list.length == numReduces);
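
The recurring change in this commit moves committer overrides from the
deprecated FileOutputCommitter.cleanupJob(JobContext) to commitJob(JobContext).
A minimal sketch of the new-style override (class name illustrative):

    static class DelayedCommitter extends FileOutputCommitter {
      @Override
      public void commitJob(JobContext context) throws IOException {
        // job-level finalization that previously lived in cleanupJob()
        super.commitJob(context);
      }
    }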
 

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java Tue Oct 27 15:43:58 2009
@@ -75,7 +75,7 @@
       theRecordWriter.close(reporter);
     }
     committer.commitTask(tContext);
-    committer.cleanupJob(jContext);
+    committer.commitJob(jContext);
     
     File expectedFile = new File(new Path(outDir, file).toString());
     StringBuffer expectedOutput = new StringBuffer();

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java Tue Oct 27 15:43:58 2009
@@ -116,7 +116,8 @@
     JobClient.runJob(conf);
 
     Path[] outputFiles = FileUtil.stat2Paths(
-        fs.listStatus(OUTPUT_DIR, new OutputLogFilter()));
+        fs.listStatus(OUTPUT_DIR, 
+                      new Utils.OutputFileUtils.OutputFilesFilter()));
     assertEquals(1, outputFiles.length);
     InputStream is = fs.open(outputFiles[0]);
     BufferedReader reader = new BufferedReader(new InputStreamReader(is));
@@ -161,7 +162,8 @@
     JobClient.runJob(conf);
 
     Path[] outputFiles = FileUtil.stat2Paths(
-        fs.listStatus(OUTPUT_DIR, new OutputLogFilter()));
+        fs.listStatus(OUTPUT_DIR, 
+                      new Utils.OutputFileUtils.OutputFilesFilter()));
     assertEquals(1, outputFiles.length);
   }
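
The other pattern repeated across these tests replaces the deprecated
OutputLogFilter with the nested Utils.OutputFileUtils.OutputFilesFilter when
listing job output. The new call, condensed (fs and outDir assumed in scope):

    FileStatus[] parts = fs.listStatus(outDir,
        new Utils.OutputFileUtils.OutputFilesFilter());
    Path[] outputFiles = FileUtil.stat2Paths(parts);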
 

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java Tue Oct 27 15:43:58 2009
@@ -25,6 +25,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -33,8 +34,11 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapreduce.Cluster;
@@ -46,6 +50,7 @@
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
+import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
@@ -615,6 +620,16 @@
       mr = new MiniMRCluster(2, dfsCluster.getFileSystem().getUri().toString(),
           3, null, null, conf);
 
+      assertEquals("Files in logDir did not move to DONE folder",
+          0, logDirFs.listStatus(logDirPath).length);
+
+      JobHistory jobHistory = 
+        mr.getJobTrackerRunner().getJobTracker().getJobHistory();
+      Path doneDir = jobHistory.getCompletedJobHistoryLocation();
+
+      assertEquals("Files in DONE dir not correct",
+          2, doneDir.getFileSystem(conf).listStatus(doneDir).length);
+
       // run the TCs
       conf = mr.createJobConf();
 
@@ -635,9 +650,6 @@
       // Run a job that will be succeeded and validate its history file
       RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
       
-      JobHistory jobHistory = 
-        mr.getJobTrackerRunner().getJobTracker().getJobHistory();
-      Path doneDir = jobHistory.getCompletedJobHistoryLocation();
       assertEquals("History DONE folder not correct", 
           doneFolder, doneDir.getName());
       JobID id = job.getID();
@@ -900,4 +912,65 @@
     }
   }
 
+  public void testHistoryInitWithCorruptFiles() throws IOException {
+    MiniMRCluster mr = null;
+    try {
+      JobConf conf = new JobConf();
+      Path historyDir = new Path(System.getProperty("test.build.data", "."),
+          "history");
+      conf.set(JTConfig.JT_JOBHISTORY_LOCATION,
+          historyDir.toString());
+      conf.setUser("user");
+
+      FileSystem localFs = FileSystem.getLocal(conf);
+      
+      //there may be some stale files, clean them
+      if (localFs.exists(historyDir)) {
+        boolean deleted = localFs.delete(historyDir, true);
+        LOG.info(historyDir + " deleted " + deleted);
+      }
+
+      // Start the cluster, create a history file
+      mr = new MiniMRCluster(0, "file:///", 3, null, null, conf);
+      JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+      JobHistory jh = jt.getJobHistory();
+      final JobID jobId = JobID.forName("job_200809171136_0001");
+      jh.setupEventWriter(jobId, conf);
+      JobSubmittedEvent jse =
+        new JobSubmittedEvent(jobId, "job", "user", 12345, "path");
+      jh.logEvent(jse, jobId);
+      jh.closeWriter(jobId);
+
+      // Corrupt the history file. Use RawLocalFileSystem so that we
+      // keep the original CRC file intact.
+      String historyFileName = jobId.toString() + "_" + "user";
+      Path historyFilePath = new Path(historyDir.toString(), historyFileName);
+
+      RawLocalFileSystem fs = (RawLocalFileSystem)
+        FileSystem.getLocal(conf).getRaw();
+
+      FSDataOutputStream out = fs.create(historyFilePath, true);
+      byte[] corruptData = new byte[32];
+      new Random().nextBytes(corruptData);
+      out.write(corruptData, 0, 32);
+      out.close();
+
+      // Stop and start the tracker. The tracker should come up nicely
+      mr.stopJobTracker();
+      mr.startJobTracker();
+      jt = mr.getJobTrackerRunner().getJobTracker();
+      assertNotNull("JobTracker did not come up", jt);
+      jh = jt.getJobHistory();
+      assertNotNull("JobHistory did not get initialized correctly", jh);
+
+      // Only the done folder should remain in the history directory
+      assertEquals("Files in logDir did not move to DONE folder",
+          1, historyDir.getFileSystem(conf).listStatus(historyDir).length);
+    } finally {
+      if (mr != null) {
+        cleanupLocalFiles(mr);
+        mr.shutdown();
+      }
+    }
+  }
 }
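
The corruption step in testHistoryInitWithCorruptFiles works because the bytes
are written through the raw, non-checksummed local filesystem, leaving the
pre-existing .crc side file stale so it no longer matches the data. Condensed,
assuming a local Configuration and target Path:

    RawLocalFileSystem raw =
        (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
    FSDataOutputStream out = raw.create(historyFilePath, true);
    byte[] junk = new byte[32];
    new Random().nextBytes(junk);
    out.write(junk);   // bypasses ChecksumFileSystem; .crc stays stale
    out.close();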

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java Tue Oct 27 15:43:58 2009
@@ -37,25 +37,93 @@
   public void testJobFailAndKill() throws IOException {
     MiniMRCluster mr = null;
     try {
-      mr = new MiniMRCluster(2, "file:///", 3);
+      JobConf jtConf = new JobConf();
+      jtConf.set("mapred.jobtracker.instrumentation", 
+          JTInstrumentation.class.getName());
+      mr = new MiniMRCluster(2, "file:///", 3, null, null, jtConf);
+      JTInstrumentation instr = (JTInstrumentation) 
+        mr.getJobTrackerRunner().getJobTracker().getInstrumentation();
 
       // run the TCs
       JobConf conf = mr.createJobConf();
-
+      
       Path inDir = new Path(TEST_ROOT_DIR + "/failkilljob/input");
       Path outDir = new Path(TEST_ROOT_DIR + "/failkilljob/output");
       RunningJob job = UtilsForTests.runJobFail(conf, inDir, outDir);
       // Checking that the Job got failed
       assertEquals(job.getJobState(), JobStatus.FAILED);
+      assertTrue(instr.verifyJob());
+      assertEquals(1, instr.failed);
+      instr.reset();
 
+      
       job = UtilsForTests.runJobKill(conf, inDir, outDir);
       // Checking that the Job got killed
       assertTrue(job.isComplete());
       assertEquals(job.getJobState(), JobStatus.KILLED);
+      assertTrue(instr.verifyJob());
+      assertEquals(1, instr.killed);
     } finally {
       if (mr != null) {
         mr.shutdown();
       }
     }
   }
+  
+  static class JTInstrumentation extends JobTrackerInstrumentation {
+    volatile int failed;
+    volatile int killed;
+    volatile int addPrep;
+    volatile int decPrep;
+    volatile int addRunning;
+    volatile int decRunning;
+
+    void reset() {
+      failed = 0;
+      killed = 0;
+      addPrep = 0;
+      decPrep = 0;
+      addRunning = 0;
+      decRunning = 0;
+    }
+
+    boolean verifyJob() {
+      return addPrep==1 && decPrep==1 && addRunning==1 && decRunning==1;
+    }
+
+    public JTInstrumentation(JobTracker jt, JobConf conf) {
+      super(jt, conf);
+    }
+
+    public synchronized void addPrepJob(JobConf conf, JobID id) 
+    {
+      addPrep++;
+    }
+    
+    public synchronized void decPrepJob(JobConf conf, JobID id) 
+    {
+      decPrep++;
+    }
+
+    public synchronized void addRunningJob(JobConf conf, JobID id) 
+    {
+      addRunning++;
+    }
+
+    public synchronized void decRunningJob(JobConf conf, JobID id) 
+    {
+      decRunning++;
+    }
+    
+    public synchronized void failedJob(JobConf conf, JobID id) 
+    {
+      failed++;
+    }
+
+    public synchronized void killedJob(JobConf conf, JobID id) 
+    {
+      killed++;
+    }
+  }
+  
 }
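
The instrumentation hook is installed through the jobtracker configuration
before the mini cluster starts, then read back from the live JobTracker, as
the test above does:

    JobConf jtConf = new JobConf();
    jtConf.set("mapred.jobtracker.instrumentation",
        JTInstrumentation.class.getName());
    MiniMRCluster mr = new MiniMRCluster(2, "file:///", 3, null, null, jtConf);
    JTInstrumentation instr = (JTInstrumentation)
        mr.getJobTrackerRunner().getJobTracker().getInstrumentation();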

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJobName.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJobName.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJobName.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJobName.java Tue Oct 27 15:43:58 2009
@@ -61,7 +61,7 @@
 
     Path[] outputFiles = FileUtil.stat2Paths(
                            getFileSystem().listStatus(getOutputDir(),
-                           new OutputLogFilter()));
+                           new Utils.OutputFileUtils.OutputFilesFilter()));
     assertEquals(1, outputFiles.length);
     InputStream is = getFileSystem().open(outputFiles[0]);
     BufferedReader reader = new BufferedReader(new InputStreamReader(is));
@@ -95,7 +95,7 @@
 
     Path[] outputFiles = FileUtil.stat2Paths(
                            getFileSystem().listStatus(getOutputDir(),
-                           new OutputLogFilter()));
+                           new Utils.OutputFileUtils.OutputFilesFilter()));
     assertEquals(1, outputFiles.length);
     InputStream is = getFileSystem().open(outputFiles[0]);
     BufferedReader reader = new BufferedReader(new InputStreamReader(is));

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Tue Oct 27 15:43:58 2009
@@ -46,8 +46,9 @@
     private FakeTaskTrackerManager taskTrackerManager;
     
     public FakeJobInProgress(JobConf jobConf,
-        FakeTaskTrackerManager taskTrackerManager) throws IOException {
-      super(new JobID("test", ++jobCounter), jobConf, null);
+        FakeTaskTrackerManager taskTrackerManager, JobTracker jt) 
+          throws IOException {
+      super(new JobID("test", ++jobCounter), jobConf, jt);
       this.taskTrackerManager = taskTrackerManager;
       this.startTime = System.currentTimeMillis();
       this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP, 
@@ -281,7 +282,8 @@
                          int numJobs, int state)
     throws IOException {
     for (int i = 0; i < numJobs; i++) {
-      JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager);
+      JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager, 
+          UtilsForTests.getJobTracker());
       job.getStatus().setRunState(state);
       taskTrackerManager.submitJob(job);
     }

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java Tue Oct 27 15:43:58 2009
@@ -189,7 +189,7 @@
     Text value = reader.createValue();
     while (reader.next(key, value)) {
       result.add(value);
-      value = (Text) reader.createValue();
+      value = reader.createValue();
     }
     return result;
   }

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestLazyOutput.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestLazyOutput.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestLazyOutput.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestLazyOutput.java Tue Oct 27 15:43:58 2009
@@ -157,7 +157,7 @@
 
       Path[] fileList = 
         FileUtil.stat2Paths(fileSys.listStatus(output1,
-            new OutputLogFilter()));
+            new Utils.OutputFileUtils.OutputFilesFilter()));
       for(int i=0; i < fileList.length; ++i) {
         System.out.println("Test1 File list[" + i + "]" + ": "+ fileList[i]);
       }
@@ -169,7 +169,7 @@
 
       fileList =
         FileUtil.stat2Paths(fileSys.listStatus(output2,
-            new OutputLogFilter()));
+            new Utils.OutputFileUtils.OutputFilesFilter()));
       for(int i=0; i < fileList.length; ++i) {
         System.out.println("Test2 File list[" + i + "]" + ": "+ fileList[i]);
       }
@@ -182,7 +182,7 @@
 
       fileList =
         FileUtil.stat2Paths(fileSys.listStatus(output3,
-            new OutputLogFilter()));
+            new Utils.OutputFileUtils.OutputFilesFilter()));
       for(int i=0; i < fileList.length; ++i) {
         System.out.println("Test3 File list[" + i + "]" + ": "+ fileList[i]);
       }

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestMapredHeartbeat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestMapredHeartbeat.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestMapredHeartbeat.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestMapredHeartbeat.java Tue Oct 27 15:43:58 2009
@@ -21,8 +21,14 @@
 
 import junit.framework.TestCase;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.RandomWriter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 
 public class TestMapredHeartbeat extends TestCase {
   public void testJobDirCleanup() throws IOException {
@@ -69,6 +75,37 @@
       if (mr != null) { mr.shutdown(); }
     }
   }
+  
+  public void testOutOfBandHeartbeats() throws Exception {
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    try {
+      Configuration conf = new Configuration();
+      dfs = new MiniDFSCluster(conf, 4, true, null);
+      
+      int taskTrackers = 1;
+      JobConf jobConf = new JobConf();
+      jobConf.setFloat(JTConfig.JT_HEARTBEATS_SCALING_FACTOR, 30.0f);
+      jobConf.setBoolean(TTConfig.TT_OUTOFBAND_HEARBEAT, true);
+      mr = new MiniMRCluster(taskTrackers, 
+                             dfs.getFileSystem().getUri().toString(), 3, 
+                             null, null, jobConf);
+      long start = System.currentTimeMillis();
+      TestMiniMRDFSSort.runRandomWriter(mr.createJobConf(), new Path("rw"));
+      long end = System.currentTimeMillis();
+      
+      final int expectedRuntimeSecs = 120;
+      final int runTimeSecs = (int)((end-start) / 1000); 
+      System.err.println("Runtime is " + runTimeSecs);
+      assertTrue("Actual runtime " + runTimeSecs + "s not less than expected " +
+                 "runtime of " + expectedRuntimeSecs + "s!",
+                 runTimeSecs <= expectedRuntimeSecs);
+    } finally {
+      if (mr != null) { mr.shutdown(); }
+      if (dfs != null) { dfs.shutdown(); }
+    }
+  }
+  
 }
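
testOutOfBandHeartbeats drives the new behavior with two knobs: a large
heartbeat-interval scaling factor and the out-of-band flag (whose constant
appears as TT_OUTOFBAND_HEARBEAT in this patch). The configuration, isolated:

    JobConf jobConf = new JobConf();
    jobConf.setFloat(JTConfig.JT_HEARTBEATS_SCALING_FACTOR, 30.0f);
    jobConf.setBoolean(TTConfig.TT_OUTOFBAND_HEARBEAT, true);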
 
 

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRClasspath.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRClasspath.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRClasspath.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRClasspath.java Tue Oct 27 15:43:58 2009
@@ -80,7 +80,7 @@
     {
       Path[] parents = FileUtil.stat2Paths(fs.listStatus(outDir.getParent()));
       Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
-              new OutputLogFilter()));
+              new Utils.OutputFileUtils.OutputFilesFilter()));
       for(int i=0; i < fileList.length; ++i) {
         BufferedReader file = 
           new BufferedReader(new InputStreamReader(fs.open(fileList[i])));
@@ -135,7 +135,8 @@
     JobClient.runJob(conf);
     StringBuffer result = new StringBuffer();
     Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
-                                 new OutputLogFilter()));
+                                 new Utils.OutputFileUtils
+                                          .OutputFilesFilter()));
     for (int i = 0; i < fileList.length; ++i) {
       BufferedReader file = new BufferedReader(new InputStreamReader(
                                                                      fs.open(fileList[i])));

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java Tue Oct 27 15:43:58 2009
@@ -71,7 +71,7 @@
     return setup;
   }
 
-  private static void runRandomWriter(JobConf job, Path sortInput) 
+  public static void runRandomWriter(JobConf job, Path sortInput) 
   throws Exception {
     // Scale down the default settings for RandomWriter for the test-case
     // Generates NUM_HADOOP_SLAVES * RW_MAPS_PER_HOST * RW_BYTES_PER_MAP

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Tue Oct 27 15:43:58 2009
@@ -107,7 +107,8 @@
     {
       
       Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
-                                   new OutputLogFilter()));
+                                   new Utils.OutputFileUtils
+                                            .OutputFilesFilter()));
       for(int i=0; i < fileList.length; ++i) {
         LOG.info("File list[" + i + "]" + ": "+ fileList[i]);
         BufferedReader file = 

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java Tue Oct 27 15:43:58 2009
@@ -44,8 +44,9 @@
   class FakeJobInProgress extends JobInProgress {
    
     public FakeJobInProgress(JobConf jobConf,
-        FakeTaskTrackerManager taskTrackerManager) throws IOException {
-      super(new JobID("test", ++jobCounter), jobConf, null);
+        FakeTaskTrackerManager taskTrackerManager, 
+        JobTracker jt) throws IOException {
+      super(new JobID("test", ++jobCounter), jobConf, jt);
       this.startTime = System.currentTimeMillis();
       this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP, 
           jobConf.getUser(), 
@@ -233,7 +234,8 @@
     // will be inited first and that will hang
     
     for (int i = 0; i < NUM_JOBS; i++) {
-      jobs[i] = new FakeJobInProgress(jobConf, taskTrackerManager);
+      jobs[i] = new FakeJobInProgress(jobConf, taskTrackerManager, 
+          UtilsForTests.getJobTracker());
       jobs[i].getStatus().setRunState(JobStatus.PREP);
       taskTrackerManager.submitJob(jobs[i]);
     }

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java Tue Oct 27 15:43:58 2009
@@ -34,7 +34,8 @@
     jc.setNumMapTasks(maps);
     jc.setNumReduceTasks(reduces);
     
-    JobInProgress jip = new JobInProgress(jid, jc, null);
+    JobInProgress jip = new JobInProgress(jid, jc, 
+      UtilsForTests.getJobTracker());
     //unfortunately, we can't set job input size from here.
     ResourceEstimator re = new ResourceEstimator(jip);
     
@@ -66,7 +67,8 @@
     jc.setNumMapTasks(maps);
     jc.setNumReduceTasks(reduces);
     
-    JobInProgress jip = new JobInProgress(jid, jc, null) {
+    JobInProgress jip = new JobInProgress(jid, jc, 
+      UtilsForTests.getJobTracker()) {
       long getInputLength() {
         return singleMapInputSize*desiredMaps();
       }

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java Tue Oct 27 15:43:58 2009
@@ -49,16 +49,16 @@
     }
   }
 
-  // Commiter with cleanupJob throwing exception
-  static class CommitterWithFailCleanup extends FileOutputCommitter {
+  // Committer with commitJob throwing exception
+  static class CommitterWithFailCommit extends FileOutputCommitter {
     @Override
-    public void cleanupJob(JobContext context) throws IOException {
+    public void commitJob(JobContext context) throws IOException {
       throw new IOException();
     }
   }
 
   // Committer waits for a file to be created on dfs.
-  static class CommitterWithLongSetupAndCleanup extends FileOutputCommitter {
+  static class CommitterWithLongSetupAndCommit extends FileOutputCommitter {
     
     private void waitForSignalFile(FileSystem fs, Path signalFile) 
     throws IOException {
@@ -78,9 +78,9 @@
     }
     
     @Override
-    public void cleanupJob(JobContext context) throws IOException {
+    public void commitJob(JobContext context) throws IOException {
       waitForSignalFile(FileSystem.get(context.getJobConf()), cleanupSignalFile);
-      super.cleanupJob(context);
+      super.commitJob(context);
     }
   }
 
@@ -123,7 +123,7 @@
   throws IOException {
     // launch job with waiting setup/cleanup
     JobConf jobConf = mr.createJobConf();
-    jobConf.setOutputCommitter(CommitterWithLongSetupAndCleanup.class);
+    jobConf.setOutputCommitter(CommitterWithLongSetupAndCommit.class);
     RunningJob job = UtilsForTests.runJob(jobConf, inDir, outDir);
     JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
     JobInProgress jip = jt.getJob(job.getID());
@@ -238,7 +238,7 @@
                              null, null, jtConf);
       // test setup/cleanup throwing exceptions
       testFailCommitter(CommitterWithFailSetup.class, mr.createJobConf());
-      testFailCommitter(CommitterWithFailCleanup.class, mr.createJobConf());
+      testFailCommitter(CommitterWithFailCommit.class, mr.createJobConf());
       // test the command-line kill for setup/cleanup attempts. 
       testSetupAndCleanupKill(mr, dfs, true);
       // remove setup/cleanup signal files.

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java Tue Oct 27 15:43:58 2009
@@ -22,9 +22,11 @@
 import java.util.List;
 
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
+import org.apache.hadoop.mapred.FakeObjectUtilities.FakeTaskInProgress;
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskType;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -92,6 +94,51 @@
     }
   }
 
+  static class FakeJobWithTaskCleanupTask 
+  extends FakeObjectUtilities.FakeJobInProgress {
+
+    FakeJobWithTaskCleanupTask(JobConf jobConf, 
+        JobTracker tracker) throws IOException {
+      super(jobConf, tracker);
+    }
+
+    /**
+     * Initialize tasks (1 map and 1 reduce task, each needing 2 slots,
+     * similar to the tasks of a high RAM job).
+     */
+    @Override
+    public synchronized void initTasks() throws IOException {
+      super.initTasks();
+      Job.RawSplit emptySplit = new Job.RawSplit();
+      final int numSlotsPerTask = 2;
+      maps = new TaskInProgress[1];
+      reduces = new TaskInProgress[1];
+      
+      maps[0] = new FakeTaskInProgress(getJobID(), "test",  emptySplit,
+          jobtracker, getJobConf(), this, 0, numSlotsPerTask);
+      TaskAttemptID attemptId = new TaskAttemptID(maps[0].getTIPId(), 0);
+      
+      // make this task a taskCleanup task of a map task
+      mapCleanupTasks.add(attemptId);
+      TaskStatus stat = new MapTaskStatus(attemptId, 0.01f, 2,
+          TaskStatus.State.FAILED_UNCLEAN, "", "", trackers[0],
+          TaskStatus.Phase.MAP, new Counters());
+      maps[0].updateStatus(stat);
+      
+      //similarly for reduce task's taskCleanup task
+      reduces[0] = new FakeTaskInProgress(getJobID(), "test", 1,
+          0, jobtracker, getJobConf(), this, numSlotsPerTask);
+      attemptId = new TaskAttemptID(reduces[0].getTIPId(), 0);
+      
+      // make this task a taskCleanup task of a reduce task
+      reduceCleanupTasks.add(attemptId);
+      stat = new ReduceTaskStatus(attemptId, 0.01f, 2,
+          TaskStatus.State.FAILED_UNCLEAN, "", "", trackers[0],
+          TaskStatus.Phase.REDUCE, new Counters());
+      reduces[0].updateStatus(stat);
+    }
+  }
+
   public void setUp() throws Exception {
     JobConf conf = new JobConf();
     conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
@@ -103,7 +150,7 @@
   }
 
   // create a job for testing setup tasks and reservations
-  FakeJobInProgress createJob(boolean withSetup) throws IOException {
+  FakeJobInProgress createJob(TaskType taskType) throws IOException {
     JobConf conf = new JobConf();
     conf.setSpeculativeExecution(false);
     conf.setNumMapTasks(2);
@@ -111,28 +158,39 @@
     conf.set(JobContext.REDUCE_FAILURES_MAXPERCENT, ".70");
     conf.set(JobContext.MAP_FAILURES_MAX_PERCENT, ".70");
     FakeJobInProgress job = null;
-    if (withSetup) {
-      job = new FakeJobWithSetupTask(conf, jobTracker);
-    } else {
+    if (taskType == null) {
       conf.setBoolean(JobContext.SETUP_CLEANUP_NEEDED, false);
       job = new FakeJobInProgress(conf, jobTracker);
+    } else if (taskType == TaskType.JOB_SETUP) {
+      job = new FakeJobWithSetupTask(conf, jobTracker);
+    } else if (taskType == TaskType.TASK_CLEANUP) {
+      job = new FakeJobWithTaskCleanupTask(conf, jobTracker);
     }
     job.setClusterSize(trackers.length);
     job.initTasks();
     return job;
   }
   
-  // create a new TaskStatus and add to a list of status objects
-  void addNewTaskStatus(FakeJobInProgress job,
-      boolean isMapTask, String tracker, List<TaskStatus> reports) 
+  // create a new TaskStatus and add to a list of status objects.
+  // useMapSlot param is needed only when taskType is TASK_CLEANUP.
+  void addNewTaskStatus(FakeJobInProgress job, TaskType taskType,
+        boolean useMapSlot, String tracker, List<TaskStatus> reports) 
         throws IOException {
     TaskAttemptID task = null;
     TaskStatus status = null;
-    if (isMapTask) {
+    if (taskType == TaskType.MAP) {
       task = job.findMapTask(tracker);
       status = new MapTaskStatus(task, 0.01f, 2,
             TaskStatus.State.RUNNING, "", "", tracker,
             TaskStatus.Phase.MAP, new Counters());
+    } else if (taskType == TaskType.TASK_CLEANUP) {
+      if (useMapSlot) {
+        status = job.maps[0].taskStatuses.get(
+          new TaskAttemptID(job.maps[0].getTIPId(), 0));
+      } else {
+        status = job.reduces[0].taskStatuses.get(
+              new TaskAttemptID(job.reduces[0].getTIPId(), 0));
+      }
     } else {
       task = job.findReduceTask(tracker);
       status = new ReduceTaskStatus(task, 0.01f, 2,
@@ -159,7 +217,7 @@
    */
   public void testSetupTaskReturnedForFreeMapSlots() throws IOException {
     // create a job with a setup task.
-    FakeJobInProgress job = createJob(true);
+    FakeJobInProgress job = createJob(TaskType.JOB_SETUP);
     jobTracker.jobs.put(job.getJobID(), job);
     
     // create a status simulating a free tasktracker
@@ -182,16 +240,16 @@
    */
   public void testMapSlotsCountedForSetup() throws IOException {
     // create a job with a setup task.
-    FakeJobInProgress job = createJob(true);
+    FakeJobInProgress job = createJob(TaskType.JOB_SETUP);
     jobTracker.jobs.put(job.getJobID(), job);
     
     // create another job for reservation
-    FakeJobInProgress job1 = createJob(false);
+    FakeJobInProgress job1 = createJob(null);
     jobTracker.jobs.put(job1.getJobID(), job1);
    
     // create TT status for testing getSetupAndCleanupTasks
     List<TaskStatus> taskStatuses = new ArrayList<TaskStatus>();
-    addNewTaskStatus(job, true, trackers[0], taskStatuses);
+    addNewTaskStatus(job, TaskType.MAP, true, trackers[0], taskStatuses);
     TaskTrackerStatus ttStatus 
       = createTaskTrackerStatus(trackers[0], taskStatuses);
     
@@ -210,19 +268,19 @@
    */
   public void testReduceSlotsCountedForSetup() throws IOException {
     // create a job with a setup task.
-    FakeJobInProgress job = createJob(true);
+    FakeJobInProgress job = createJob(TaskType.JOB_SETUP);
     jobTracker.jobs.put(job.getJobID(), job);
     
     // create another job for reservation
-    FakeJobInProgress job1 = createJob(false);
+    FakeJobInProgress job1 = createJob(null);
     jobTracker.jobs.put(job1.getJobID(), job1);
     
     // create TT status for testing getSetupAndCleanupTasks
     List<TaskStatus> reports = new ArrayList<TaskStatus>();
     // because free map slots are checked first in code,
     // we fill up map slots also.
-    addNewTaskStatus(job1, true, trackers[1], reports);
-    addNewTaskStatus(job1, false, trackers[1], reports);
+    addNewTaskStatus(job1, TaskType.MAP, true, trackers[1], reports);
+    addNewTaskStatus(job1, TaskType.REDUCE, false, trackers[1], reports);
     TaskTrackerStatus ttStatus 
       = createTaskTrackerStatus(trackers[1], reports);
 
@@ -232,4 +290,48 @@
     assertNull(tasks);
     jobTracker.jobs.clear();
   }
+
+  void validateNumSlotsUsedForTaskCleanup(TaskTrackerStatus ttStatus)
+       throws IOException {
+    List<Task> tasks = jobTracker.getSetupAndCleanupTasks(ttStatus);
+
+    assertEquals("Actual number of taskCleanup tasks is not the same as expected",
+                 1, tasks.size());
+    LOG.info("taskCleanup task is " + tasks.get(0));
+    assertTrue(tasks.get(0).isTaskCleanupTask());
+
+    // slots needed for a taskCleanup task should be 1 (even for high RAM jobs)
+    assertEquals("TaskCleanup task should not need more than 1 slot.",
+                 1, tasks.get(0).getNumSlotsRequired());
+  }
+  
+  /**
+   * Test to check that map slots are counted when returning
+   * a taskCleanup task.
+   * @throws IOException
+   */
+  public void testNumSlotsUsedForTaskCleanup() throws IOException {
+    // Create a high RAM job with a map task's cleanup task and a reduce task's
+    // cleanup task. Make this Fake job a high RAM job by setting the slots
+    // required for map/reduce task to 2.
+    FakeJobInProgress job = createJob(TaskType.TASK_CLEANUP);
+    jobTracker.jobs.put(job.getJobID(), job);
+   
+    // create TT status for testing getSetupAndCleanupTasks
+    List<TaskStatus> taskStatuses = new ArrayList<TaskStatus>();
+    TaskTrackerStatus ttStatus =
+      createTaskTrackerStatus(trackers[0], taskStatuses);//create dummy status
+    addNewTaskStatus(job, TaskType.TASK_CLEANUP, true, trackers[0],
+                     taskStatuses);// status of map task's cleanup task
+    addNewTaskStatus(job, TaskType.TASK_CLEANUP, false, trackers[0],
+                     taskStatuses);// status of reduce task's cleanup task
+    ttStatus = createTaskTrackerStatus(trackers[0], taskStatuses);
+    
+    // validate mapTaskCleanup task
+    validateNumSlotsUsedForTaskCleanup(ttStatus);
+    
+    // validate reduceTaskCleanup task
+    validateNumSlotsUsedForTaskCleanup(ttStatus);
+    
+    jobTracker.jobs.clear();
+  }
 }
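
With the boolean parameter replaced by a TaskType, createJob now selects among
three fake-job flavors; the cases these tests exercise:

    FakeJobInProgress setupJob   = createJob(TaskType.JOB_SETUP);     // with a setup task
    FakeJobInProgress plainJob   = createJob(null);                   // setup/cleanup disabled
    FakeJobInProgress cleanupJob = createJob(TaskType.TASK_CLEANUP);  // with taskCleanup tasks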

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java Tue Oct 27 15:43:58 2009
@@ -152,7 +152,7 @@
 
   private void verifyOutput(FileSystem fs, Path outDir) throws IOException {
     Path[] outputFiles = FileUtil.stat2Paths(
-        fs.listStatus(outDir, new OutputLogFilter()));
+        fs.listStatus(outDir, new Utils.OutputFileUtils.OutputFilesFilter()));
     assertEquals(numReduces, outputFiles.length);
     InputStream is = fs.open(outputFiles[0]);
     BufferedReader reader = new BufferedReader(new InputStreamReader(is));

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java Tue Oct 27 15:43:58 2009
@@ -174,7 +174,7 @@
         healthStatus.setLastReported(status.getLastReported());
       }
       jobTracker.heartbeat(tts, false, initialContact, 
-                           false, (short) responseId);
+                           false, responseId);
     }
     responseId++;
   }

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestTextInputFormat.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestTextInputFormat.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestTextInputFormat.java Tue Oct 27 15:43:58 2009
@@ -391,7 +391,7 @@
     Text value = reader.createValue();
     while (reader.next(key, value)) {
       result.add(value);
-      value = (Text) reader.createValue();
+      value = reader.createValue();
     }
     reader.close();
     return result;

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java Tue Oct 27 15:43:58 2009
@@ -183,4 +183,12 @@
     }
     super.testFileSystemOtherThanDefault();
   }
+  
+  @Override
+  public void testFreshness()  throws Exception { 
+    if (!ClusterWithLinuxTaskController.shouldRun()) {
+      return;
+    }
+    super.testFreshness();
+  }
 }

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java Tue Oct 27 15:43:58 2009
@@ -23,6 +23,7 @@
 import javax.security.auth.login.LoginException;
 
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
@@ -124,6 +125,11 @@
         2, fjob.getNumReservedTaskTrackersForMaps());
     assertEquals("Trackers not reserved for the job : reduces", 
         2, fjob.getNumReservedTaskTrackersForReduces());
+    ClusterMetrics metrics = jobTracker.getClusterMetrics();
+    assertEquals("reserved map slots do not match",
+          4, metrics.getReservedMapSlots());
+    assertEquals("reserved reduce slots do not match",
+          4, metrics.getReservedReduceSlots());
     
     TaskAttemptID mTid = fjob.findMapTask(trackers[1]);
     TaskAttemptID rTid = fjob.findReduceTask(trackers[1]);
@@ -138,6 +144,11 @@
         0, fjob.getNumReservedTaskTrackersForMaps());
     assertEquals("Reservation for the job not released : Reduces", 
         0, fjob.getNumReservedTaskTrackersForReduces());
+    metrics = jobTracker.getClusterMetrics();
+    assertEquals("reserved map slots do not match",
+        0, metrics.getReservedMapSlots());
+    assertEquals("reserved reduce slots do not match",
+        0, metrics.getReservedReduceSlots());
   }
   
   /**
@@ -165,6 +176,11 @@
         0, job.getNumReservedTaskTrackersForMaps());
     assertEquals("Reservation for the job not released : Reduces", 
         0, job.getNumReservedTaskTrackersForReduces());
+    ClusterMetrics metrics = jobTracker.getClusterMetrics();
+    assertEquals("reserved map slots do not match",
+        0, metrics.getReservedMapSlots());
+    assertEquals("reserved reduce slots do not match",
+        0, metrics.getReservedReduceSlots());
   }
   
   /**
@@ -212,6 +228,11 @@
         2, job.getNumReservedTaskTrackersForMaps());
     assertEquals("Trackers not reserved for the job : reduces", 
         2, job.getNumReservedTaskTrackersForReduces());
+    ClusterMetrics metrics = jobTracker.getClusterMetrics();
+    assertEquals("reserved map slots do not match",
+        4, metrics.getReservedMapSlots());
+    assertEquals("reserved reduce slots do not match",
+        4, metrics.getReservedReduceSlots());
   
     /*
      * FakeJobInProgress.findMapTask does not handle
@@ -230,6 +251,12 @@
         1, job.getNumReservedTaskTrackersForMaps());
     assertEquals("Extra Trackers reserved for the job : reduces", 
         1, job.getNumReservedTaskTrackersForReduces());
+    metrics = jobTracker.getClusterMetrics();
+    assertEquals("reserved map slots do not match",
+        2, metrics.getReservedMapSlots());
+    assertEquals("reserved reduce slots do not match",
+        2, metrics.getReservedReduceSlots());
+
     //Finish the map task on the tracker 1. Finishing it here to work
     //around bug in the FakeJobInProgress object
     job.finishTask(mTid);
@@ -245,7 +272,11 @@
         0, job.getNumReservedTaskTrackersForMaps());
     assertEquals("Trackers not unreserved for the job : reduces", 
         0, job.getNumReservedTaskTrackersForReduces());
-    
+    metrics = jobTracker.getClusterMetrics();
+    assertEquals("reserved map slots do not match",
+        0, metrics.getReservedMapSlots());
+    assertEquals("reserved reduce slots do not match",
+        0, metrics.getReservedReduceSlots());
   }
 }
   
\ No newline at end of file
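
Each assertion block added to this test follows one pattern: re-fetch
ClusterMetrics after the event and compare the reserved slot totals (trackers
reserved times slots per tracker). The read, condensed:

    ClusterMetrics metrics = jobTracker.getClusterMetrics();
    int reservedMaps    = metrics.getReservedMapSlots();    // e.g. 2 trackers x 2 slots = 4
    int reservedReduces = metrics.getReservedReduceSlots();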

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestUserDefinedCounters.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestUserDefinedCounters.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestUserDefinedCounters.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestUserDefinedCounters.java Tue Oct 27 15:43:58 2009
@@ -101,7 +101,8 @@
     RunningJob runningJob = JobClient.runJob(conf);
 
     Path[] outputFiles = FileUtil.stat2Paths(
-        fs.listStatus(OUTPUT_DIR, new OutputLogFilter()));
+        fs.listStatus(OUTPUT_DIR, 
+                      new Utils.OutputFileUtils.OutputFilesFilter()));
     if (outputFiles.length > 0) {
       InputStream is = fs.open(outputFiles[0]);
       BufferedReader reader = new BufferedReader(new InputStreamReader(is));

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java Tue Oct 27 15:43:58 2009
@@ -47,6 +47,7 @@
 import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 
 import org.apache.commons.logging.Log;
 
@@ -701,4 +702,16 @@
     fos.close();
   }
 
+  static JobTracker getJobTracker() {
+    JobConf conf = new JobConf();
+    conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
+    conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
+    JobTracker jt;
+    try {
+      jt = new JobTracker(conf);
+      return jt;
+    } catch (Exception e) {
+      throw new RuntimeException("Could not start jt", e);
+    }
+  }
 }
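
Call sites elsewhere in this commit that previously passed null for the
JobTracker argument now use this helper, as in TestResourceEstimation:

    JobInProgress jip = new JobInProgress(jid, jc,
        UtilsForTests.getJobTracker());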

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/jobcontrol/JobControlTestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/jobcontrol/JobControlTestUtils.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/jobcontrol/JobControlTestUtils.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/jobcontrol/JobControlTestUtils.java Tue Oct 27 15:43:58 2009
@@ -145,7 +145,7 @@
         throws IOException {
       Text dumbKey = new Text("");
       while (values.hasNext()) {
-        Text data = (Text) values.next();
+        Text data = values.next();
         output.collect(dumbKey, data);
       }
     }

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/join/TestDatamerge.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/join/TestDatamerge.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/join/TestDatamerge.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/join/TestDatamerge.java Tue Oct 27 15:43:58 2009
@@ -50,6 +50,7 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.Utils;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -317,7 +318,8 @@
     job.setOutputFormat(SequenceFileOutputFormat.class);
     JobClient.runJob(job);
 
-    FileStatus[] outlist = cluster.getFileSystem().listStatus(outf);
+    FileStatus[] outlist = cluster.getFileSystem().listStatus(outf, 
+                             new Utils.OutputFileUtils.OutputFilesFilter());
     assertEquals(1, outlist.length);
     assertTrue(0 < outlist[0].getLen());
     SequenceFile.Reader r =

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java Tue Oct 27 15:43:58 2009
@@ -31,10 +31,10 @@
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.OutputLogFilter;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapred.Utils;
 
 
 public class TestKeyFieldBasedComparator extends HadoopTestCase {
@@ -95,7 +95,7 @@
     }
     Path[] outputFiles = FileUtil.stat2Paths(
         getFileSystem().listStatus(outDir,
-        new OutputLogFilter()));
+        new Utils.OutputFileUtils.OutputFilesFilter()));
     if (outputFiles.length > 0) {
       InputStream is = getFileSystem().open(outputFiles[0]);
       BufferedReader reader = new BufferedReader(new InputStreamReader(is));

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipes.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipes.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipes.java Tue Oct 27 15:43:58 2009
@@ -37,9 +37,9 @@
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.mapred.OutputLogFilter;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TestMiniMRWithDFS;
+import org.apache.hadoop.mapred.Utils;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
@@ -197,7 +197,8 @@
 
     List<String> results = new ArrayList<String>();
     for (Path p:FileUtil.stat2Paths(dfs.getFileSystem().listStatus(outputPath,
-    		                        new OutputLogFilter()))) {
+    		                        new Utils.OutputFileUtils
+    		                                 .OutputFilesFilter()))) {
       results.add(TestMiniMRWithDFS.readOutput(p, job));
     }
     assertEquals("number of reduces is wrong", 

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java Tue Oct 27 15:43:58 2009
@@ -43,7 +43,7 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.OutputLogFilter;
+import org.apache.hadoop.mapred.Utils;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
@@ -214,7 +214,7 @@
     throws IOException, InterruptedException {
       Text dumbKey = new Text("");
       while (values.hasNext()) {
-        Text data = (Text) values.next();
+        Text data = values.next();
         context.write(dumbKey, data);
       }
     }
@@ -396,7 +396,7 @@
     StringBuffer result = new StringBuffer();
 
     Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
-           new OutputLogFilter()));
+           new Utils.OutputFileUtils.OutputFilesFilter()));
     for (Path outputFile : fileList) {
       LOG.info("Path" + ": "+ outputFile);
       BufferedReader file = 

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java Tue Oct 27 15:43:58 2009
@@ -36,7 +36,7 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.mapred.OutputLogFilter;
+import org.apache.hadoop.mapred.Utils;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -147,7 +147,7 @@
 
       Path[] fileList = 
         FileUtil.stat2Paths(fileSys.listStatus(output1,
-            new OutputLogFilter()));
+            new Utils.OutputFileUtils.OutputFilesFilter()));
       for(int i=0; i < fileList.length; ++i) {
         System.out.println("Test1 File list[" + i + "]" + ": "+ fileList[i]);
       }
@@ -159,7 +159,7 @@
 
       fileList =
         FileUtil.stat2Paths(fileSys.listStatus(output2,
-            new OutputLogFilter()));
+            new Utils.OutputFileUtils.OutputFilesFilter()));
       for(int i=0; i < fileList.length; ++i) {
         System.out.println("Test2 File list[" + i + "]" + ": "+ fileList[i]);
       }
@@ -172,7 +172,7 @@
 
       fileList =
         FileUtil.stat2Paths(fileSys.listStatus(output3,
-            new OutputLogFilter()));
+            new Utils.OutputFileUtils.OutputFilesFilter()));
       for(int i=0; i < fileList.length; ++i) {
         System.out.println("Test3 File list[" + i + "]" + ": "+ fileList[i]);
       }

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/TestValueIterReset.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/TestValueIterReset.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/TestValueIterReset.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/TestValueIterReset.java Tue Oct 27 15:43:58 2009
@@ -38,7 +38,7 @@
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.OutputLogFilter;
+import org.apache.hadoop.mapred.Utils;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -551,7 +551,7 @@
   private void validateOutput() throws IOException {
     Path[] outputFiles = FileUtil.stat2Paths(
         localFs.listStatus(new Path(TEST_ROOT_DIR + "/out"),
-            new OutputLogFilter()));
+            new Utils.OutputFileUtils.OutputFilesFilter()));
     if (outputFiles.length > 0) {
       InputStream is = localFs.open(outputFiles[0]);
       BufferedReader reader = new BufferedReader(new InputStreamReader(is));

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Tue Oct 27 15:43:58 2009
@@ -40,6 +40,7 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
@@ -53,7 +54,7 @@
           .getAbsolutePath();
 
   protected File ROOT_MAPRED_LOCAL_DIR;
-  private static String TEST_CACHE_BASE_DIR;
+  private static final String TEST_CACHE_BASE_DIR = "cachebasedir";
   protected int numLocalDirs = 6;
 
   private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
@@ -61,6 +62,7 @@
   protected Configuration conf;
   protected Path firstCacheFile;
   protected Path secondCacheFile;
+  private FileSystem fs;
 
   protected LocalDirAllocator localDirAllocator = 
     new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
@@ -77,18 +79,12 @@
     // Prepare the tests' mapred-local-dir
     ROOT_MAPRED_LOCAL_DIR = new File(TEST_ROOT_DIR, "mapred/local");
     ROOT_MAPRED_LOCAL_DIR.mkdirs();
-    String []localDirs = new String[numLocalDirs];
-    for (int i = 0; i < numLocalDirs; i++) {
-      localDirs[i] = new File(ROOT_MAPRED_LOCAL_DIR, "0_" + i).getPath();
-    }
-
-    TEST_CACHE_BASE_DIR =
-        new File(TEST_ROOT_DIR, "cachebasedir").getAbsolutePath();
 
     conf = new Configuration();
     conf.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, LOCAL_CACHE_LIMIT);
-    conf.setStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY, localDirs);
+    conf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY, ROOT_MAPRED_LOCAL_DIR.toString());
     conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
+    fs = FileSystem.get(conf);
 
     // Create the temporary cache files to be used in the tests.
     firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile");
@@ -187,20 +183,21 @@
     TrackerDistributedCacheManager manager = 
         new TrackerDistributedCacheManager(conf);
     FileSystem localfs = FileSystem.getLocal(conf);
+    long now = System.currentTimeMillis();
 
     manager.getLocalCache(firstCacheFile.toUri(), conf, 
-        new Path(TEST_CACHE_BASE_DIR), null, false, 
-        System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false);
-    manager.releaseCache(firstCacheFile.toUri(), conf);
+        TEST_CACHE_BASE_DIR, fs.getFileStatus(firstCacheFile), false,
+        now, new Path(TEST_ROOT_DIR), false);
+    manager.releaseCache(firstCacheFile.toUri(), conf, now);
     // In the code above, we localized a 4K file and then released the
     // cache, which allows it to be deleted once the cache size limit
     // is exceeded. The code below localizes another cache file, which
     // is designed to sweep away the first cache.
     manager.getLocalCache(secondCacheFile.toUri(), conf, 
-        new Path(TEST_CACHE_BASE_DIR), null, false, 
+        TEST_CACHE_BASE_DIR, fs.getFileStatus(secondCacheFile), false, 
         System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false);
     FileStatus[] dirStatuses = localfs.listStatus(
-        new Path(TEST_CACHE_BASE_DIR));
+        new Path(ROOT_MAPRED_LOCAL_DIR.toString()));
     assertTrue("DistributedCache failed deleting old" + 
         " cache when the cache store is full.",
         dirStatuses.length == 1);
@@ -213,7 +210,8 @@
     Path fileToCache = new Path("fakefile:///"
         + firstCacheFile.toUri().getPath());
     Path result = manager.getLocalCache(fileToCache.toUri(), conf,
-        new Path(TEST_CACHE_BASE_DIR), null, false, System.currentTimeMillis(),
+        TEST_CACHE_BASE_DIR, fs.getFileStatus(firstCacheFile), false,
+        System.currentTimeMillis(),
         new Path(TEST_ROOT_DIR), false);
     assertNotNull("DistributedCache cached file on non-default filesystem.",
         result);
@@ -245,4 +243,98 @@
   protected File pathToFile(Path p) {
     return new File(p.toString());
   }
+  
+  public static class FakeFileSystem extends RawLocalFileSystem {
+    private long increment = 0;
+    public FakeFileSystem() {
+      super();
+    }
+    
+    public FileStatus getFileStatus(Path p) throws IOException {
+      File f = pathToFile(p);
+      return new FileStatus(f.length(), f.isDirectory(), 1, 128,
+          f.lastModified() + increment,
+          makeQualified(new Path(f.getPath())));
+    }
+    
+    void advanceClock(long millis) {
+      increment += millis;
+    }
+  }
+  
+  public void testFreshness() throws Exception {
+    Configuration myConf = new Configuration(conf);
+    myConf.set("fs.default.name", "refresh:///");
+    myConf.setClass("fs.refresh.impl", FakeFileSystem.class, FileSystem.class);
+    TrackerDistributedCacheManager manager = 
+      new TrackerDistributedCacheManager(myConf);
+    // ****** Imitate JobClient code
+    // Configures a task/job with both a regular file and a "classpath" file.
+    Configuration subConf = new Configuration(myConf);
+    DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
+    TrackerDistributedCacheManager.determineTimestamps(subConf);
+    // ****** End of imitating JobClient code
+
+    String userName = getJobOwnerName();
+
+    // ****** Imitate TaskRunner code.
+    TaskDistributedCacheManager handle =
+      manager.newTaskDistributedCacheManager(subConf);
+    assertNull("Local cache files should not be set before setup",
+        DistributedCache.getLocalCacheFiles(subConf));
+    File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
+    handle.setup(localDirAllocator, workDir, TaskTracker
+        .getDistributedCacheDir(userName));
+    // ****** End of imitating TaskRunner code
+
+    Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
+    assertNotNull("Cache file was not localized", localCacheFiles);
+    assertEquals(1, localCacheFiles.length);
+    Path cachedFirstFile = localCacheFiles[0];
+    assertFileLengthEquals(firstCacheFile, cachedFirstFile);
+    assertFalse("Paths should be different.", 
+        firstCacheFile.equals(cachedFirstFile));
+    // release
+    handle.release();
+    
+    // change the file timestamp
+    FakeFileSystem fakeFs = (FakeFileSystem) FileSystem.get(myConf);
+    fakeFs.advanceClock(1);
+
+    // running a task of the same job
+    Throwable th = null;
+    try {
+      handle.setup(localDirAllocator, workDir, TaskTracker
+          .getDistributedCacheDir(userName));
+    } catch (IOException ie) {
+      th = ie;
+    }
+    assertNotNull("Throwable is null", th);
+    assertTrue("Exception message does not match",
+        th.getMessage().contains("has changed on HDFS since job started"));
+    // release
+    handle.release();
+    
+    // submit another job
+    Configuration subConf2 = new Configuration(myConf);
+    DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf2);
+    TrackerDistributedCacheManager.determineTimestamps(subConf2);
+    
+    handle = manager.newTaskDistributedCacheManager(subConf2);
+    handle.setup(localDirAllocator, workDir, TaskTracker
+        .getDistributedCacheDir(userName));
+    Path[] localCacheFiles2 = DistributedCache.getLocalCacheFiles(subConf2);
+    assertNotNull("Cache file was not localized", localCacheFiles2);
+    assertEquals(1, localCacheFiles2.length);
+    Path cachedFirstFile2 = localCacheFiles2[0];
+    assertFileLengthEquals(firstCacheFile, cachedFirstFile2);
+    assertFalse("Paths should be different.", 
+        firstCacheFile.equals(cachedFirstFile2));
+    
+    // assert that two localizations point to different paths
+    assertFalse("two jobs with different timestamps did not localize" +
+        " in different paths", cachedFirstFile.equals(cachedFirstFile2));
+    // release
+    handle.release();
+  }
+
 }

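The new testFreshness case above pins down what the cache manager promises:
the timestamp recorded at job-submission time must still match the cached
file's modification time when a later task localizes it. A minimal sketch of
that comparison, assuming the manager keeps the submit-time timestamp with
the cache entry (the class and method names here are illustrative, not the
actual TrackerDistributedCacheManager internals):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;

    class FreshnessCheck {
      // Hypothetical helper; the message mirrors the one asserted above.
      static void assertUnchanged(Configuration conf, Path cacheFile,
          long recordedTimestamp) throws IOException {
        FileStatus status =
            cacheFile.getFileSystem(conf).getFileStatus(cacheFile);
        if (status.getModificationTime() != recordedTimestamp) {
          throw new IOException("File " + cacheFile
              + " has changed on HDFS since job started");
        }
      }
    }
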
Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/lib/aggregate/TestMapReduceAggregates.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/lib/aggregate/TestMapReduceAggregates.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/lib/aggregate/TestMapReduceAggregates.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/lib/aggregate/TestMapReduceAggregates.java Tue Oct 27 15:43:58 2009
@@ -21,7 +21,7 @@
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.OutputLogFilter;
+import org.apache.hadoop.mapred.Utils;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
@@ -127,7 +127,7 @@
     FileSystem fs = outDir.getFileSystem(conf);
     StringBuffer result = new StringBuffer();
     Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
-                        new OutputLogFilter()));
+                        new Utils.OutputFileUtils.OutputFilesFilter()));
     for(int i=0; i < fileList.length; ++i) {
       BufferedReader file = 
         new BufferedReader(new InputStreamReader(fs.open(fileList[i])));

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinDatamerge.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinDatamerge.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinDatamerge.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinDatamerge.java Tue Oct 27 15:43:58 2009
@@ -32,6 +32,7 @@
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.Utils;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -282,7 +283,8 @@
   private static void checkOuterConsistency(Job job, Path[] src) 
       throws IOException {
     Path outf = FileOutputFormat.getOutputPath(job);
-    FileStatus[] outlist = cluster.getFileSystem().listStatus(outf);
+    FileStatus[] outlist = cluster.getFileSystem().listStatus(outf,
+        new Utils.OutputFileUtils.OutputFilesFilter());
     assertEquals("number of part files is more than 1. It is" + outlist.length,
       1, outlist.length);
     assertTrue("output file with zero length" + outlist[0].getLen(),
@@ -388,7 +390,8 @@
     job.waitForCompletion(true);
     assertTrue("Job failed", job.isSuccessful());
 
-    FileStatus[] outlist = cluster.getFileSystem().listStatus(outf);
+    FileStatus[] outlist = cluster.getFileSystem().listStatus(outf, 
+                             new Utils.OutputFileUtils.OutputFilesFilter());
     assertEquals(1, outlist.length);
     assertTrue(0 < outlist[0].getLen());
     SequenceFile.Reader r =

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java Tue Oct 27 15:43:58 2009
@@ -101,7 +101,7 @@
       writer.close(context);
     }
     committer.commitTask(context);
-    committer.cleanupJob(job);
+    committer.commitJob(job);
 
     InputFormat<IntWritable, DoubleWritable> iformat =
       new SequenceFileInputFormat<IntWritable, DoubleWritable>();

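The hunk above moves this test off the deprecated cleanupJob and onto
commitJob, which performs the job-level commit and then cleans up the
_temporary directory. A hypothetical committer showing where per-job
finalization now belongs, sketched against the older mapred API for brevity
(the marker file is illustrative, not part of this commit):

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileOutputCommitter;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobContext;

    class MarkerFileCommitter extends FileOutputCommitter {
      @Override
      public void commitJob(JobContext context) throws IOException {
        super.commitJob(context); // promote output, remove _temporary
        Path outDir = FileOutputFormat.getOutputPath(context.getJobConf());
        FileSystem fs = outDir.getFileSystem(context.getConfiguration());
        fs.create(new Path(outDir, "_DONE")).close(); // illustrative marker
      }
    }
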
Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java Tue Oct 27 15:43:58 2009
@@ -26,7 +26,7 @@
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.HadoopTestCase;
-import org.apache.hadoop.mapred.OutputLogFilter;
+import org.apache.hadoop.mapred.Utils;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MapReduceTestUtil;
@@ -70,7 +70,7 @@
 
     // validate output
     Path[] outputFiles = FileUtil.stat2Paths(getFileSystem().listStatus(outDir,
-        new OutputLogFilter()));
+        new Utils.OutputFileUtils.OutputFilesFilter()));
     if (outputFiles.length > 0) {
       InputStream is = getFileSystem().open(outputFiles[0]);
       BufferedReader reader = new BufferedReader(new InputStreamReader(is));

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java Tue Oct 27 15:43:58 2009
@@ -33,23 +33,36 @@
 public class TestRumenJobTraces {
   @Test
   public void testSmallTrace() throws Exception {
+    performSingleTest("sample-job-tracker-logs.gz",
+        "job-tracker-logs-topology-output", "job-tracker-logs-trace-output.gz");
+  }
+
+  @Test
+  public void testTruncatedTask() throws Exception {
+    performSingleTest("truncated-job-tracker-log", "truncated-topology-output",
+        "truncated-trace-output");
+  }
+
+  private void performSingleTest(String jtLogName, String goldTopology,
+      String goldTrace) throws Exception {
     final Configuration conf = new Configuration();
     final FileSystem lfs = FileSystem.getLocal(conf);
 
-    final Path rootInputDir = new Path(
-        System.getProperty("test.tools.input.dir", "")).makeQualified(lfs);
-    final Path rootTempDir = new Path(
-        System.getProperty("test.build.data", "/tmp")).makeQualified(lfs);
-
+    final Path rootInputDir =
+        new Path(System.getProperty("test.tools.input.dir", ""))
+            .makeQualified(lfs);
+    final Path rootTempDir =
+        new Path(System.getProperty("test.build.data", "/tmp"))
+            .makeQualified(lfs);
 
     final Path rootInputFile = new Path(rootInputDir, "rumen/small-trace-test");
     final Path tempDir = new Path(rootTempDir, "TestRumenJobTraces");
     lfs.delete(tempDir, true);
 
-    final Path topologyFile = new Path(tempDir, "topology.json");
-    final Path traceFile = new Path(tempDir, "trace.json");
+    final Path topologyFile = new Path(tempDir, jtLogName + "-topology.json");
+    final Path traceFile = new Path(tempDir, jtLogName + "-trace.json");
 
-    final Path inputFile = new Path(rootInputFile, "sample-job-tracker-logs");
+    final Path inputFile = new Path(rootInputFile, jtLogName);
 
     System.out.println("topology result file = " + topologyFile);
     System.out.println("trace result file = " + traceFile);
@@ -66,10 +79,8 @@
 
     args[5] = inputFile.toString();
 
-    final Path topologyGoldFile = new Path(rootInputFile, 
-        "job-tracker-logs-topology-output");
-    final Path traceGoldFile = new Path(rootInputFile,
-        "job-tracker-logs-trace-output");
+    final Path topologyGoldFile = new Path(rootInputFile, goldTopology);
+    final Path traceGoldFile = new Path(rootInputFile, goldTrace);
 
     HadoopLogsAnalyzer analyzer = new HadoopLogsAnalyzer();
     int result = ToolRunner.run(analyzer, args);
@@ -85,12 +96,11 @@
   static private <T extends DeepCompare> void jsonFileMatchesGold(
       FileSystem lfs, Path result, Path gold, Class<? extends T> clazz,
       String fileDescription) throws IOException {
-    InputStream goldStream = lfs.open(gold);
-    JsonObjectMapperParser<T> goldParser = new JsonObjectMapperParser<T>(
-        goldStream, clazz);
+    JsonObjectMapperParser<T> goldParser =
+        new JsonObjectMapperParser<T>(gold, clazz, new Configuration());
     InputStream resultStream = lfs.open(result);
-    JsonObjectMapperParser<T> resultParser = new JsonObjectMapperParser<T>(
-        resultStream, clazz);
+    JsonObjectMapperParser<T> resultParser =
+        new JsonObjectMapperParser<T>(resultStream, clazz);
     try {
       while (true) {
         DeepCompare goldJob = goldParser.getNext();

Modified: hadoop/mapreduce/branches/HDFS-641/src/tools/org/apache/hadoop/tools/rumen/CDFRandomGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/tools/org/apache/hadoop/tools/rumen/CDFRandomGenerator.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/tools/org/apache/hadoop/tools/rumen/CDFRandomGenerator.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/tools/org/apache/hadoop/tools/rumen/CDFRandomGenerator.java Tue Oct 27 15:43:58 2009
@@ -39,16 +39,17 @@
   final Random random;
 
   CDFRandomGenerator(LoggedDiscreteCDF cdf) {
-    random = new Random();
-    rankings = new double[(int) cdf.getNumberValues() + 2];
-    values = new long[(int) cdf.getNumberValues() + 2];
-    initializeTables(cdf);
+    this(cdf, new Random());
   }
 
   CDFRandomGenerator(LoggedDiscreteCDF cdf, long seed) {
-    random = new Random(seed);
-    rankings = new double[(int) cdf.getNumberValues() + 2];
-    values = new long[(int) cdf.getNumberValues() + 2];
+    this(cdf, new Random(seed));
+  }
+
+  private CDFRandomGenerator(LoggedDiscreteCDF cdf, Random random) {
+    this.random = random;
+    rankings = new double[cdf.getRankings().size() + 2];
+    values = new long[cdf.getRankings().size() + 2];
     initializeTables(cdf);
   }
 

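Both public CDFRandomGenerator constructors now delegate to a single private
constructor, so the table sizing lives in exactly one place and is derived
from cdf.getRankings().size() rather than the separately maintained
getNumberValues() count. The shape of the idiom, with illustrative names:

    import java.util.Random;

    // Sketch of the constructor-delegation cleanup applied above;
    // the class and field names are illustrative.
    class SamplerBase {
      private final Random random;
      private final double[] rankings;

      SamplerBase(int points) {
        this(points, new Random());      // unseeded variant
      }

      SamplerBase(int points, long seed) {
        this(points, new Random(seed));  // reproducible variant
      }

      private SamplerBase(int points, Random random) {
        this.random = random;
        this.rankings = new double[points + 2]; // sized in one place
      }
    }
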
Modified: hadoop/mapreduce/branches/HDFS-641/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java Tue Oct 27 15:43:58 2009
@@ -1043,6 +1043,10 @@
             e);
         typ = null;
       }
+      
+      if (typ == null) {
+        return;
+      }
 
       task.setTaskType(typ);
 

Modified: hadoop/mapreduce/branches/HDFS-641/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java Tue Oct 27 15:43:58 2009
@@ -22,6 +22,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.TreeSet;
 
 import org.codehaus.jackson.annotate.JsonAnySetter;
@@ -44,7 +45,8 @@
     VERY_LOW, LOW, NORMAL, HIGH, VERY_HIGH
   };
 
-  static private TreeSet<String> alreadySeenAnySetterAttributes = new TreeSet<String>();
+  static private Set<String> alreadySeenAnySetterAttributes =
+      new TreeSet<String>();
 
   String jobID;
   String user;
@@ -59,7 +61,7 @@
   int heapMegabytes = -1;
   int totalMaps = -1;
   int totalReduces = -1;
-  Pre21JobHistoryConstants.Values outcome = Pre21JobHistoryConstants.Values.SUCCESS;
+  Pre21JobHistoryConstants.Values outcome = null;
   JobType jobtype = JobType.JAVA;
   JobPriority priority = JobPriority.NORMAL;
 
@@ -100,7 +102,8 @@
     setJobID(jobID);
   }
 
-  @SuppressWarnings("unused") // for input parameter ignored.
+  @SuppressWarnings("unused")
+  // the second parameter is intentionally ignored.
   @JsonAnySetter
   public void setUnknownAttribute(String attributeName, Object ignored) {
     if (!alreadySeenAnySetterAttributes.contains(attributeName)) {
@@ -388,8 +391,9 @@
     }
   }
 
-  private void compare1(Pre21JobHistoryConstants.Values c1, Pre21JobHistoryConstants.Values c2,
-      TreePath loc, String eltname) throws DeepInequalityException {
+  private void compare1(Pre21JobHistoryConstants.Values c1,
+      Pre21JobHistoryConstants.Values c2, TreePath loc, String eltname)
+      throws DeepInequalityException {
     if (c1 != c2) {
       throw new DeepInequalityException(eltname + " miscompared", new TreePath(
           loc, eltname));

Modified: hadoop/mapreduce/branches/HDFS-641/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java Tue Oct 27 15:43:58 2009
@@ -19,6 +19,10 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.codehaus.jackson.annotate.JsonAnySetter;
 
 /**
  * A {@link LoggedLocation} is a representation of a point in a hierarchical
@@ -41,9 +45,12 @@
    * The full path from the root of the network to the host.
    * 
    * NOTE that this assumes that the network topology is a tree.
-   */ 
+   */
   List<String> layers = new ArrayList<String>();
 
+  static private Set<String> alreadySeenAnySetterAttributes =
+      new TreeSet<String>();
+
   public List<String> getLayers() {
     return layers;
   }
@@ -52,6 +59,17 @@
     this.layers = layers;
   }
 
+  @SuppressWarnings("unused")
+  // the second parameter is intentionally ignored.
+  @JsonAnySetter
+  public void setUnknownAttribute(String attributeName, Object ignored) {
+    if (!alreadySeenAnySetterAttributes.contains(attributeName)) {
+      alreadySeenAnySetterAttributes.add(attributeName);
+      System.err.println("In LoggedJob, we saw the unknown attribute "
+          + attributeName + ".");
+    }
+  }
+
   // I'll treat this as an atomic object type
   private void compareStrings(List<String> c1, List<String> c2, TreePath loc,
       String eltname) throws DeepInequalityException {

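LoggedLocation (and, below, LoggedNetworkTopology and
LoggedSingleRelativeRanking) gains the same unknown-attribute hook LoggedJob
already had: Jackson routes any JSON field without a matching setter to the
@JsonAnySetter method, which warns once per attribute name instead of
failing the parse. A self-contained sketch of the pattern against the
Jackson 1.x (org.codehaus.jackson) API; the Bean class and its field are
illustrative:

    import java.util.Set;
    import java.util.TreeSet;

    import org.codehaus.jackson.annotate.JsonAnySetter;
    import org.codehaus.jackson.map.ObjectMapper;

    public class AnySetterDemo {
      public static class Bean {
        private String name; // known attribute, bound via the setter
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }

        private static final Set<String> seen = new TreeSet<String>();

        @JsonAnySetter
        public void setUnknownAttribute(String attribute, Object ignored) {
          if (seen.add(attribute)) { // warn only once per attribute
            System.err.println("Ignoring unknown attribute " + attribute);
          }
        }
      }

      public static void main(String[] args) throws Exception {
        Bean b = new ObjectMapper().readValue(
            "{\"name\":\"a\",\"extra\":1}", Bean.class);
        System.out.println(b.getName()); // prints "a"; "extra" was ignored
      }
    }
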
Modified: hadoop/mapreduce/branches/HDFS-641/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java Tue Oct 27 15:43:58 2009
@@ -23,9 +23,13 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.ArrayList;
 import java.util.Comparator;
 
+import org.codehaus.jackson.annotate.JsonAnySetter;
+
 /**
  * A {@link LoggedNetworkTopology} represents a tree that in turn represents a
  * hierarchy of hosts. The current version requires the tree to have all leaves
@@ -39,10 +43,24 @@
   String name;
   List<LoggedNetworkTopology> children = new ArrayList<LoggedNetworkTopology>();
 
+  static private Set<String> alreadySeenAnySetterAttributes =
+      new TreeSet<String>();
+
   public LoggedNetworkTopology() {
     super();
   }
 
+  @SuppressWarnings("unused")
+  // the second parameter is intentionally ignored.
+  @JsonAnySetter
+  public void setUnknownAttribute(String attributeName, Object ignored) {
+    if (!alreadySeenAnySetterAttributes.contains(attributeName)) {
+      alreadySeenAnySetterAttributes.add(attributeName);
+      System.err.println("In LoggedJob, we saw the unknown attribute "
+          + attributeName + ".");
+    }
+  }
+
   /**
    * We need this because we have to sort the {@code children} field. That field
    * is set-valued, but if we sort these fields we ensure that comparisons won't
@@ -70,7 +88,8 @@
     this.children = null;
 
     if (level < ParsedHost.numberOfDistances() - 1) {
-      HashMap<String, HashSet<ParsedHost>> topologies = new HashMap<String, HashSet<ParsedHost>>();
+      HashMap<String, HashSet<ParsedHost>> topologies =
+          new HashMap<String, HashSet<ParsedHost>>();
 
       Iterator<ParsedHost> iter = hosts.iterator();
 

Modified: hadoop/mapreduce/branches/HDFS-641/src/tools/org/apache/hadoop/tools/rumen/LoggedSingleRelativeRanking.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/tools/org/apache/hadoop/tools/rumen/LoggedSingleRelativeRanking.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/tools/org/apache/hadoop/tools/rumen/LoggedSingleRelativeRanking.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/tools/org/apache/hadoop/tools/rumen/LoggedSingleRelativeRanking.java Tue Oct 27 15:43:58 2009
@@ -17,6 +17,11 @@
  */
 package org.apache.hadoop.tools.rumen;
 
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.codehaus.jackson.annotate.JsonAnySetter;
+
 /**
  * A {@link LoggedSingleRelativeRanking} represents an X-Y coordinate of a
  * single point in a discrete CDF.
@@ -36,6 +41,20 @@
    */
   long datum = -1L;
 
+  static private Set<String> alreadySeenAnySetterAttributes =
+      new TreeSet<String>();
+
+  @SuppressWarnings("unused")
+  // the second parameter is intentionally ignored.
+  @JsonAnySetter
+  public void setUnknownAttribute(String attributeName, Object ignored) {
+    if (!alreadySeenAnySetterAttributes.contains(attributeName)) {
+      alreadySeenAnySetterAttributes.add(attributeName);
+      System.err.println("In LoggedJob, we saw the unknown attribute "
+          + attributeName + ".");
+    }
+  }
+
   public double getRelativeRanking() {
     return relativeRanking;
   }