You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:43:35 UTC

svn commit: r1077679 [6/6] - in /hadoop/common/branches/branch-0.20-security-patches: ./ src/c++/task-controller/ src/c++/task-controller/impl/ src/c++/task-controller/test/ src/c++/task-controller/tests/ src/core/org/apache/hadoop/fs/ src/core/org/apa...

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskCommit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskCommit.java?rev=1077679&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskCommit.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskCommit.java Fri Mar  4 04:43:33 2011
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.SortedRanges.Range;
+import org.apache.hadoop.mapreduce.TaskType;
+
+public class TestTaskCommit extends HadoopTestCase {
+  Path rootDir = 
+    new Path(System.getProperty("test.build.data",  "/tmp"), "test"); // scratch root; deleted in tearDown()
+
+  /** Committer whose commitTask throws if the temp task output exists. */
+  static class CommitterWithCommitFail extends FileOutputCommitter {
+    @Override
+    public void commitTask(TaskAttemptContext context) throws IOException {
+      Path taskOutputPath = getTempTaskOutputPath(context);
+      if (taskOutputPath != null) {
+        FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
+        if (fs.exists(taskOutputPath)) { // deliberately fail the commit
+          throw new IOException("Simulated commit failure: " + taskOutputPath);
+        }
+      }
+    }
+  }
+
+  /**
+   * Special committer that does not clean up temporary files in
+   * abortTask.
+   *
+   * The framework's FileOutputCommitter cleans up any temporary
+   * files left behind in abortTask. We want the test case to
+   * find these files, and hence we short-circuit abortTask.
+   */
+  static class CommitterWithoutCleanup extends FileOutputCommitter {
+    @Override
+    public void abortTask(TaskAttemptContext context) throws IOException {
+      // Intentionally empty: leave the task's temporary files in place.
+    }
+  }
+  
+  /**
+   * Special committer that reports every task as needing a commit.
+   */
+  static class CommitterThatAlwaysRequiresCommit extends FileOutputCommitter {
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext context) 
+      throws IOException {
+      return true; // unconditional: the context is never consulted
+    }
+  }
+
+  public TestTaskCommit() throws IOException { // configures the HadoopTestCase base class
+    super(LOCAL_MR, LOCAL_FS, 1, 1); // NOTE(review): presumably local MR/FS with one tracker and one datanode - confirm
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown(); // base-class teardown runs first; if it throws, the delete below is skipped
+    FileUtil.fullyDelete(new File(rootDir.toString())); // remove the scratch directory tree
+  }
+    
+  /** Runs a job with the always-failing committer; it must end FAILED. */
+  public void testCommitFail() throws IOException {
+    // Uses the rootDir field, so tearDown() removes exactly what is written.
+    final Path inDir = new Path(rootDir, "input");
+    final Path outDir = new Path(rootDir, "output");
+    JobConf jobConf = createJobConf();
+    jobConf.setMaxMapAttempts(1); // no retries for map attempts
+    jobConf.setOutputCommitter(CommitterWithCommitFail.class);
+    RunningJob rJob = UtilsForTests.runJob(jobConf, inDir, outDir, 1, 0);
+    rJob.waitForCompletion();
+    assertEquals(JobStatus.FAILED, rJob.getJobState());
+  }
+  
+  private class MyUmbilical implements TaskUmbilicalProtocol { // stub umbilical for testTaskCleanupDoesNotCommit
+    boolean taskDone = false; // flipped to true by done()
+
+    @Override
+    public boolean canCommit(TaskAttemptID taskid) throws IOException {
+      return false; // never grants permission to commit
+    }
+
+    @Override
+    public void commitPending(TaskAttemptID taskId, TaskStatus taskStatus)
+        throws IOException, InterruptedException {
+      fail("Task should not go to commit-pending"); // reaching this call fails the test
+    }
+
+    @Override
+    public void done(TaskAttemptID taskid) throws IOException {
+      taskDone = true; // records completion for the test's final assertion
+    }
+
+    @Override
+    public void fatalError(TaskAttemptID taskId, String message)
+        throws IOException { } // ignored by this stub
+
+    @Override
+    public void fsError(TaskAttemptID taskId, String message)
+        throws IOException { } // ignored by this stub
+
+    @Override
+    public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
+        int fromIndex, int maxLocs, TaskAttemptID id) throws IOException {
+      return null; // stub: no completion events are provided
+    }
+
+    @Override
+    public JvmTask getTask(JvmContext context) throws IOException {
+      return null; // stub: no task is handed out
+    }
+
+    @Override
+    public boolean ping(TaskAttemptID taskid) throws IOException {
+      return true; // always answers the ping positively
+    }
+
+    @Override
+    public void reportDiagnosticInfo(TaskAttemptID taskid, String trace)
+        throws IOException {
+    } // diagnostics are dropped
+
+    @Override
+    public void reportNextRecordRange(TaskAttemptID taskid, Range range)
+        throws IOException {
+    } // record ranges are dropped
+
+    @Override
+    public void shuffleError(TaskAttemptID taskId, String message)
+        throws IOException {
+    } // ignored by this stub
+
+    @Override
+    public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
+        throws IOException, InterruptedException {
+      return true; // every status update is acknowledged
+    }
+
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion)
+        throws IOException {
+      return 0; // fixed value; no version negotiation in this stub
+    }
+
+    @Override
+    public void 
+    updatePrivateDistributedCacheSizes(org.apache.hadoop.mapreduce.JobID jobId,
+                                       long[] sizes) throws IOException {
+      // Intentionally a no-op for this test.
+    }
+  }
+  
+  /**
+   * A test that mimics a failed task to ensure that it does
+   * not get into the COMMIT_PENDING state, by using a fake
+   * UmbilicalProtocol implementation that fails if the commit
+   * protocol is played.
+   *
+   * The test mocks the various steps in a failed task's
+   * life-cycle using a special OutputCommitter and UmbilicalProtocol
+   * implementation.
+   *
+   * @throws Exception if any step of the mocked task life-cycle fails
+   */
+  public void testTaskCleanupDoesNotCommit() throws Exception {
+    // Mimic a job with a special committer that does not clean up
+    // files when a task fails.
+    JobConf job = new JobConf();
+    job.setOutputCommitter(CommitterWithoutCleanup.class);
+    Path outDir = new Path(rootDir, "output"); 
+    FileOutputFormat.setOutputPath(job, outDir);
+
+    // Mimic job setup
+    String dummyAttemptID = "attempt_200707121733_0001_m_000000_0";
+    TaskAttemptID attemptID = TaskAttemptID.forName(dummyAttemptID);
+    OutputCommitter committer = new CommitterWithoutCleanup();
+    JobContext jContext = new JobContext(job, attemptID.getJobID());
+    committer.setupJob(jContext);
+    
+
+    // Mimic a map task (a different attempt ID from the one used for job setup)
+    dummyAttemptID = "attempt_200707121733_0001_m_000001_0";
+    attemptID = TaskAttemptID.forName(dummyAttemptID);
+    Task task = new MapTask(new Path(rootDir, "job.xml").toString(), attemptID,
+        0, null, 1);
+    task.setConf(job);
+    task.localizeConfiguration(job);
+    task.initialize(job, attemptID.getJobID(), Reporter.NULL, false);
+    
+    // Mimic the map task writing some output.
+    String file = "test.txt";
+    FileSystem localFs = FileSystem.getLocal(job);
+    TextOutputFormat<Text, Text> theOutputFormat 
+      = new TextOutputFormat<Text, Text>();
+    RecordWriter<Text, Text> theRecordWriter = 
+      theOutputFormat.getRecordWriter(localFs,
+        job, file, Reporter.NULL);
+    theRecordWriter.write(new Text("key"), new Text("value"));
+    theRecordWriter.close(Reporter.NULL);
+
+    // Mimic a task failure; marking the task as a task-cleanup task
+    // causes the abort protocol to be played.
+    // Without checks in the framework, this would fail,
+    // as the committer would cause a COMMIT to happen for
+    // the cleanup task.
+    task.setTaskCleanupTask();
+    MyUmbilical umbilical = new MyUmbilical();
+    task.run(job, umbilical);
+    assertTrue("Task did not succeed", umbilical.taskDone);
+  }
+
+  public void testCommitRequiredForMapTask() throws Exception {
+    Task testTask = createDummyTask(TaskType.MAP); // committer always asks for a commit
+    assertTrue("MapTask should need commit", testTask.isCommitRequired());
+  }
+
+  public void testCommitRequiredForReduceTask() throws Exception {
+    Task testTask = createDummyTask(TaskType.REDUCE); // committer always asks for a commit
+    assertTrue("ReduceTask should need commit", testTask.isCommitRequired());
+  }
+  
+  public void testCommitNotRequiredForJobSetup() throws Exception {
+    Task testTask = createDummyTask(TaskType.MAP);
+    testTask.setJobSetupTask(); // flag the task as a job-setup task
+    assertFalse("Job setup task should not need commit", 
+        testTask.isCommitRequired()); // must hold even though the committer always asks for a commit
+  }
+  
+  public void testCommitNotRequiredForJobCleanup() throws Exception {
+    Task testTask = createDummyTask(TaskType.MAP);
+    testTask.setJobCleanupTask(); // flag the task as a job-cleanup task
+    assertFalse("Job cleanup task should not need commit", 
+        testTask.isCommitRequired()); // must hold even though the committer always asks for a commit
+  }
+
+  public void testCommitNotRequiredForTaskCleanup() throws Exception {
+    Task testTask = createDummyTask(TaskType.REDUCE);
+    testTask.setTaskCleanupTask(); // flag the task as a task-cleanup task
+    assertFalse("Task cleanup task should not need commit", 
+        testTask.isCommitRequired()); // must hold even though the committer always asks for a commit
+  }
+
+  /**
+   * Builds a map or reduce task (per <code>type</code>) that is initialized
+   * against a job whose committer always reports that a commit is needed.
+   */
+  private Task createDummyTask(TaskType type)
+      throws IOException, ClassNotFoundException, InterruptedException {
+    // Anything other than MAP yields a ReduceTask.
+    Task dummy = (type == TaskType.MAP) ? new MapTask() : new ReduceTask();
+    JobConf jobConf = new JobConf();
+    jobConf.setOutputCommitter(CommitterThatAlwaysRequiresCommit.class);
+    FileOutputFormat.setOutputPath(jobConf, new Path(rootDir, "output"));
+
+    dummy.setConf(jobConf);
+    dummy.initialize(jobConf, JobID.forName("job_201002121132_0001"),
+        Reporter.NULL, false);
+    return dummy;
+  }
+
+  public static void main(String[] argv) throws Exception { // manual entry point; argv is unused
+    TestTaskCommit td = new TestTaskCommit();
+    td.testCommitFail(); // only this test is run; setUp()/tearDown() are not invoked here
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskEnvironment.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskEnvironment.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskEnvironment.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskEnvironment.java Fri Mar  4 04:43:33 2011
@@ -26,11 +26,15 @@ import java.util.Vector;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.mapred.JvmManager.JvmManagerForType;
 import org.apache.hadoop.mapred.JvmManager.JvmManagerForType.JvmRunner;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.mapred.TaskTracker.RunningJob;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.UserLogManager;
@@ -98,7 +102,14 @@ public class TestTaskEnvironment {
     Task task = new MapTask(null, attemptID, 0, null, MAP_SLOTS);
     task.setConf(taskConf);
     TaskInProgress tip = tt.new TaskInProgress(task, taskConf);
-    final TaskRunner taskRunner = task.createRunner(tt, tip);
+    RunningJob rjob = new RunningJob(attemptID.getJobID());
+    TaskController taskController = new DefaultTaskController();
+    taskController.setConf(ttConf);
+    rjob.distCacheMgr = 
+      new TrackerDistributedCacheManager(ttConf, taskController).
+      newTaskDistributedCacheManager(attemptID.getJobID(), taskConf);
+      
+    final TaskRunner taskRunner = task.createRunner(tt, tip, rjob);
     String errorInfo = "Child error";
     String mapredChildEnv = taskRunner.getChildEnv(taskConf);
     taskRunner.updateUserLoginEnv(errorInfo, user, taskConf, env);
@@ -111,9 +122,7 @@ public class TestTaskEnvironment {
     workDir.mkdir();
     final File stdout = new File(TEST_DIR, "stdout");
     final File stderr = new File(TEST_DIR, "stderr");
-    JvmEnv jenv = jvmManager.constructJvmEnv(null, vargs,
-      stdout,stderr, 100, workDir, env, taskConf);
-    Map<String, String> jvmenvmap = jenv.env;
+    Map<String, String> jvmenvmap = env;
     String javaOpts = taskRunner.getChildJavaOpts(ttConf, 
       JobConf.MAPRED_MAP_TASK_JAVA_OPTS);
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Mar  4 04:43:33 2011
@@ -27,18 +27,18 @@ import java.util.jar.JarOutputStream;
 import java.util.zip.ZipEntry;
 
 import junit.framework.TestCase;
+import org.junit.Ignore;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
 import org.apache.hadoop.mapred.QueueManager.QueueACL;
-import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
-import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
 import org.apache.hadoop.mapred.TaskTracker.RunningJob;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;
@@ -54,6 +54,7 @@ import org.apache.hadoop.util.Shell;
  * TaskTracker.
  * 
  */
+@Ignore // test relies on deprecated functionality/lifecycle
 public class TestTaskTrackerLocalization extends TestCase {
 
   private File TEST_ROOT_DIR;
@@ -186,10 +187,9 @@ public class TestTaskTrackerLocalization
     // setup task controller
     taskController = getTaskController();
     taskController.setConf(trackerFConf);
-    taskController.setup();
+    taskController.setup(lDirAlloc);
     tracker.setTaskController(taskController);
-    tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs,
-                                       taskController));
+    tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(),localDirs));
   }
 
   protected TaskController getTaskController() {
@@ -617,7 +617,7 @@ public class TestTaskTrackerLocalization
     InlineCleanupQueue cleanupQueue = new InlineCleanupQueue();
     tracker.setCleanupThread(cleanupQueue);
 
-    tip.removeTaskFiles(needCleanup, taskId);
+    tip.removeTaskFiles(needCleanup);
 
     if (jvmReuse) {
       // work dir should still exist and cleanup queue should be empty
@@ -708,13 +708,20 @@ public class TestTaskTrackerLocalization
         + " is not created in any of the configured dirs!!",
         attemptWorkDir != null);
 
-    TaskRunner runner = task.createRunner(tracker, tip);
+    RunningJob rjob = new RunningJob(jobId);
+    TaskController taskController = new DefaultTaskController();
+    taskController.setConf(trackerFConf);
+    rjob.distCacheMgr = 
+      new TrackerDistributedCacheManager(trackerFConf, taskController).
+      newTaskDistributedCacheManager(jobId, trackerFConf);
+      
+    TaskRunner runner = task.createRunner(tracker, tip, rjob);
     tip.setTaskRunner(runner);
 
     // /////// Few more methods being tested
     runner.setupChildTaskConfiguration(lDirAlloc);
     TaskRunner.createChildTmpDir(new File(attemptWorkDir.toUri().getPath()),
-        localizedJobConf);
+        localizedJobConf, true);
     attemptLogFiles = runner.prepareLogFiles(task.getTaskID(),
         task.isTaskCleanupTask());
 
@@ -731,17 +738,6 @@ public class TestTaskTrackerLocalization
     localizedTaskConf = new JobConf(localTaskFile);
     TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf);
     // ///////
-
-    // Initialize task via TaskController
-    TaskControllerContext taskContext =
-        new TaskController.TaskControllerContext();
-    taskContext.env =
-        new JvmEnv(null, null, null, null, -1, new File(localizedJobConf
-            .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf);
-    taskContext.task = task;
-    // /////////// The method being tested
-    taskController.initializeTask(taskContext);
-    // ///////////
   }
 
   /**

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java Fri Mar  4 04:43:33 2011
@@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -64,7 +65,7 @@ public class TestTrackerDistributedCache
     String execPath = path + "/task-controller";
     ((MyLinuxTaskController)taskController).setTaskControllerExe(execPath);
     taskController.setConf(conf);
-    taskController.setup();
+    taskController.setup(new LocalDirAllocator("mapred.local.dir"));
   }
 
   @Override

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java Fri Mar  4 04:43:33 2011
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapred.UtilsFor
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.*;
+import org.apache.hadoop.security.UserGroupInformation;
 
 import static org.junit.Assert.*;
 
@@ -59,10 +60,13 @@ public class TestUserLogCleanup {
   }
 
   private File localizeJob(JobID jobid) throws IOException {
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    new JobLocalizer(tt.getJobConf(), user, 
+                     jobid.toString()).initializeJobLogDir();
     File jobUserlog = TaskLog.getJobDir(jobid);
     JobConf conf = new JobConf();
     // localize job log directory
-    tt.initializeJobLogDir(jobid, conf);
+    tt.saveLogDir(jobid, conf);
     assertTrue(jobUserlog + " directory is not created.", jobUserlog.exists());
     return jobUserlog;
   }
@@ -78,8 +82,7 @@ public class TestUserLogCleanup {
     tt = new TaskTracker();
     tt.setConf(new JobConf(conf));
     localizer = new Localizer(FileSystem.get(conf), conf
-        .getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY),
-        new DefaultTaskController());
+        .getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
     tt.setLocalizer(localizer);
     userLogManager = new UtilsForTests.InLineUserLogManager(conf);
     userLogCleaner = userLogManager.getUserLogCleaner();

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java Fri Mar  4 04:43:33 2011
@@ -47,11 +47,13 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.UserLogEvent;
 import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.UserLogManager;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
 /** 
@@ -439,7 +441,7 @@ public class UtilsForTests {
    * asynchronously.
    */
   public static class InlineCleanupQueue extends CleanupQueue {
-    List<String> stalePaths = new ArrayList<String>();
+    List<Path> stalePaths = new ArrayList<Path>();
 
     public InlineCleanupQueue() {
       // do nothing
@@ -462,6 +464,17 @@ public class UtilsForTests {
         }
       }
     }
+    static boolean deletePath(PathDeletionContext context) throws IOException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Trying to delete " + context.fullPath);
+      }
+      // Deletion is delegated to the context itself rather than going
+      // through the FileSystem API here. The delegate's outcome is not
+      // inspected: unless deletePath() throws, this method reports success.
+      // NOTE(review): confirm callers do not rely on a false return value.
+      context.deletePath();
+      return true;
+    }
   }
 
   static String getTaskSignalParameter(boolean isMap) {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java Fri Mar  4 04:43:33 2011
@@ -31,6 +31,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ProcessTree.Signal;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.mapred.UtilsForTests;
@@ -147,8 +148,7 @@ public class TestProcfsBasedProcessTree 
     String pid = getRogueTaskPID();
     LOG.info("Root process pid: " + pid);
     ProcfsBasedProcessTree p = new ProcfsBasedProcessTree(pid,
-        ProcessTree.isSetsidAvailable,
-        ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+        ProcessTree.isSetsidAvailable);
     p = p.getProcessTree(); // initialize
     LOG.info("ProcessTree: " + p.toString());
     File leaf = new File(lowestDescendant);
@@ -168,7 +168,11 @@ public class TestProcfsBasedProcessTree 
     String processTreeDump = p.getProcessTreeDump();
 
     // destroy the map task and all its subprocesses
-    p.destroy(true/*in the background*/);
+    if (ProcessTree.isSetsidAvailable) {
+      ProcessTree.killProcessGroup(pid, Signal.KILL);
+    } else {
+      ProcessTree.killProcess(pid, Signal.KILL);
+    }
     if(ProcessTree.isSetsidAvailable) {// whole processtree should be gone
       assertEquals(false, p.isAnyProcessInTreeAlive());
     }