You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:01:09 UTC
svn commit: r1077299 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
contrib/streaming/src/test/org/apache/hadoop/streaming/
mapred/org/apache/hadoop/mapred/ test/org/apache/hadoop/filecache/
test/org/apache/hadoop/mapred/
Author: omalley
Date: Fri Mar 4 04:01:09 2011
New Revision: 1077299
URL: http://svn.apache.org/viewvc?rev=1077299&view=rev
Log:
commit 0fd623198193ea24fa84f756427012f970b71fc4
Author: Hemanth Yamijala <yhemanth@friendchild-lm.(none)>
Date: Mon Mar 8 22:58:43 2010 +0530
MAPREDUCE:1435 from https://issues.apache.org/jira/secure/attachment/12438172/MR-1435-y20s-1.txt
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1435. Add test cases to already committed patch for this
+ jira, synchronizing changes with trunk. (yhemanth)
+
Added:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java?rev=1077299&r1=1077298&r2=1077299&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java Fri Mar 4 04:01:09 2011
@@ -19,13 +19,20 @@
package org.apache.hadoop.streaming;
import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.ClusterWithLinuxTaskController;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.ToolRunner;
/**
* Test Streaming with LinuxTaskController running the jobs as a user different
@@ -77,4 +84,95 @@ public class TestStreamingAsDifferentUse
}
});
}
+
+ /**
+  * Runs a streaming job that is given one private and one public
+  * distributed cache file, then verifies the permissions, ownership and
+  * continued presence of the localized files once the job is finished.
+  *
+  * @throws Exception if cluster setup, job submission or any check fails
+  */
+ public void testStreamingWithDistCache()
+     throws Exception {
+   if (!shouldRun()) {
+     return;
+   }
+   startCluster();
+   final String[] localDirs = mrCluster.getTaskTrackerLocalDirs(0);
+   final JobConf myConf = getClusterConf();
+
+   // create file that will go into public distributed cache
+   File publicFile = new File(System.getProperty(
+       "test.build.data", "/tmp"), "publicFile");
+   FileOutputStream fstream = new FileOutputStream(publicFile);
+   try {
+     fstream.write("public file contents".getBytes());
+   } finally {
+     // release the descriptor even when the write fails
+     fstream.close();
+   }
+
+   // put the file(that should go into public dist cache) in dfs and set
+   // read and exe permissions for others
+   FileSystem dfs = dfsCluster.getFileSystem();
+   final String publicCacheFile = dfs.getDefaultUri(myConf).toString()
+       + "/tmp/publicFile";
+   dfs.copyFromLocalFile(new Path(publicFile.getAbsolutePath()),
+       new Path(publicCacheFile));
+   dfs.setPermission(new Path(dfs.getDefaultUri(myConf).toString() + "/tmp"),
+       new FsPermission((short)0755));
+   dfs.setPermission(new Path(publicCacheFile), new FsPermission((short)0755));
+   final String taskTrackerUser
+       = UserGroupInformation.getCurrentUser().getShortUserName();
+
+   taskControllerUser.doAs(new PrivilegedExceptionAction<Void>() {
+     public Void run() throws Exception{
+
+       FileSystem inFs = inputPath.getFileSystem(myConf);
+       FileSystem outFs = outputPath.getFileSystem(myConf);
+       outFs.delete(outputPath, true);
+       if (!inFs.mkdirs(inputPath)) {
+         throw new IOException("Mkdirs failed to create " + inFs.toString());
+       }
+
+       // create input file
+       DataOutputStream file = inFs.create(new Path(inputPath, "part-0"));
+       try {
+         file.writeBytes(input);
+       } finally {
+         // do not leak the stream if writing the input fails
+         file.close();
+       }
+
+       // Create file that will be passed using -files option.
+       // This is private dist cache file
+       File privateFile = new File(System.getProperty(
+           "test.build.data", "/tmp"), "test.sh");
+       privateFile.createNewFile();
+
+       String[] args =
+           new String[] {
+               "-files", privateFile.toString() + "," + publicCacheFile,
+               "-Dmapreduce.task.files.preserve.failedtasks=true",
+               "-Dstream.tmpdir=" + System.getProperty("test.build.data", "/tmp"),
+               "-input", inputPath.makeQualified(inFs).toString(),
+               "-output", outputPath.makeQualified(outFs).toString(),
+               "-mapper", "pwd",
+               "-reducer", StreamJob.REDUCE_NONE
+           };
+       StreamJob streamJob = new StreamJob();
+       streamJob.setConf(myConf);
+
+       assertTrue("Job failed", ToolRunner.run(streamJob, args)==0);
+
+       // validate private cache files' permissions
+       checkPermissionsOnPrivateDistCache(localDirs,
+           taskControllerUser.getShortUserName(), taskTrackerSpecialGroup);
+
+       // check the file is present even after the job is over.
+       // work directory symlink cleanup should not have removed the target
+       // files.
+       checkPresenceOfPrivateDistCacheFiles(localDirs,
+           taskControllerUser.getShortUserName(), new String[] {"test.sh"});
+
+       // validate public cache files' permissions and ownership
+       checkPermissionsOnPublicDistCache(FileSystem.getLocal(myConf),
+           localDirs, taskTrackerUser, taskTrackerPrimaryGroup);
+
+       checkPresenceOfPublicDistCacheFiles(localDirs,
+           new String[] {"publicFile"});
+       assertOwnerShip(outputPath);
+       return null;
+     }
+   });
+ }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1077299&r1=1077298&r2=1077299&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Fri Mar 4 04:01:09 2011
@@ -144,7 +144,7 @@ public class DefaultTaskController exten
void enableTaskForCleanup(PathDeletionContext context)
throws IOException {
try {
- FileUtil.chmod(context.fullPath, "ug+rwx", true);
+ FileUtil.chmod(context.fullPath, "u+rwx", true);
} catch(InterruptedException e) {
LOG.warn("Interrupted while setting permissions for " + context.fullPath +
" for deletion.");
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1077299&r1=1077298&r2=1077299&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Fri Mar 4 04:01:09 2011
@@ -373,16 +373,53 @@ public class TestTrackerDistributedCache
*/
private void checkPublicFilePermissions(Path[] localCacheFiles)
throws IOException {
+ checkPublicFilePermissions(fs, localCacheFiles);
+ }
+
+ /**
+ * Verify the permissions for a file localized as a public distributed
+ * cache file
+ * @param fs The Local FileSystem used to get the permissions
+ * @param localCacheFiles The list of files whose permissions should be
+ * verified.
+ * @throws IOException
+ */
+ public static void checkPublicFilePermissions(FileSystem fs,
+ Path[] localCacheFiles) throws IOException {
// All the files should have read and executable permissions for others
for (Path p : localCacheFiles) {
FsPermission perm = fs.getFileStatus(p).getPermission();
- assertTrue("cache file is not readable by others", perm.getOtherAction()
- .implies(FsAction.READ));
- assertTrue("cache file is not executable by others", perm
- .getOtherAction().implies(FsAction.EXECUTE));
+ assertTrue("cache file is not readable / executable by owner: perm="
+ + perm.getUserAction(), perm.getUserAction()
+ .implies(FsAction.READ_EXECUTE));
+ assertTrue("cache file is not readable / executable by group: perm="
+ + perm.getGroupAction(), perm.getGroupAction()
+ .implies(FsAction.READ_EXECUTE));
+ assertTrue("cache file is not readable / executable by others: perm="
+ + perm.getOtherAction(), perm.getOtherAction()
+ .implies(FsAction.READ_EXECUTE));
}
}
-
+
+ /**
+  * Verify the ownership for files localized as a public distributed cache
+  * file.
+  * @param fs The Local FileSystem used to get the ownership
+  * @param localCacheFiles The list of files whose ownership should be
+  *          verified
+  * @param owner The expected owner of the files
+  * @param group The expected group owner of the files.
+  * @throws IOException
+  */
+ public static void checkPublicFileOwnership(FileSystem fs,
+     Path[] localCacheFiles, String owner, String group)
+     throws IOException {
+   for (Path p: localCacheFiles) {
+     // Name the path in the message so a failure identifies the bad entry.
+     assertEquals("Unexpected owner for " + p,
+         owner, fs.getFileStatus(p).getOwner());
+     assertEquals("Unexpected group for " + p,
+         group, fs.getFileStatus(p).getGroup());
+   }
+ }
+
protected String getJobOwnerName() throws IOException {
return UserGroupInformation.getLoginUser().getUserName();
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=1077299&r1=1077298&r2=1077299&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java Fri Mar 4 04:01:09 2011
@@ -22,13 +22,17 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
+import java.util.List;
+import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.TestTrackerDistributedCacheManager;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.UserGroupInformation;
@@ -112,6 +116,11 @@ public class ClusterWithLinuxTaskControl
protected UserGroupInformation taskControllerUser;
protected static String taskTrackerSpecialGroup = null;
+ /**
+ * Primary group of the tasktracker - i.e. the user running the
+ * test.
+ */
+ protected static String taskTrackerPrimaryGroup = null;
static {
if (isTaskExecPathPassed()) {
try {
@@ -123,6 +132,13 @@ public class ClusterWithLinuxTaskControl
LOG.warn("Could not get group of the binary", e);
fail("Could not get group of the binary");
}
+ try {
+ taskTrackerPrimaryGroup =
+ UserGroupInformation.getCurrentUser().getGroupNames()[0];
+ } catch (IOException ioe) {
+ LOG.warn("Could not get primary group of the current user", ioe);
+ fail("Could not get primary group of the current user");
+ }
}
}
@@ -300,4 +316,160 @@ public class ClusterWithLinuxTaskControl
.equals(taskControllerUser.getGroupNames()[0]));
}
}
+
+ /**
+  * For every local dir in which the user's private distcache dir exists,
+  * validates the permissions of that dir and of all of its contents.
+  * Directories are expected to be "dr-xrws---" and files "-r-xrwx---".
+  */
+ public static void checkPermissionsOnPrivateDistCache(String[] localDirs,
+     String user, String groupOwner) throws IOException {
+   for (String localDir : localDirs) {
+     File privateCacheDir = new File(localDir,
+         TaskTracker.getPrivateDistributedCacheDir(user));
+     if (!privateCacheDir.exists()) {
+       // nothing was localized under this local dir
+       continue;
+     }
+     checkPermissionsOnDir(privateCacheDir, user, groupOwner, "dr-xrws---",
+         "-r-xrwx---");
+   }
+ }
+
+ /**
+  * Asserts that the files expected to be localized in a user's private
+  * distributed cache are found under the mapred local directories.
+  * @param localDirs List of mapred local directories.
+  * @param user User whose private distributed cache is searched
+  * @param expectedFileNames Names of the files expected to be localized
+  * @throws IOException
+  */
+ public static void checkPresenceOfPrivateDistCacheFiles(String[] localDirs,
+     String user, String[] expectedFileNames) throws IOException {
+   FileGatherer found = new FileGatherer();
+   for (int i = 0; i < localDirs.length; i++) {
+     String cacheSubDir = TaskTracker.getPrivateDistributedCacheDir(user);
+     File privateCacheDir = new File(localDirs[i], cacheSubDir);
+     findExpectedFiles(expectedFileNames, privateCacheDir, found);
+   }
+   int expectedCount = expectedFileNames.length;
+   assertEquals("Files expected in private distributed cache were not found",
+       expectedCount, found.getCount());
+ }
+
+ /**
+  * For every local dir in which the public distcache dir exists, validates
+  * the permissions and ownership of that dir and of all of its contents.
+  */
+ public static void checkPermissionsOnPublicDistCache(FileSystem localFS,
+     String[] localDirs, String owner, String group) throws IOException {
+   for (String localDir : localDirs) {
+     File publicCacheDir = new File(localDir,
+         TaskTracker.getPublicDistributedCacheDir());
+     if (!publicCacheDir.exists()) {
+       // no public cache under this local dir, so nothing to validate
+       continue;
+     }
+     checkPublicFilePermissions(localFS, publicCacheDir, owner, group);
+   }
+ }
+
+ /**
+  * Asserts that the files expected to be localized in the public
+  * distributed cache are found under the mapred local directories.
+  * @param localDirs List of mapred local directories
+  * @param expectedFileNames Names of the files expected to be localized.
+  * @throws IOException
+  */
+ public static void checkPresenceOfPublicDistCacheFiles(String[] localDirs,
+     String[] expectedFileNames) throws IOException {
+   FileGatherer found = new FileGatherer();
+   for (int i = 0; i < localDirs.length; i++) {
+     String cacheSubDir = TaskTracker.getPublicDistributedCacheDir();
+     File publicCacheDir = new File(localDirs[i], cacheSubDir);
+     findExpectedFiles(expectedFileNames, publicCacheDir, found);
+   }
+   int expectedCount = expectedFileNames.length;
+   assertEquals("Files expected in public distributed cache were not found",
+       expectedCount, found.getCount());
+ }
+
+ /**
+  * Validates permissions and ownership of the given public distributed
+  * cache path and, when it is a directory, of everything beneath it.
+  * @throws IOException if a directory's contents cannot be listed
+  */
+ private static void checkPublicFilePermissions(FileSystem localFS, File dir,
+     String owner, String group)
+     throws IOException {
+   Path dirPath = new Path(dir.getAbsolutePath());
+   TestTrackerDistributedCacheManager.checkPublicFilePermissions(localFS,
+       new Path[] {dirPath});
+   TestTrackerDistributedCacheManager.checkPublicFileOwnership(localFS,
+       new Path[] {dirPath}, owner, group);
+   if (dir.isDirectory()) {
+     File[] files = dir.listFiles();
+     if (files == null) {
+       // listFiles() yields null when the directory cannot be read; report
+       // the path instead of failing with a bare NullPointerException.
+       throw new IOException("Could not list contents of " + dir);
+     }
+     for (File file : files) {
+       checkPublicFilePermissions(localFS, file, owner, group);
+     }
+   }
+ }
+
+ /**
+  * Validates permissions of given dir and its contents fully(i.e.
+  * recursively). Directories are checked against expectedDirPermissions and
+  * all other entries against expectedFilePermissions.
+  * @throws IOException if the contents of a directory cannot be listed
+  */
+ private static void checkPermissionsOnDir(File dir, String user,
+     String groupOwner, String expectedDirPermissions,
+     String expectedFilePermissions) throws IOException {
+   TestTaskTrackerLocalization.checkFilePermissions(dir.toString(),
+       expectedDirPermissions, user, groupOwner);
+   File[] files = dir.listFiles();
+   if (files == null) {
+     // listFiles() yields null for a non-directory or an unreadable
+     // directory; report the path instead of a bare NullPointerException.
+     throw new IOException("Could not list contents of " + dir);
+   }
+   for (File file : files) {
+     if (file.isDirectory()) {
+       checkPermissionsOnDir(file, user, groupOwner, expectedDirPermissions,
+           expectedFilePermissions);
+     } else {
+       TestTaskTrackerLocalization.checkFilePermissions(file.toString(),
+           expectedFilePermissions, user, groupOwner);
+     }
+   }
+ }
+
+ // Recursively walks rootDir and records in the FileGatherer the name of
+ // every non-directory entry that matches one of the expected file names.
+ // A rootDir whose contents cannot be listed contributes nothing.
+ private static void findExpectedFiles(String[] expectedFileNames,
+     File rootDir, FileGatherer gatherer) {
+   File[] entries = rootDir.listFiles();
+   if (entries != null) {
+     for (File entry : entries) {
+       if (!entry.isDirectory()) {
+         if (isFilePresent(expectedFileNames, entry)) {
+           gatherer.addFileName(entry.getName());
+         }
+         continue;
+       }
+       // descend into sub directories
+       findExpectedFiles(expectedFileNames, entry, gatherer);
+     }
+   }
+ }
+
+ // Returns true when the name of the passed file equals one of the
+ // expected file names, false otherwise.
+ private static boolean isFilePresent(String[] expectedFileNames, File file) {
+   for (String expected : expectedFileNames) {
+     if (expected.equals(file.getName())) {
+       return true;
+     }
+   }
+   return false;
+ }
+
+ // Helper class to collect a list of file names across multiple
+ // method calls. Wrapper around a collection defined for clarity.
+ // Every added name is kept, so getCount() counts repeated names too.
+ private static class FileGatherer {
+   // only reachable through the methods below; never reassigned
+   private final List<String> foundFileNames = new ArrayList<String>();
+
+   // Records one more found file name.
+   void addFileName(String fileName) {
+     foundFileNames.add(fileName);
+   }
+
+   // Number of file names recorded so far.
+   int getCount() {
+     return foundFileNames.size();
+   }
+ }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=1077299&r1=1077298&r2=1077299&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Fri Mar 4 04:01:09 2011
@@ -264,6 +264,15 @@ public class MiniMRCluster {
return (taskTrackerList.get(taskTracker)).getLocalDir();
}
+ /**
+  * Returns all the local directories of the task tracker found at the given
+  * index of the task tracker list.
+  * @param taskTracker the index of the task tracker to query
+  * @return array of local dirs
+  */
+ public String[] getTaskTrackerLocalDirs(int taskTracker) {
+   return taskTrackerList.get(taskTracker).getLocalDirs();
+ }
+
public JobTrackerRunner getJobTrackerRunner() {
return jobTracker;
}
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java?rev=1077299&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java Fri Mar 4 04:01:09 2011
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Verifies that TaskRunner.setupWorkDir() cleans up the work dir without
+ * removing the files/dirs that symlinks under the work dir point to.
+ */
+public class TestSetupWorkDir extends TestCase {
+
+  /**
+   * Creates 1 subdirectory and 1 file under dir2. Creates 1 subdir, 1 file,
+   * 1 symlink to a dir and a symlink to a file under dir1.
+   * Creates dir1/subDir, dir1/file, dir2/subDir, dir2/file,
+   * dir1/symlinkSubDir->dir2/subDir, dir1/symlinkFile->dir2/file.
+   */
+  static void createSubDirsAndSymLinks(JobConf jobConf, Path dir1, Path dir2)
+      throws IOException {
+    FileSystem fs = FileSystem.getLocal(jobConf);
+    createSubDirAndFile(fs, dir1);
+    createSubDirAndFile(fs, dir2);
+    // now create symlinks under dir1 that point to file/dir under dir2
+    FileUtil.symLink(dir2+"/subDir", dir1+"/symlinkSubDir");
+    FileUtil.symLink(dir2+"/file", dir1+"/symlinkFile");
+  }
+
+  /**
+   * Creates the directory "subDir" and the file "file" (holding a short dummy
+   * payload) directly under the given dir.
+   *
+   * @throws IOException if the directory or the file cannot be created
+   */
+  static void createSubDirAndFile(FileSystem fs, Path dir) throws IOException {
+    Path subDir = new Path(dir, "subDir");
+    // fail here, like createEmptyDir() does, rather than in a later assert
+    if (!fs.mkdirs(subDir)) {
+      throw new IOException("Unable to create directory " + subDir);
+    }
+    Path p = new Path(dir, "file");
+    DataOutputStream out = fs.create(p);
+    try {
+      out.writeBytes("dummy input");
+    } finally {
+      // release the stream even when the write fails
+      out.close();
+    }
+  }
+
+  /**
+   * Deletes dir recursively if it exists and then creates it again, so that
+   * it ends up empty.
+   *
+   * @throws IOException if the directory cannot be created
+   */
+  void createEmptyDir(FileSystem fs, Path dir) throws IOException {
+    if (fs.exists(dir)) {
+      fs.delete(dir, true);
+    }
+    if (!fs.mkdirs(dir)) {
+      throw new IOException("Unable to create directory " + dir);
+    }
+  }
+
+  /**
+   * Validates if TaskRunner.setupWorkDir() is properly cleaning up the
+   * contents of workDir and creating tmp dir under it (even though workDir
+   * contains symlinks to files/directories).
+   */
+  public void testSetupWorkDir() throws IOException {
+    Path rootDir = new Path(System.getProperty("test.build.data", "/tmp"),
+        "testSetupWorkDir");
+    Path myWorkDir = new Path(rootDir, "./work");
+    Path myTargetDir = new Path(rootDir, "./tmp");
+    JobConf jConf = new JobConf();
+    FileSystem fs = FileSystem.getLocal(jConf);
+    createEmptyDir(fs, myWorkDir);
+    createEmptyDir(fs, myTargetDir);
+
+    // create subDirs and symlinks under work dir
+    createSubDirsAndSymLinks(jConf, myWorkDir, myTargetDir);
+
+    assertTrue("Did not create symlinks/files/dirs properly. Check "
+        + myWorkDir + " and " + myTargetDir,
+        (fs.listStatus(myWorkDir).length == 4) &&
+        (fs.listStatus(myTargetDir).length == 2));
+
+    // let us disable creation of symlinks in setupWorkDir()
+    jConf.set("mapred.create.symlink", "no");
+
+    // Deletion of myWorkDir should not affect contents of myTargetDir.
+    // myTargetDir is like $user/jobcache/distcache
+    TaskRunner.setupWorkDir(jConf, new File(myWorkDir.toUri().getPath()));
+
+    // Contents of myWorkDir should be cleaned up and a tmp dir should be
+    // created under myWorkDir
+    assertTrue(myWorkDir + " is not cleaned up properly.",
+        fs.exists(myWorkDir) && (fs.listStatus(myWorkDir).length == 1));
+
+    // Make sure that the dir under myWorkDir is tmp
+    assertEquals("The only entry left under " + myWorkDir + " is not tmp.",
+        myWorkDir.toString() + "/tmp",
+        fs.listStatus(myWorkDir)[0].getPath().toUri().getPath());
+
+    // Make sure that myTargetDir is not changed/deleted
+    assertTrue("Dir " + myTargetDir + " seem to be modified.",
+        fs.exists(myTargetDir) && (fs.listStatus(myTargetDir).length == 2));
+
+    // cleanup (only reached on success, so a failed run leaves the
+    // directories in place for inspection)
+    fs.delete(rootDir, true);
+  }
+}