Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:43:35 UTC
svn commit: r1077679 [6/6] - in
/hadoop/common/branches/branch-0.20-security-patches: ./
src/c++/task-controller/ src/c++/task-controller/impl/
src/c++/task-controller/test/ src/c++/task-controller/tests/
src/core/org/apache/hadoop/fs/ src/core/org/apa...
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskCommit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskCommit.java?rev=1077679&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskCommit.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskCommit.java Fri Mar 4 04:43:33 2011
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.SortedRanges.Range;
+import org.apache.hadoop.mapreduce.TaskType;
+
+public class TestTaskCommit extends HadoopTestCase {
+ Path rootDir =
+ new Path(System.getProperty("test.build.data", "/tmp"), "test");
+
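+ /**
+ * Special committer whose commitTask throws an IOException whenever
+ * task output exists, forcing the commit step to fail.
+ */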
+ static class CommitterWithCommitFail extends FileOutputCommitter {
+ public void commitTask(TaskAttemptContext context) throws IOException {
+ Path taskOutputPath = getTempTaskOutputPath(context);
+ TaskAttemptID attemptId = context.getTaskAttemptID();
+ JobConf job = context.getJobConf();
+ if (taskOutputPath != null) {
+ FileSystem fs = taskOutputPath.getFileSystem(job);
+ if (fs.exists(taskOutputPath)) {
+ throw new IOException();
+ }
+ }
+ }
+ }
+
+ /**
+ * Special committer that does not clean up temporary files in
+ * abortTask.
+ *
+ * The framework's FileOutputCommitter cleans up any temporary
+ * files left behind in abortTask. We want the test case to
+ * find these files and hence short-circuit abortTask.
+ */
+ static class CommitterWithoutCleanup extends FileOutputCommitter {
+ @Override
+ public void abortTask(TaskAttemptContext context) throws IOException {
+ // does nothing
+ }
+ }
+
+ /**
+ * Special committer that always requires commit.
+ */
+ static class CommitterThatAlwaysRequiresCommit extends FileOutputCommitter {
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext context)
+ throws IOException {
+ return true;
+ }
+ }
+
+ public TestTaskCommit() throws IOException {
+ super(LOCAL_MR, LOCAL_FS, 1, 1);
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ FileUtil.fullyDelete(new File(rootDir.toString()));
+ }
+
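+ /**
+ * With a single map attempt and a committer that fails commitTask,
+ * the job is expected to end in the FAILED state.
+ */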
+ public void testCommitFail() throws IOException {
+ Path rootDir =
+ new Path(System.getProperty("test.build.data", "/tmp"), "test");
+ final Path inDir = new Path(rootDir, "input");
+ final Path outDir = new Path(rootDir, "output");
+ JobConf jobConf = createJobConf();
+ jobConf.setMaxMapAttempts(1);
+ jobConf.setOutputCommitter(CommitterWithCommitFail.class);
+ RunningJob rJob = UtilsForTests.runJob(jobConf, inDir, outDir, 1, 0);
+ rJob.waitForCompletion();
+ assertEquals(JobStatus.FAILED, rJob.getJobState());
+ }
+
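+ /**
+ * Stub TaskUmbilicalProtocol that refuses canCommit, fails the test if
+ * commitPending is invoked, and records task completion via done().
+ */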
+ private class MyUmbilical implements TaskUmbilicalProtocol {
+ boolean taskDone = false;
+
+ @Override
+ public boolean canCommit(TaskAttemptID taskid) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void commitPending(TaskAttemptID taskId, TaskStatus taskStatus)
+ throws IOException, InterruptedException {
+ fail("Task should not go to commit-pending");
+ }
+
+ @Override
+ public void done(TaskAttemptID taskid) throws IOException {
+ taskDone = true;
+ }
+
+ @Override
+ public void fatalError(TaskAttemptID taskId, String message)
+ throws IOException { }
+
+ @Override
+ public void fsError(TaskAttemptID taskId, String message)
+ throws IOException { }
+
+ @Override
+ public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
+ int fromIndex, int maxLocs, TaskAttemptID id) throws IOException {
+ return null;
+ }
+
+ @Override
+ public JvmTask getTask(JvmContext context) throws IOException {
+ return null;
+ }
+
+ @Override
+ public boolean ping(TaskAttemptID taskid) throws IOException {
+ return true;
+ }
+
+ @Override
+ public void reportDiagnosticInfo(TaskAttemptID taskid, String trace)
+ throws IOException {
+ }
+
+ @Override
+ public void reportNextRecordRange(TaskAttemptID taskid, Range range)
+ throws IOException {
+ }
+
+ @Override
+ public void shuffleError(TaskAttemptID taskId, String message)
+ throws IOException {
+ }
+
+ @Override
+ public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
+ throws IOException, InterruptedException {
+ return true;
+ }
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return 0;
+ }
+
+ @Override
+ public void
+ updatePrivateDistributedCacheSizes(org.apache.hadoop.mapreduce.JobID jobId,
+ long[] sizes) throws IOException {
+ // NOTHING
+ }
+ }
+
+ /**
+ * A test that mimics a failed task to ensure that it does
+ * not get into the COMMIT_PENDING state, by using a fake
+ * TaskUmbilicalProtocol implementation that fails if the commit
+ * protocol is played.
+ *
+ * The test mocks the various steps in a failed task's
+ * life-cycle using special OutputCommitter and TaskUmbilicalProtocol
+ * implementations.
+ *
+ * @throws Exception
+ */
+ public void testTaskCleanupDoesNotCommit() throws Exception {
+ // Mimic a job with a special committer that does not cleanup
+ // files when a task fails.
+ JobConf job = new JobConf();
+ job.setOutputCommitter(CommitterWithoutCleanup.class);
+ Path outDir = new Path(rootDir, "output");
+ FileOutputFormat.setOutputPath(job, outDir);
+
+ // Mimic job setup
+ String dummyAttemptID = "attempt_200707121733_0001_m_000000_0";
+ TaskAttemptID attemptID = TaskAttemptID.forName(dummyAttemptID);
+ OutputCommitter committer = new CommitterWithoutCleanup();
+ JobContext jContext = new JobContext(job, attemptID.getJobID());
+ committer.setupJob(jContext);
+
+
+ // Mimic a map task
+ dummyAttemptID = "attempt_200707121733_0001_m_000001_0";
+ attemptID = TaskAttemptID.forName(dummyAttemptID);
+ Task task = new MapTask(new Path(rootDir, "job.xml").toString(), attemptID,
+ 0, null, 1);
+ task.setConf(job);
+ task.localizeConfiguration(job);
+ task.initialize(job, attemptID.getJobID(), Reporter.NULL, false);
+
+ // Mimic the map task writing some output.
+ String file = "test.txt";
+ FileSystem localFs = FileSystem.getLocal(job);
+ TextOutputFormat<Text, Text> theOutputFormat
+ = new TextOutputFormat<Text, Text>();
+ RecordWriter<Text, Text> theRecordWriter =
+ theOutputFormat.getRecordWriter(localFs,
+ job, file, Reporter.NULL);
+ theRecordWriter.write(new Text("key"), new Text("value"));
+ theRecordWriter.close(Reporter.NULL);
+
+ // Mimic a task failure; setting up the task for cleanup causes
+ // the abort protocol to be played.
+ // Without the checks in the framework, this would fail because
+ // the committer would trigger a COMMIT for the cleanup task.
+ task.setTaskCleanupTask();
+ MyUmbilical umbilical = new MyUmbilical();
+ task.run(job, umbilical);
+ assertTrue("Task did not succeed", umbilical.taskDone);
+ }
+
+ public void testCommitRequiredForMapTask() throws Exception {
+ Task testTask = createDummyTask(TaskType.MAP);
+ assertTrue("MapTask should need commit", testTask.isCommitRequired());
+ }
+
+ public void testCommitRequiredForReduceTask() throws Exception {
+ Task testTask = createDummyTask(TaskType.REDUCE);
+ assertTrue("ReduceTask should need commit", testTask.isCommitRequired());
+ }
+
+ public void testCommitNotRequiredForJobSetup() throws Exception {
+ Task testTask = createDummyTask(TaskType.MAP);
+ testTask.setJobSetupTask();
+ assertFalse("Job setup task should not need commit",
+ testTask.isCommitRequired());
+ }
+
+ public void testCommitNotRequiredForJobCleanup() throws Exception {
+ Task testTask = createDummyTask(TaskType.MAP);
+ testTask.setJobCleanupTask();
+ assertFalse("Job cleanup task should not need commit",
+ testTask.isCommitRequired());
+ }
+
+ public void testCommitNotRequiredForTaskCleanup() throws Exception {
+ Task testTask = createDummyTask(TaskType.REDUCE);
+ testTask.setTaskCleanupTask();
+ assertFalse("Task cleanup task should not need commit",
+ testTask.isCommitRequired());
+ }
+
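+ /**
+ * Creates a bare map or reduce task configured with a committer that
+ * always requires commit, so isCommitRequired() depends only on the
+ * task's role (regular vs. setup/cleanup).
+ */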
+ private Task createDummyTask(TaskType type) throws IOException, ClassNotFoundException,
+ InterruptedException {
+ JobConf conf = new JobConf();
+ conf.setOutputCommitter(CommitterThatAlwaysRequiresCommit.class);
+ Path outDir = new Path(rootDir, "output");
+ FileOutputFormat.setOutputPath(conf, outDir);
+ JobID jobId = JobID.forName("job_201002121132_0001");
+ Task testTask;
+ if (type == TaskType.MAP) {
+ testTask = new MapTask();
+ } else {
+ testTask = new ReduceTask();
+ }
+ testTask.setConf(conf);
+ testTask.initialize(conf, jobId, Reporter.NULL, false);
+ return testTask;
+ }
+
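+ // Entry point for running the commit-failure case outside the test runner.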
+ public static void main(String[] argv) throws Exception {
+ TestTaskCommit td = new TestTaskCommit();
+ td.testCommitFail();
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskEnvironment.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskEnvironment.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskEnvironment.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskEnvironment.java Fri Mar 4 04:43:33 2011
@@ -26,11 +26,15 @@ import java.util.Vector;
import java.util.HashMap;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.mapred.JvmManager.JvmManagerForType;
import org.apache.hadoop.mapred.JvmManager.JvmManagerForType.JvmRunner;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.mapred.TaskTracker.RunningJob;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.UserLogManager;
@@ -98,7 +102,14 @@ public class TestTaskEnvironment {
Task task = new MapTask(null, attemptID, 0, null, MAP_SLOTS);
task.setConf(taskConf);
TaskInProgress tip = tt.new TaskInProgress(task, taskConf);
- final TaskRunner taskRunner = task.createRunner(tt, tip);
+ RunningJob rjob = new RunningJob(attemptID.getJobID());
+ TaskController taskController = new DefaultTaskController();
+ taskController.setConf(ttConf);
+ rjob.distCacheMgr =
+ new TrackerDistributedCacheManager(ttConf, taskController).
+ newTaskDistributedCacheManager(attemptID.getJobID(), taskConf);
+
+ final TaskRunner taskRunner = task.createRunner(tt, tip, rjob);
String errorInfo = "Child error";
String mapredChildEnv = taskRunner.getChildEnv(taskConf);
taskRunner.updateUserLoginEnv(errorInfo, user, taskConf, env);
@@ -111,9 +122,7 @@ public class TestTaskEnvironment {
workDir.mkdir();
final File stdout = new File(TEST_DIR, "stdout");
final File stderr = new File(TEST_DIR, "stderr");
- JvmEnv jenv = jvmManager.constructJvmEnv(null, vargs,
- stdout,stderr, 100, workDir, env, taskConf);
- Map<String, String> jvmenvmap = jenv.env;
+ Map<String, String> jvmenvmap = env;
String javaOpts = taskRunner.getChildJavaOpts(ttConf,
JobConf.MAPRED_MAP_TASK_JAVA_OPTS);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Mar 4 04:43:33 2011
@@ -27,18 +27,18 @@ import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;
import junit.framework.TestCase;
+import org.junit.Ignore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
import org.apache.hadoop.mapred.QueueManager.QueueACL;
-import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
-import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
import org.apache.hadoop.mapred.TaskTracker.RunningJob;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;
@@ -54,6 +54,7 @@ import org.apache.hadoop.util.Shell;
* TaskTracker.
*
*/
+@Ignore // test relies on deprecated functionality/lifecycle
public class TestTaskTrackerLocalization extends TestCase {
private File TEST_ROOT_DIR;
@@ -186,10 +187,9 @@ public class TestTaskTrackerLocalization
// setup task controller
taskController = getTaskController();
taskController.setConf(trackerFConf);
- taskController.setup();
+ taskController.setup(lDirAlloc);
tracker.setTaskController(taskController);
- tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs,
- taskController));
+ tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(),localDirs));
}
protected TaskController getTaskController() {
@@ -617,7 +617,7 @@ public class TestTaskTrackerLocalization
InlineCleanupQueue cleanupQueue = new InlineCleanupQueue();
tracker.setCleanupThread(cleanupQueue);
- tip.removeTaskFiles(needCleanup, taskId);
+ tip.removeTaskFiles(needCleanup);
if (jvmReuse) {
// work dir should still exist and cleanup queue should be empty
@@ -708,13 +708,20 @@ public class TestTaskTrackerLocalization
+ " is not created in any of the configured dirs!!",
attemptWorkDir != null);
- TaskRunner runner = task.createRunner(tracker, tip);
+ RunningJob rjob = new RunningJob(jobId);
+ TaskController taskController = new DefaultTaskController();
+ taskController.setConf(trackerFConf);
+ rjob.distCacheMgr =
+ new TrackerDistributedCacheManager(trackerFConf, taskController).
+ newTaskDistributedCacheManager(jobId, trackerFConf);
+
+ TaskRunner runner = task.createRunner(tracker, tip, rjob);
tip.setTaskRunner(runner);
// /////// Few more methods being tested
runner.setupChildTaskConfiguration(lDirAlloc);
TaskRunner.createChildTmpDir(new File(attemptWorkDir.toUri().getPath()),
- localizedJobConf);
+ localizedJobConf, true);
attemptLogFiles = runner.prepareLogFiles(task.getTaskID(),
task.isTaskCleanupTask());
@@ -731,17 +738,6 @@ public class TestTaskTrackerLocalization
localizedTaskConf = new JobConf(localTaskFile);
TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf);
// ///////
-
- // Initialize task via TaskController
- TaskControllerContext taskContext =
- new TaskController.TaskControllerContext();
- taskContext.env =
- new JvmEnv(null, null, null, null, -1, new File(localizedJobConf
- .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf);
- taskContext.task = task;
- // /////////// The method being tested
- taskController.initializeTask(taskContext);
- // ///////////
}
/**
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java Fri Mar 4 04:43:33 2011
@@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
import org.apache.hadoop.security.UserGroupInformation;
@@ -64,7 +65,7 @@ public class TestTrackerDistributedCache
String execPath = path + "/task-controller";
((MyLinuxTaskController)taskController).setTaskControllerExe(execPath);
taskController.setConf(conf);
- taskController.setup();
+ taskController.setup(new LocalDirAllocator("mapred.local.dir"));
}
@Override
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java Fri Mar 4 04:43:33 2011
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapred.UtilsFor
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.*;
+import org.apache.hadoop.security.UserGroupInformation;
import static org.junit.Assert.*;
@@ -59,10 +60,13 @@ public class TestUserLogCleanup {
}
private File localizeJob(JobID jobid) throws IOException {
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ new JobLocalizer(tt.getJobConf(), user,
+ jobid.toString()).initializeJobLogDir();
File jobUserlog = TaskLog.getJobDir(jobid);
JobConf conf = new JobConf();
// localize job log directory
- tt.initializeJobLogDir(jobid, conf);
+ tt.saveLogDir(jobid, conf);
assertTrue(jobUserlog + " directory is not created.", jobUserlog.exists());
return jobUserlog;
}
@@ -78,8 +82,7 @@ public class TestUserLogCleanup {
tt = new TaskTracker();
tt.setConf(new JobConf(conf));
localizer = new Localizer(FileSystem.get(conf), conf
- .getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY),
- new DefaultTaskController());
+ .getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
tt.setLocalizer(localizer);
userLogManager = new UtilsForTests.InLineUserLogManager(conf);
userLogCleaner = userLogManager.getUserLogCleaner();
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java Fri Mar 4 04:43:33 2011
@@ -47,11 +47,13 @@ import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.UserLogEvent;
import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.UserLogManager;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
/**
@@ -439,7 +441,7 @@ public class UtilsForTests {
* asynchronously.
*/
public static class InlineCleanupQueue extends CleanupQueue {
- List<String> stalePaths = new ArrayList<String>();
+ List<Path> stalePaths = new ArrayList<Path>();
public InlineCleanupQueue() {
// do nothing
@@ -462,6 +464,17 @@ public class UtilsForTests {
}
}
}
+ static boolean deletePath(PathDeletionContext context) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying to delete " + context.fullPath);
+ }
+// FileSystem fs = context.fullPath.getFileSystem(context.conf);
+// if (fs.exists(context.fullPath)) {
+// return fs.delete(context.fullPath, true);
+// }
+ context.deletePath();
+ return true;
+ }
}
static String getTaskSignalParameter(boolean isMap) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java Fri Mar 4 04:43:33 2011
@@ -31,6 +31,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ProcessTree.Signal;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.mapred.UtilsForTests;
@@ -147,8 +148,7 @@ public class TestProcfsBasedProcessTree
String pid = getRogueTaskPID();
LOG.info("Root process pid: " + pid);
ProcfsBasedProcessTree p = new ProcfsBasedProcessTree(pid,
- ProcessTree.isSetsidAvailable,
- ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+ ProcessTree.isSetsidAvailable);
p = p.getProcessTree(); // initialize
LOG.info("ProcessTree: " + p.toString());
File leaf = new File(lowestDescendant);
@@ -168,7 +168,11 @@ public class TestProcfsBasedProcessTree
String processTreeDump = p.getProcessTreeDump();
// destroy the map task and all its subprocesses
- p.destroy(true/*in the background*/);
+ if (ProcessTree.isSetsidAvailable) {
+ ProcessTree.killProcessGroup(pid, Signal.KILL);
+ } else {
+ ProcessTree.killProcess(pid, Signal.KILL);
+ }
if(ProcessTree.isSetsidAvailable) {// whole processtree should be gone
assertEquals(false, p.isAnyProcessInTreeAlive());
}