Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:03:24 UTC

svn commit: r1077322 [2/2] - in /hadoop/common/branches/branch-0.20-security-patches/src: c++/task-controller/ c++/task-controller/tests/ mapred/ mapred/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapreduce/ mapred/org/apache/hadoop/mapreduce/se...

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsTruncater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsTruncater.java?rev=1077322&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsTruncater.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsTruncater.java Fri Mar  4 04:03:23 2011
@@ -0,0 +1,605 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.TaskLog.LogFileDetail;
+import org.apache.hadoop.mapred.TaskLog.LogName;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapreduce.server.tasktracker.JVMInfo;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.JvmFinishedEvent;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.UserLogManager;
+import org.apache.hadoop.mapreduce.split.JobSplit;
+
+import org.junit.After;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Verify the logs' truncation functionality.
+ */
+public class TestTaskLogsTruncater {
+
+  static final Log LOG = LogFactory.getLog(TestTaskLogsTruncater.class);
+
+  /**
+   * Clean up any stale directories after enabling writable permissions on all
+   * attempt-dirs.
+   * 
+   * @throws IOException
+   */
+  @After
+  public void tearDown() throws IOException {
+    File logDir = TaskLog.getUserLogDir();
+    for (File attemptDir : logDir.listFiles()) {
+      attemptDir.setWritable(true);
+      FileUtil.fullyDelete(attemptDir);
+    }
+  }
+
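+  /**
+   * Writes {@code numBytes} copies of {@code data} to the given log file of
+   * {@code firstAttemptID}. Under JVM reuse, every attempt running in the
+   * same JVM appends to the first attempt's physical log files, while each
+   * attempt's index file records its own offset and length within them;
+   * hence the two attempt-id parameters.
+   */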
+  void writeRealBytes(TaskAttemptID firstAttemptID,
+      TaskAttemptID attemptID, LogName logName, long numBytes, char data)
+      throws IOException {
+
+    File logFile = TaskLog.getTaskLogFile(firstAttemptID, logName);
+
+    LOG.info("Going to write " + numBytes + " real bytes to the log file "
+        + logFile);
+
+    if (!logFile.getParentFile().exists()
+        && !logFile.getParentFile().mkdirs()) {
+      throw new IOException("Couldn't create all ancestor dirs for "
+          + logFile);
+    }
+
+    File attemptDir = TaskLog.getAttemptDir(attemptID.toString());
+    if (!attemptDir.exists() && !attemptDir.mkdirs()) {
+      throw new IOException("Couldn't create all ancestor dirs for "
+          + logFile);
+    }
+
+    // Need to call this up front to set the current task id.
+    TaskLog.syncLogs(firstAttemptID, attemptID);
+
+    FileWriter writer = new FileWriter(logFile, true);
+    for (long i = 0; i < numBytes; i++) {
+      writer.write(data);
+    }
+    writer.close();
+    TaskLog.syncLogs(firstAttemptID, attemptID);
+    LOG.info("Written " + numBytes + " real bytes to the log file "
+        + logFile);
+  }
+
+  private static Map<LogName, Long> getAllLogsFileLengths(
+      TaskAttemptID tid, boolean isCleanup) throws IOException {
+    Map<LogName, Long> allLogsFileLengths = new HashMap<LogName, Long>();
+
+    // If the index file doesn't exist, we cannot get log-file lengths. So set
+    // them to zero.
+    if (!TaskLog.getIndexFile(tid.toString(), isCleanup).exists()) {
+      for (LogName log : LogName.values()) {
+        allLogsFileLengths.put(log, Long.valueOf(0));
+      }
+      return allLogsFileLengths;
+    }
+
+    Map<LogName, LogFileDetail> logFilesDetails =
+        TaskLog.getAllLogsFileDetails(tid, isCleanup);
+    for (LogName log : logFilesDetails.keySet()) {
+      allLogsFileLengths.put(log,
+          Long.valueOf(logFilesDetails.get(log).length));
+    }
+    return allLogsFileLengths;
+  }
+
+  private Configuration setRetainSizes(long mapRetainSize,
+      long reduceRetainSize) {
+    Configuration conf = new Configuration();
+    conf.setLong(TaskLogsTruncater.MAP_USERLOG_RETAIN_SIZE, mapRetainSize);
+    conf.setLong(TaskLogsTruncater.REDUCE_USERLOG_RETAIN_SIZE, reduceRetainSize);
+    return conf;
+  }
+
+  /**
+   * Test cases that don't need any truncation of log-files, without JVM-reuse.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testNoTruncationNeeded() throws IOException {
+    Configuration conf = setRetainSizes(1000L, 1000L);
+    UserLogManager logManager = new UtilsForTests.InLineUserLogManager(conf);
+
+    TaskID baseId = new TaskID();
+    int taskcount = 0;
+
+    TaskAttemptID attemptID = new TaskAttemptID(baseId, taskcount++);
+    Task task = new MapTask(null, attemptID, 0, new JobSplit.TaskSplitIndex(),
+                            0);
+
+    // Let the tasks write logs within retain-size
+    for (LogName log : LogName.values()) {
+      writeRealBytes(attemptID, attemptID, log, 500, 'H');
+    }
+    File logIndex = TaskLog.getIndexFile(attemptID.toString(), false);
+    long indexModificationTimeStamp = logIndex.lastModified();
+
+    File attemptDir = TaskLog.getAttemptDir(attemptID.toString());
+    assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
+    assertEquals("index file got modified", indexModificationTimeStamp,
+        logIndex.lastModified());
+
+    // Finish the task and the JVM too.
+    JVMInfo jvmInfo = new JVMInfo(attemptID, Arrays.asList(task));
+    logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
+
+    // There should be no truncation of the log-file.
+    assertTrue(attemptDir.exists());
+    assertEquals("index file got modified", indexModificationTimeStamp,
+        logIndex.lastModified());
+
+    Map<LogName, Long> logLengths = getAllLogsFileLengths(attemptID, false);
+    for (LogName log : LogName.values()) {
+      File logFile = TaskLog.getTaskLogFile(attemptID, log);
+      assertEquals(500, logFile.length());
+      // The index file should also be proper.
+      assertEquals(500, logLengths.get(log).longValue());
+    }
+
+    // truncate it once again
+    logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
+    assertEquals("index file got modified", indexModificationTimeStamp,
+        logIndex.lastModified());
+    
+    logLengths = getAllLogsFileLengths(attemptID, false);
+    for (LogName log : LogName.values()) {
+      File logFile = TaskLog.getTaskLogFile(attemptID, log);
+      assertEquals(500, logFile.length());
+      // The index file should also be proper.
+      assertEquals(500, logLengths.get(log).longValue());
+    }
+  }
+
+  /**
+   * Test the disabling of truncation of log-file.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testDisabledLogTruncation() throws IOException {
+    // Anything less than 0 disables the truncation.
+    Configuration conf = setRetainSizes(-1L, -1L);
+    UserLogManager logManager = new UtilsForTests.InLineUserLogManager(conf);
+
+    TaskID baseId = new TaskID();
+    int taskcount = 0;
+
+    TaskAttemptID attemptID = new TaskAttemptID(baseId, taskcount++);
+    Task task = new MapTask(null, attemptID, 0, new JobSplit.TaskSplitIndex(),
+                            0);
+
+    // Let the tasks write some logs
+    for (LogName log : LogName.values()) {
+      writeRealBytes(attemptID, attemptID, log, 1500, 'H');
+    }
+
+    File attemptDir = TaskLog.getAttemptDir(attemptID.toString());
+    assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
+
+    // Finish the task and the JVM too.
+    JVMInfo jvmInfo = new JVMInfo(attemptID, Arrays.asList(task));
+    logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
+
+    // The log-file should not be truncated.
+    assertTrue(attemptDir.exists());
+    Map<LogName, Long> logLengths = getAllLogsFileLengths(attemptID, false);
+    for (LogName log : LogName.values()) {
+      File logFile = TaskLog.getTaskLogFile(attemptID, log);
+      assertEquals(1500, logFile.length());
+      // The index file should also be proper.
+      assertEquals(1500, logLengths.get(log).longValue());
+    }
+  }
+
+  /**
+   * Test the truncation of log-file when JVMs are not reused.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testLogTruncationOnFinishing() throws IOException {
+    Configuration conf = setRetainSizes(1000L, 1000L);
+    UserLogManager logManager = new UtilsForTests.InLineUserLogManager(conf);
+
+    TaskID baseId = new TaskID();
+    int taskcount = 0;
+
+    TaskAttemptID attemptID = new TaskAttemptID(baseId, taskcount++);
+    Task task = new MapTask(null, attemptID, 0, new JobSplit.TaskSplitIndex(), 
+                            0);
+
+    // Let the tasks write logs more than retain-size
+    for (LogName log : LogName.values()) {
+      writeRealBytes(attemptID, attemptID, log, 1500, 'H');
+    }
+
+    File attemptDir = TaskLog.getAttemptDir(attemptID.toString());
+    assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
+
+    // Finish the task and the JVM too.
+    JVMInfo jvmInfo = new JVMInfo(attemptID, Arrays.asList(task));
+    logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
+
+    // The log-file should now be truncated.
+    assertTrue(attemptDir.exists());
+
+    Map<LogName, Long> logLengths = getAllLogsFileLengths(attemptID, false);
+    for (LogName log : LogName.values()) {
+      File logFile = TaskLog.getTaskLogFile(attemptID, log);
+      assertEquals(1000, logFile.length());
+      // The index file should also be proper.
+      assertEquals(1000, logLengths.get(log).longValue());
+    }
+
+    // truncate once again
+    logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
+    logLengths = getAllLogsFileLengths(attemptID, false);
+    for (LogName log : LogName.values()) {
+      File logFile = TaskLog.getTaskLogFile(attemptID, log);
+      assertEquals(1000, logFile.length());
+      // The index file should also be proper.
+      assertEquals(1000, logLengths.get(log).longValue());
+    }
+  }
+
+  /**
+   * Test the truncation of log-file.
+   * 
+   * It writes two log files; one gets truncated and the other is left intact.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testLogTruncation() throws IOException {
+    Configuration conf = setRetainSizes(1000L, 1000L);
+    UserLogManager logManager = new UtilsForTests.InLineUserLogManager(conf);
+
+    TaskID baseId = new TaskID();
+    int taskcount = 0;
+
+    TaskAttemptID attemptID = new TaskAttemptID(baseId, taskcount++);
+    Task task = new MapTask(null, attemptID, 0, new JobSplit.TaskSplitIndex(), 
+                            0);
+
+    // Let the tasks write logs more than retain-size
+    writeRealBytes(attemptID, attemptID, LogName.SYSLOG, 1500, 'H');
+    writeRealBytes(attemptID, attemptID, LogName.STDERR, 500, 'H');
+
+    File attemptDir = TaskLog.getAttemptDir(attemptID.toString());
+    assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
+
+    // Finish the task and the JVM too.
+    JVMInfo jvmInfo = new JVMInfo(attemptID, Arrays.asList(task));
+    logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
+
+    // The log-file should now be truncated.
+    assertTrue(attemptDir.exists());
+
+    Map<LogName, Long> logLengths = getAllLogsFileLengths(attemptID, false);
+    File logFile = TaskLog.getTaskLogFile(attemptID, LogName.SYSLOG);
+    assertEquals(1000, logFile.length());
+    // The index file should also be proper.
+    assertEquals(1000, logLengths.get(LogName.SYSLOG).longValue());
+    logFile = TaskLog.getTaskLogFile(attemptID, LogName.STDERR);
+    assertEquals(500, logFile.length());
+    // The index file should also be proper.
+    assertEquals(500, logLengths.get(LogName.STDERR).longValue());
+
+    // truncate once again
+    logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
+    logLengths = getAllLogsFileLengths(attemptID, false);
+    logFile = TaskLog.getTaskLogFile(attemptID, LogName.SYSLOG);
+    assertEquals(1000, logFile.length());
+    // The index file should also be proper.
+    assertEquals(1000, logLengths.get(LogName.SYSLOG).longValue());
+    logFile = TaskLog.getTaskLogFile(attemptID, LogName.STDERR);
+    assertEquals(500, logFile.length());
+    // The index file should also be proper.
+    assertEquals(500, logLengths.get(LogName.STDERR).longValue());
+  }
+
+  /**
+   * Test the truncation of log-file when JVM-reuse is enabled.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testLogTruncationOnFinishingWithJVMReuse() throws IOException {
+    Configuration conf = setRetainSizes(150L, 150L);
+    UserLogManager logManager = new UtilsForTests.InLineUserLogManager(conf);
+
+    TaskID baseTaskID = new TaskID();
+    int attemptsCount = 0;
+
+    // The job's retain size is 150 (set above).
+    TaskAttemptID attempt1 = new TaskAttemptID(baseTaskID, attemptsCount++);
+    Task task1 = new MapTask(null, attempt1, 0, new JobSplit.TaskSplitIndex(),
+                             0);
+
+    // Let the tasks write logs more than retain-size
+    writeRealBytes(attempt1, attempt1, LogName.SYSLOG, 200, 'A');
+
+    File attemptDir = TaskLog.getAttemptDir(attempt1.toString());
+    assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
+
+    // Start another attempt in the same JVM
+    TaskAttemptID attempt2 = new TaskAttemptID(baseTaskID, attemptsCount++);
+    Task task2 = new MapTask(null, attempt2, 0, new JobSplit.TaskSplitIndex(),
+                             0);
+    // Let attempt2 also write some logs
+    writeRealBytes(attempt1, attempt2, LogName.SYSLOG, 100, 'B');
+    // Start yet another attempt in the same JVM
+    TaskAttemptID attempt3 = new TaskAttemptID(baseTaskID, attemptsCount++);
+    Task task3 = new MapTask(null, attempt3, 0, new JobSplit.TaskSplitIndex(),
+                             0);
+    // Let attempt3 also write some logs
+    writeRealBytes(attempt1, attempt3, LogName.SYSLOG, 225, 'C');
+    // Finish the JVM.
+    JVMInfo jvmInfo = new JVMInfo(attempt1, Arrays.asList((new Task[] { task1,
+        task2, task3 })));
+    logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
+
+    // The log-file should now be truncated.
+    assertTrue(attemptDir.exists());
+    File logFile = TaskLog.getTaskLogFile(attempt1, LogName.SYSLOG);
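+    // attempt1 is truncated 200 -> 150, attempt2 stays at 100 (already within
+    // the 150-byte limit), attempt3 is truncated 225 -> 150; the shared file
+    // is therefore 150 + 100 + 150 = 400 bytes.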
+    assertEquals(400, logFile.length());
+    // The index files should also be proper.
+    assertEquals(150, getAllLogsFileLengths(attempt1, false).get(
+        LogName.SYSLOG).longValue());
+    assertEquals(100, getAllLogsFileLengths(attempt2, false).get(
+        LogName.SYSLOG).longValue());
+    assertEquals(150, getAllLogsFileLengths(attempt3, false).get(
+        LogName.SYSLOG).longValue());
+
+    // assert the data.
+    FileReader reader =
+        new FileReader(TaskLog.getTaskLogFile(attempt1, LogName.SYSLOG));
+    int ch, bytesRead = 0;
+    boolean dataValid = true;
+    while ((ch = reader.read()) != -1) {
+      bytesRead++;
+      if (bytesRead <= 150) {
+        if ((char) ch != 'A') {
+          LOG.warn("Truncation didn't happen properly. At "
+              + (bytesRead + 1) + "th byte, expected 'A' but found "
+              + (char) ch);
+          dataValid = false;
+        }
+      } else if (bytesRead <= 250) {
+        if ((char) ch != 'B') {
+          LOG.warn("Truncation didn't happen properly. At "
+              + (bytesRead + 1) + "th byte, expected 'B' but found "
+              + (char) ch);
+          dataValid = false;
+        }
+      } else if ((char) ch != 'C') {
+        LOG.warn("Truncation didn't happen properly. At " + (bytesRead + 1)
+            + "th byte, expected 'C' but found " + (char) ch);
+        dataValid = false;
+      }
+    }
+    assertTrue("Log-truncation didn't happen properly!", dataValid);
+
+    logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
+    assertEquals(400, logFile.length());
+  }
+
+  private static String TEST_ROOT_DIR =
+      new File(System.getProperty("test.build.data", "/tmp")).toURI().toString().replace(
+          ' ', '+');
+
+  private static String STDERR_LOG = "stderr log";
+  public static class LoggingMapper<K, V> extends IdentityMapper<K, V> {
+
+    public void map(K key, V val, OutputCollector<K, V> output,
+        Reporter reporter) throws IOException {
+      // Write lots of logs
+      for (int i = 0; i < 1000; i++) {
+        System.out.println("Lots of logs! Lots of logs! "
+            + "Waiting to be truncated! Lots of logs!");
+      }
+      // write some log into stderr
+      System.err.println(STDERR_LOG);
+      super.map(key, val, output, reporter);
+    }
+  }
+
+  /**
+   * Test logs monitoring with {@link MiniMRCluster}
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testLogsMonitoringWithMiniMR() throws IOException {
+
+    MiniMRCluster mr = null;
+    try {
+      JobConf clusterConf = new JobConf();
+      clusterConf.setLong(TaskLogsTruncater.MAP_USERLOG_RETAIN_SIZE, 10000L);
+      clusterConf.setLong(TaskLogsTruncater.REDUCE_USERLOG_RETAIN_SIZE, 10000L);
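+      // The truncater runs inside the TaskTracker, so the retain sizes must
+      // be in the daemon conf handed to MiniMRCluster, not in the job conf.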
+      mr = new MiniMRCluster(1, "file:///", 3, null, null, clusterConf);
+
+      JobConf conf = mr.createJobConf();
+
+      Path inDir = new Path(TEST_ROOT_DIR + "/input");
+      Path outDir = new Path(TEST_ROOT_DIR + "/output");
+      FileSystem fs = FileSystem.get(conf);
+      if (fs.exists(outDir)) {
+        fs.delete(outDir, true);
+      }
+      if (!fs.exists(inDir)) {
+        fs.mkdirs(inDir);
+      }
+      String input = "The quick brown fox jumped over the lazy dog";
+      DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+      file.writeBytes(input);
+      file.close();
+
+      conf.setInputFormat(TextInputFormat.class);
+      conf.setOutputKeyClass(LongWritable.class);
+      conf.setOutputValueClass(Text.class);
+
+      FileInputFormat.setInputPaths(conf, inDir);
+      FileOutputFormat.setOutputPath(conf, outDir);
+      conf.setNumMapTasks(1);
+      conf.setNumReduceTasks(0);
+      conf.setMapperClass(LoggingMapper.class);
+
+      RunningJob job = JobClient.runJob(conf);
+      assertEquals(JobStatus.SUCCEEDED, job.getJobState());
+      for (TaskCompletionEvent tce : job.getTaskCompletionEvents(0)) {
+        long length =
+            TaskLog.getTaskLogFile(tce.getTaskAttemptId(),
+                TaskLog.LogName.STDOUT).length();
+        assertTrue("STDOUT log file length for " + tce.getTaskAttemptId()
+            + " is " + length + " and not <=10000", length <= 10000);
+        if (tce.isMap) {
+          String stderr = TestMiniMRMapRedDebugScript.readTaskLog(
+              LogName.STDERR, tce.getTaskAttemptId(), false);
+          System.out.println("STDERR log:" + stderr);
+          assertTrue(stderr.length() > 0);
+          assertTrue(stderr.length() < 10000);
+          assertTrue(stderr.equals(STDERR_LOG));
+        }
+      }
+    } finally {
+      if (mr != null) {
+        mr.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test the truncation of DEBUGOUT file by {@link TaskLogsTruncater}
+   * @throws IOException 
+   */
+  @Test
+  public void testDebugLogsTruncationWithMiniMR() throws IOException {
+
+    MiniMRCluster mr = null;
+    try {
+      JobConf clusterConf = new JobConf();
+      clusterConf.setLong(TaskLogsTruncater.MAP_USERLOG_RETAIN_SIZE, 10000L);
+      clusterConf.setLong(TaskLogsTruncater.REDUCE_USERLOG_RETAIN_SIZE, 10000L);
+      mr = new MiniMRCluster(1, "file:///", 3, null, null, clusterConf);
+
+      JobConf conf = mr.createJobConf();
+
+      Path inDir = new Path(TEST_ROOT_DIR + "/input");
+      Path outDir = new Path(TEST_ROOT_DIR + "/output");
+      FileSystem fs = FileSystem.get(conf);
+      if (fs.exists(outDir)) {
+        fs.delete(outDir, true);
+      }
+      if (!fs.exists(inDir)) {
+        fs.mkdirs(inDir);
+      }
+      String input = "The quick brown fox jumped over the lazy dog";
+      DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+      file.writeBytes(input);
+      file.close();
+
+      conf.setInputFormat(TextInputFormat.class);
+      conf.setOutputKeyClass(LongWritable.class);
+      conf.setOutputValueClass(Text.class);
+
+      FileInputFormat.setInputPaths(conf, inDir);
+      FileOutputFormat.setOutputPath(conf, outDir);
+      conf.setNumMapTasks(1);
+      conf.setMaxMapAttempts(1);
+      conf.setNumReduceTasks(0);
+      conf.setMapperClass(TestMiniMRMapRedDebugScript.MapClass.class);
+
+      // copy debug script to cache from local file system.
+      Path scriptPath = new Path(TEST_ROOT_DIR, "debug-script.txt");
+      String debugScriptContent =
+          "for ((i=0;i<1000;i++)); " + "do "
+              + "echo \"Lots of logs! Lots of logs! "
+              + "Waiting to be truncated! Lots of logs!\";" + "done";
+      DataOutputStream scriptFile = fs.create(scriptPath);
+      scriptFile.writeBytes(debugScriptContent);
+      scriptFile.close();
+      new File(scriptPath.toUri().getPath()).setExecutable(true);
+
+      URI uri = scriptPath.toUri();
+      DistributedCache.createSymlink(conf);
+      DistributedCache.addCacheFile(uri, conf);
+      conf.setMapDebugScript(scriptPath.toUri().getPath());
+
+      RunningJob job = null;
+      try {
+        JobClient jc = new JobClient(conf);
+        job = jc.submitJob(conf);
+        try {
+          jc.monitorAndPrintJob(conf, job);
+        } catch (InterruptedException e) {
+          //
+        }
+      } catch (IOException ioe) {
+        // The job may legitimately fail; its DEBUGOUT logs are checked below.
+      } finally {
+        // Guard against submitJob() failing before job was assigned: without
+        // this check the finally block would mask the real error with an NPE.
+        if (job != null) {
+          for (TaskCompletionEvent tce : job.getTaskCompletionEvents(0)) {
+            File debugOutFile =
+                TaskLog.getTaskLogFile(tce.getTaskAttemptId(),
+                    TaskLog.LogName.DEBUGOUT);
+            if (debugOutFile.exists()) {
+              long length = debugOutFile.length();
+              assertTrue("DEBUGOUT log file length for "
+                  + tce.getTaskAttemptId() + " is " + length
+                  + " and not =10000", length == 10000);
+            }
+          }
+        }
+      }
+    } finally {
+      if (mr != null) {
+        mr.shutdown();
+      }
+    }
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Mar  4 04:03:23 2011
@@ -38,12 +38,11 @@ import org.apache.hadoop.mapred.TaskCont
 import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Shell;
 
 /**
@@ -75,6 +74,7 @@ public class TestTaskTrackerLocalization
   protected Path attemptWorkDir;
   protected File[] attemptLogFiles;
   protected JobConf localizedTaskConf;
+  private TaskInProgress tip;
 
   /**
    * Dummy method in this base class. Only derived classes will define this
@@ -113,8 +113,14 @@ public class TestTaskTrackerLocalization
     trackerFConf.setStrings("mapred.local.dir", localDirs);
 
     // Create the job configuration file. Same as trackerConf in this test.
-    JobConf jobConf = trackerFConf;
-
+    JobConf jobConf = new JobConf(trackerFConf);
+    // Set job view ACLs in conf so that validation of contents of jobACLsFile
+    // can be done against this value. Have both users and groups
+    String jobViewACLs = "user1,user2, group1,group2";
+    jobConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACLs);
+    jobConf.setInt("mapred.userlog.retain.hours", 0);
+    String jtIdentifier = "200907202331";
+    jobId = new JobID(jtIdentifier, 1);
 
     // JobClient uploads the job jar to the file system and sets it in the
     // jobConf.
@@ -123,9 +129,16 @@ public class TestTaskTrackerLocalization
     // JobClient uploads the jobConf to the file system.
     File jobConfFile = uploadJobConf(jobConf);
 
+    // create jobTokens file
+    uploadJobTokensFile();
+
     // Set up the TaskTracker
     tracker = new TaskTracker();
     tracker.setConf(trackerFConf);
+    tracker.setIndexCache(new IndexCache(trackerFConf));
+    tracker.setUserLogManager(new UtilsForTests.InLineUserLogManager(
+        trackerFConf));
+    tracker.setTaskMemoryManagerEnabledFlag();
 
     // for test case system FS is the local FS
 
@@ -136,13 +149,6 @@ public class TestTaskTrackerLocalization
     taskTrackerUGI = UserGroupInformation.getCurrentUser();
 
     // Set up the task to be localized
-    String jtIdentifier = "200907202331";
-    jobId = new JobID(jtIdentifier, 1);
-    
-    TaskTracker.RunningJob rjob = new TaskTracker.RunningJob(jobId);
-    rjob.ugi = UserGroupInformation.getCurrentUser();
-    tracker.runningJobs.put(jobId, rjob);
-    
     taskId =
         new TaskAttemptID(jtIdentifier, jobId.getId(), true, 1, 0);
     task =
@@ -150,8 +156,6 @@ public class TestTaskTrackerLocalization
     task.setConf(jobConf); // Set conf. Set user name in particular.
     task.setUser(UserGroupInformation.getCurrentUser().getUserName());
 
-    // create jobTokens file
-    uploadJobTokensFile();
 
     taskController = new DefaultTaskController();
     taskController.setConf(trackerFConf);
@@ -160,6 +164,10 @@ public class TestTaskTrackerLocalization
     tracker.setTaskController(taskController);
     tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs,
                                        taskController));
+
+    // mimic register task
+    // create the tip
+    tip = tracker.new TaskInProgress(task, trackerFConf);
   }
 
   /**
@@ -356,24 +364,8 @@ public class TestTaskTrackerLocalization
     if (!canRun()) {
       return;
     }
-    tracker.getLocalizer().initializeUserDirs(task.getUser());
-
-    // /////////// The main method being tested
-    localizedJobConf = tracker.localizeJobFiles(task, 
-        new TaskTracker.RunningJob(task.getJobID()));
-    // ///////////
-
-    // Now initialize the job via task-controller so as to set
-    // ownership/permissions of jars, job-work-dir
-    JobInitializationContext context = new JobInitializationContext();
-    context.jobid = jobId;
-    context.user = task.getUser();
-    context.workDir =
-        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
-
-    // /////////// The method being tested
-    taskController.initializeJob(context);
-    // ///////////
+    TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
+    localizedJobConf = rjob.getJobConf();
 
     checkJobLocalization();
   }
@@ -441,6 +433,13 @@ public class TestTaskTrackerLocalization
     assertTrue(
         "mapred.jar is not set properly to the target users directory : "
             + localizedJobJar, mapredJarFlag);
+
+    // check job user-log directory permissions
+    File jobLogDir = TaskLog.getJobDir(jobId);
+    assertTrue("job log directory " + jobLogDir + " does not exist!", jobLogDir
+        .exists());
+    checkFilePermissions(jobLogDir.toString(), "drwx------", task.getUser(),
+        taskTrackerUGI.getGroupNames()[0]);
   }
 
   /**
@@ -453,25 +452,9 @@ public class TestTaskTrackerLocalization
     if (!canRun()) {
       return;
     }
-    tracker.getLocalizer().initializeUserDirs(task.getUser());
-    localizedJobConf = tracker.localizeJobFiles(task, 
-        new TaskTracker.RunningJob(task.getJobID()));
-
-    // Set job view ACLs in conf sothat validation of contents of jobACLsFile
-    // can be done against this value. Have both users and groups
-    String jobViewACLs = "user1,user2, group1,group2";
-    localizedJobConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACLs);
-
-    // Now initialize the job via task-controller so as to set
-    // ownership/permissions of jars, job-work-dir
-    JobInitializationContext jobContext = new JobInitializationContext();
-    jobContext.jobid = jobId;
-    jobContext.user = task.getUser();
-    jobContext.workDir =
-        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
-    taskController.initializeJob(jobContext);
+    TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
+    localizedJobConf = rjob.getJobConf();
 
-    TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
     tip.setJobConf(localizedJobConf);
 
     // ////////// The central method being tested
@@ -552,9 +535,7 @@ public class TestTaskTrackerLocalization
         .getPath(), "tmp").exists());
 
     // Make sure that the logs are setup properly
-    File logDir =
-        new File(HADOOP_LOG_DIR, TaskLog.USERLOGS_DIR_NAME + Path.SEPARATOR
-            + task.getTaskID().toString());
+    File logDir = TaskLog.getAttemptDir(taskId.toString());
     assertTrue("task's log dir " + logDir.toString() + " doesn't exist!",
         logDir.exists());
     checkFilePermissions(logDir.getAbsolutePath(), "drwx------", task
@@ -709,21 +690,12 @@ public class TestTaskTrackerLocalization
   private void testTaskCleanup(boolean needCleanup, boolean jvmReuse)
       throws Exception {
     // Localize job and localize task.
-    tracker.getLocalizer().initializeUserDirs(task.getUser());
-    localizedJobConf = tracker.localizeJobFiles(task, 
-        new TaskTracker.RunningJob(task.getJobID()));
+    TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
+    TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
+    localizedJobConf = rjob.getJobConf();
     if (jvmReuse) {
       localizedJobConf.setNumTasksToExecutePerJvm(2);
     }
-    // Now initialize the job via task-controller so as to set
-    // ownership/permissions of jars, job-work-dir
-    JobInitializationContext jobContext = new JobInitializationContext();
-    jobContext.jobid = jobId;
-    jobContext.user = localizedJobConf.getUser();
-    jobContext.workDir =
-        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
-    taskController.initializeJob(jobContext);
-    TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
     tip.setJobConf(localizedJobConf);
     tip.localizeTask(task);
     Path workDir =
@@ -777,22 +749,17 @@ public class TestTaskTrackerLocalization
    */
   private void verifyUserLogsCleanup()
       throws IOException {
-    Path logDir =
-        new Path(HADOOP_LOG_DIR.toURI().getPath(), TaskLog.USERLOGS_DIR_NAME
-            + Path.SEPARATOR + task.getTaskID().toString());
-
+    // verify user logs cleanup
+    File jobUserLogDir = TaskLog.getJobDir(jobId);
     // Logs should be there before cleanup.
-    assertTrue("Userlogs dir " + logDir + " is not presen as expected!!",
-        tracker.getLocalFileSystem().exists(logDir));
-
-    // ////////// Another being tested
-    TaskLog.cleanup(-1); // -1 so as to move purgeTimeStamp to future and file
-    // modification time behind retainTimeStatmp
-    // //////////
-
+    assertTrue("Userlogs dir " + jobUserLogDir + " is not present as expected!!",
+          jobUserLogDir.exists());
+    tracker.purgeJob(new KillJobAction(jobId));
+    tracker.getUserLogManager().getUserLogCleaner().processCompletedJobs();
+    
     // Logs should be gone after cleanup.
-    assertFalse("Userlogs dir " + logDir + " is not deleted as expected!!",
-        tracker.getLocalFileSystem().exists(logDir));
+    assertFalse("Userlogs dir " + jobUserLogDir + " is not deleted as expected!!",
+        jobUserLogDir.exists());
   }
   
   /**
@@ -806,24 +773,12 @@ public class TestTaskTrackerLocalization
     }
     
     LOG.info("Running testJobCleanup()");
-    // Localize job and localize task.
-    tracker.getLocalizer().initializeUserDirs(task.getUser());
-    localizedJobConf = 
-      tracker.localizeJobFiles(task, 
-                               new TaskTracker.RunningJob(task.getJobID()));
-    
-    // Now initialize the job via task-controller so as to set
-    // ownership/permissions of job-work-dir
-    JobInitializationContext jobContext = new JobInitializationContext();
-    jobContext.jobid = jobId;
-    jobContext.user = localizedJobConf.getUser();
-    jobContext.workDir =
-        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
-    taskController.initializeJob(jobContext);
-    
     // Set an inline cleanup queue
     InlineCleanupQueue cleanupQueue = new InlineCleanupQueue();
     tracker.setCleanupThread(cleanupQueue);
+    // Localize job and localize task.
+    TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
+    localizedJobConf = rjob.getJobConf();
     
     // Create a file in job's work-dir with 555
     String jobWorkDir = 

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java?rev=1077322&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java Fri Mar  4 04:03:23 2011
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.*;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Test;
+
+public class TestUserLogCleanup {
+  private static String jtid = "test";
+  private static long ONE_HOUR = 1000 * 60 * 60;
+  private Localizer localizer;
+  private UserLogManager userLogManager;
+  private UserLogCleaner userLogCleaner;
+  private TaskTracker tt;
+  private FakeClock myClock;
+  private JobID jobid1 = new JobID(jtid, 1);
+  private JobID jobid2 = new JobID(jtid, 2);
+  private JobID jobid3 = new JobID(jtid, 3);
+  private JobID jobid4 = new JobID(jtid, 4);
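+  // stray entries created in the userlog root; the cleanup run on TT
+  // re-init/restart is expected to remove anything that isn't a job dir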
+  private File foo = new File(TaskLog.getUserLogDir(), "foo");
+  private File bar = new File(TaskLog.getUserLogDir(), "bar");
+
+  public TestUserLogCleanup() throws IOException {
+    Configuration conf = new Configuration();
+    startTT(conf);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    FileUtil.fullyDelete(TaskLog.getUserLogDir());
+  }
+
+  private File localizeJob(JobID jobid) throws IOException {
+    File jobUserlog = TaskLog.getJobDir(jobid);
+    // localize job log directory
+    tt.initializeJobLogDir(jobid);
+    assertTrue(jobUserlog + " directory is not created.", jobUserlog.exists());
+    return jobUserlog;
+  }
+
+  private void jobFinished(JobID jobid, int logRetainHours) {
+    JobCompletedEvent jce = new JobCompletedEvent(jobid, myClock.getTime(),
+        logRetainHours);
+    userLogManager.addLogEvent(jce);
+  }
+
+  private void startTT(Configuration conf) throws IOException {
+    myClock = new FakeClock(); // clock is reset.
+    tt = new TaskTracker();
+    localizer = new Localizer(FileSystem.get(conf), conf
+        .getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY),
+        new DefaultTaskController());
+    tt.setLocalizer(localizer);
+    userLogManager = new UtilsForTests.InLineUserLogManager(conf);
+    userLogCleaner = userLogManager.getUserLogCleaner();
+    userLogCleaner.setClock(myClock);
+    tt.setUserLogManager(userLogManager);
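+    // mimic the old-log cleanup a real TaskTracker performs on startup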
+    userLogManager.clearOldUserLogs(conf);
+  }
+
+  private void ttReinited() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(JobContext.USER_LOG_RETAIN_HOURS, 3);
+    userLogManager.clearOldUserLogs(conf);
+  }
+
+  private void ttRestarted() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(JobContext.USER_LOG_RETAIN_HOURS, 3);
+    startTT(conf);
+  }
+
+  /**
+   * Tests job user-log directory deletion.
+   * 
+   * Adds two jobs for log deletion: one with a one-hour retain time, the
+   * other with two hours. After an hour, a call to
+   * UserLogCleaner.processCompletedJobs() must remove the job with the
+   * one-hour retain time and keep the other. After one more hour, the job
+   * with the two-hour retain time is removed as well.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testJobLogCleanup() throws IOException {
+    File jobUserlog1 = localizeJob(jobid1);
+    File jobUserlog2 = localizeJob(jobid2);
+
+    // add jobid1's user-log directory for deletion, with a 2-hour retain time
+    jobFinished(jobid1, 2);
+
+    // add jobid2 for deletion with a one-hour retain time
+    jobFinished(jobid2, 1);
+
+    // remove old logs and see jobid1 is not removed and jobid2 is removed
+    myClock.advance(ONE_HOUR);
+    userLogCleaner.processCompletedJobs();
+    assertTrue(jobUserlog1 + " got deleted", jobUserlog1.exists());
+    assertFalse(jobUserlog2 + " still exists.", jobUserlog2.exists());
+
+    myClock.advance(ONE_HOUR);
+    // remove old logs and see jobid1 is removed now
+    userLogCleaner.processCompletedJobs();
+    assertFalse(jobUserlog1 + " still exists.", jobUserlog1.exists());
+  }
+
+  /**
+   * Tests user-log directory cleanup on a TT re-init, with 3 hours as the
+   * tracker's log retain time.
+   * 
+   * Adds job1 for deletion before the re-init with a 2-hour retain time. Adds
+   * job2, for which there are no tasks/killJobAction after the re-init. Adds
+   * job3, for which there is a localizeJob followed by a killJobAction with a
+   * 3-hour retain time. Adds job4, for which there are some tasks after the
+   * re-init.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testUserLogCleanup() throws IOException {
+    File jobUserlog1 = localizeJob(jobid1);
+    File jobUserlog2 = localizeJob(jobid2);
+    File jobUserlog3 = localizeJob(jobid3);
+    File jobUserlog4 = localizeJob(jobid4);
+    // create some stray files/dirs in the userlog dir
+    foo.mkdirs();
+    bar.createNewFile();
+
+    // add the jobid1 for deletion with retainhours = 2
+    jobFinished(jobid1, 2);
+
+    // time is now 1.
+    myClock.advance(ONE_HOUR);
+
+    // mimic TaskTracker reinit
+    // re-init the tt with 3 hours as user log retain hours.
+    // This re-init clears the user log directory
+    // job directories will be added with 3 hours as retain hours.
+    // i.e. they will be deleted at time 4.
+    ttReinited();
+
+    assertFalse(foo.exists());
+    assertFalse(bar.exists());
+    assertTrue(jobUserlog1.exists());
+    assertTrue(jobUserlog2.exists());
+    assertTrue(jobUserlog3.exists());
+    assertTrue(jobUserlog4.exists());
+
+    myClock.advance(ONE_HOUR);
+    // time is now 2.
+    userLogCleaner.processCompletedJobs();
+    assertFalse(jobUserlog1.exists());
+    assertTrue(jobUserlog2.exists());
+    assertTrue(jobUserlog3.exists());
+    assertTrue(jobUserlog4.exists());
+
+    // mimic localizeJob followed by KillJobAction for jobid3
+    // add the job for deletion with retainhours = 3.
+    // jobid3 should be deleted at time 5.
+    jobUserlog3 = localizeJob(jobid3);
+    jobFinished(jobid3, 3);
+
+    // mimic localizeJob for jobid4
+    jobUserlog4 = localizeJob(jobid4);
+
+    // do cleanup
+    myClock.advance(2 * ONE_HOUR);
+    // time is now 4.
+    userLogCleaner.processCompletedJobs();
+
+    // jobid2 will be deleted
+    assertFalse(jobUserlog1.exists());
+    assertFalse(jobUserlog2.exists());
+    assertTrue(jobUserlog3.exists());
+    assertTrue(jobUserlog4.exists());
+
+    myClock.advance(ONE_HOUR);
+    // time is now 5.
+    // do cleanup again
+    userLogCleaner.processCompletedJobs();
+
+    // jobid3 will be deleted
+    assertFalse(jobUserlog1.exists());
+    assertFalse(jobUserlog2.exists());
+    assertFalse(jobUserlog3.exists());
+    assertTrue(jobUserlog4.exists());
+  }
+
+  /**
+   * Tests user-log directory cleanup on a TT restart.
+   * 
+   * Adds job1 for deletion before the restart with a 2-hour retain time. Adds
+   * job2, for which there are no tasks/killJobAction after the restart. Adds
+   * job3, for which there is a localizeJob followed by a killJobAction after
+   * the restart with a 3-hour retain time. Adds job4, for which there are some
+   * tasks after the restart.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testUserLogCleanupAfterRestart() throws IOException {
+    File jobUserlog1 = localizeJob(jobid1);
+    File jobUserlog2 = localizeJob(jobid2);
+    File jobUserlog3 = localizeJob(jobid3);
+    File jobUserlog4 = localizeJob(jobid4);
+    // create some stray files/dirs in the userlog dir
+    foo.mkdirs();
+    bar.createNewFile();
+
+    // add the jobid1 for deletion with retain hours = 2
+    jobFinished(jobid1, 2);
+
+    // time is now 1.
+    myClock.advance(ONE_HOUR);
+
+    // Mimic the TaskTracker restart
+    // Restart the tt with 3 hours as user log retain hours.
+    // This restart clears the user log directory
+    // job directories will be added with 3 hours as retain hours.
+    // i.e. they will be deleted at time 3, since the clock resets on restart
+    ttRestarted();
+
+    assertFalse(foo.exists());
+    assertFalse(bar.exists());
+    assertTrue(jobUserlog1.exists());
+    assertTrue(jobUserlog2.exists());
+    assertTrue(jobUserlog3.exists());
+    assertTrue(jobUserlog4.exists());
+
+    myClock.advance(ONE_HOUR);
+    // time is now 1.
+    userLogCleaner.processCompletedJobs();
+    assertTrue(jobUserlog1.exists());
+    assertTrue(jobUserlog2.exists());
+    assertTrue(jobUserlog3.exists());
+    assertTrue(jobUserlog4.exists());
+
+    // mimic localizeJob followed by KillJobAction for jobid3
+    // add the job for deletion with retainhours = 3.
+    // jobid3 should be deleted at time 4.
+    jobUserlog3 = localizeJob(jobid3);
+    jobFinished(jobid3, 3);
+
+    // mimic localizeJob for jobid4
+    jobUserlog4 = localizeJob(jobid4);
+
+    // do cleanup
+    myClock.advance(2 * ONE_HOUR);
+    // time is now 3.
+    userLogCleaner.processCompletedJobs();
+
+    // jobid1 and jobid2 will be deleted
+    assertFalse(jobUserlog1.exists());
+    assertFalse(jobUserlog2.exists());
+    assertTrue(jobUserlog3.exists());
+    assertTrue(jobUserlog4.exists());
+
+    myClock.advance(ONE_HOUR);
+    // time is now 4.
+    // do cleanup again
+    userLogCleaner.processCompletedJobs();
+
+    // jobid3 will be deleted
+    assertFalse(jobUserlog1.exists());
+    assertFalse(jobUserlog2.exists());
+    assertFalse(jobUserlog3.exists());
+    assertTrue(jobUserlog4.exists());
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java?rev=1077322&r1=1077321&r2=1077322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java Fri Mar  4 04:03:23 2011
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapred;
 
+import static org.junit.Assert.fail;
+
 import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.io.*;
@@ -47,6 +49,8 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.UserLogEvent;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.UserLogManager;
 import org.apache.hadoop.util.StringUtils;
 
 /** 
@@ -719,6 +723,26 @@ public class UtilsForTests {
     }
   }
 
+  /**
+   * An in-line {@link UserLogManager} that performs all log actions synchronously.
+   */
+  static class InLineUserLogManager extends UserLogManager {
+    public InLineUserLogManager(Configuration conf) throws IOException {
+      super(conf);
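+      // use an inline cleanup queue so directory deletions also complete
+      // synchronously, letting tests assert on the final state immediately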
+      getUserLogCleaner().setCleanupQueue(new InlineCleanupQueue());
+    }
+
+    // do the action in-line
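+    // (addLogEvent only enqueues the event; the immediate monitor() call
+    // drains and processes it on the caller's thread, so tests never race
+    // against the background monitoring thread)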
+    public void addLogEvent(UserLogEvent event) {
+      try {
+        super.addLogEvent(event);
+        super.monitor();
+      } catch (Exception e) {
+        fail("failed to process action " + event.getEventType());
+      }
+    }
+  }
+
   static void setUpConfigFile(Properties confProps, File configFile)
     throws IOException {
     Configuration config = new Configuration(false);