You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:01:09 UTC

svn commit: r1077299 - in /hadoop/common/branches/branch-0.20-security-patches/src: contrib/streaming/src/test/org/apache/hadoop/streaming/ mapred/org/apache/hadoop/mapred/ test/org/apache/hadoop/filecache/ test/org/apache/hadoop/mapred/

Author: omalley
Date: Fri Mar  4 04:01:09 2011
New Revision: 1077299

URL: http://svn.apache.org/viewvc?rev=1077299&view=rev
Log:
commit 0fd623198193ea24fa84f756427012f970b71fc4
Author: Hemanth Yamijala <yhemanth@friendchild-lm.(none)>
Date:   Mon Mar 8 22:58:43 2010 +0530

    MAPREDUCE:1435 from https://issues.apache.org/jira/secure/attachment/12438172/MR-1435-y20s-1.txt
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1435. Add test cases to already committed patch for this
    +    jira, synchronizing changes with trunk. (yhemanth)
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java?rev=1077299&r1=1077298&r2=1077299&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java Fri Mar  4 04:01:09 2011
@@ -19,13 +19,20 @@
 package org.apache.hadoop.streaming;
 
 import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.ClusterWithLinuxTaskController;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.ToolRunner;
 
 /**
  * Test Streaming with LinuxTaskController running the jobs as a user different
@@ -77,4 +84,95 @@ public class TestStreamingAsDifferentUse
       }
     });
   }
+  
+  /**
+   * Verify if the permissions of distcache dir contents are valid once the job
+   * is finished
+   */
+  public void testStreamingWithDistCache()
+  throws Exception {
+    if (!shouldRun()) {
+      return;
+    }
+    startCluster();
+    final String[] localDirs = mrCluster.getTaskTrackerLocalDirs(0);
+    final JobConf myConf = getClusterConf();
+
+    // create file that will go into public distributed cache
+    File publicFile = new File(System.getProperty(
+        "test.build.data", "/tmp"), "publicFile");
+    FileOutputStream fstream = new FileOutputStream(publicFile);
+    fstream.write("public file contents".getBytes());
+    fstream.close();
+
+    // put the file(that should go into public dist cache) in dfs and set
+    // read and exe permissions for others
+    FileSystem dfs = dfsCluster.getFileSystem();
+    final String publicCacheFile = dfs.getDefaultUri(myConf).toString()
+                             + "/tmp/publicFile";
+    dfs.copyFromLocalFile(new Path(publicFile.getAbsolutePath()),
+        new Path(publicCacheFile));
+    dfs.setPermission(new Path(dfs.getDefaultUri(myConf).toString() + "/tmp"),
+        new FsPermission((short)0755));
+    dfs.setPermission(new Path(publicCacheFile), new FsPermission((short)0755));
+    final String taskTrackerUser 
+      = UserGroupInformation.getCurrentUser().getShortUserName();
+    
+    taskControllerUser.doAs(new PrivilegedExceptionAction<Void>() {
+      public Void run() throws Exception{
+
+        FileSystem inFs = inputPath.getFileSystem(myConf);
+        FileSystem outFs = outputPath.getFileSystem(myConf);
+        outFs.delete(outputPath, true);
+        if (!inFs.mkdirs(inputPath)) {
+          throw new IOException("Mkdirs failed to create " + inFs.toString());
+        }
+
+        // create input file
+        DataOutputStream file = inFs.create(new Path(inputPath, "part-0"));
+        file.writeBytes(input);
+        file.close();
+
+        // Create file that will be passed using -files option.
+        // This is private dist cache file
+        File privateFile = new File(System.getProperty(
+            "test.build.data", "/tmp"), "test.sh");
+        privateFile.createNewFile();
+
+        String[] args =
+          new String[] {
+            "-files", privateFile.toString() + "," + publicCacheFile,
+            "-Dmapreduce.task.files.preserve.failedtasks=true",
+            "-Dstream.tmpdir=" + System.getProperty("test.build.data", "/tmp"),
+            "-input", inputPath.makeQualified(inFs).toString(),
+            "-output", outputPath.makeQualified(outFs).toString(),
+            "-mapper", "pwd",
+            "-reducer", StreamJob.REDUCE_NONE
+          };
+        StreamJob streamJob = new StreamJob();
+        streamJob.setConf(myConf);
+
+        assertTrue("Job failed", ToolRunner.run(streamJob, args)==0);
+
+        // validate private cache files' permissions
+        checkPermissionsOnPrivateDistCache(localDirs,
+            taskControllerUser.getShortUserName(), taskTrackerSpecialGroup);
+        
+        // check the file is present even after the job is over.
+        // work directory symlink cleanup should not have removed the target 
+        // files.
+        checkPresenceOfPrivateDistCacheFiles(localDirs, 
+            taskControllerUser.getShortUserName(), new String[] {"test.sh"});
+
+        // validate private cache files' permissions
+        checkPermissionsOnPublicDistCache(FileSystem.getLocal(myConf),
+            localDirs, taskTrackerUser, taskTrackerPrimaryGroup);
+
+        checkPresenceOfPublicDistCacheFiles(localDirs, 
+            new String[] {"publicFile"});
+        assertOwnerShip(outputPath);
+        return null;
+      }
+    });
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1077299&r1=1077298&r2=1077299&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Fri Mar  4 04:01:09 2011
@@ -144,7 +144,7 @@ public class DefaultTaskController exten
   void enableTaskForCleanup(PathDeletionContext context)
          throws IOException {
     try {
-      FileUtil.chmod(context.fullPath, "ug+rwx", true);
+      FileUtil.chmod(context.fullPath, "u+rwx", true);
     } catch(InterruptedException e) {
       LOG.warn("Interrupted while setting permissions for " + context.fullPath +
           " for deletion.");

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1077299&r1=1077298&r2=1077299&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Fri Mar  4 04:01:09 2011
@@ -373,16 +373,53 @@ public class TestTrackerDistributedCache
    */
   private void checkPublicFilePermissions(Path[] localCacheFiles)
       throws IOException {
+    checkPublicFilePermissions(fs, localCacheFiles);
+  }
+
+  /**
+   * Verify the permissions for a file localized as a public distributed
+   * cache file
+   * @param fs The Local FileSystem used to get the permissions
+   * @param localCacheFiles The list of files whose permissions should be 
+   * verified.
+   * @throws IOException
+   */
+  public static void checkPublicFilePermissions(FileSystem fs, 
+      Path[] localCacheFiles) throws IOException {
     // All the files should have read and executable permissions for others
     for (Path p : localCacheFiles) {
       FsPermission perm = fs.getFileStatus(p).getPermission();
-      assertTrue("cache file is not readable by others", perm.getOtherAction()
-          .implies(FsAction.READ));
-      assertTrue("cache file is not executable by others", perm
-          .getOtherAction().implies(FsAction.EXECUTE));
+      assertTrue("cache file is not readable / executable by owner: perm="
+          + perm.getUserAction(), perm.getUserAction()
+          .implies(FsAction.READ_EXECUTE));
+      assertTrue("cache file is not readable / executable by group: perm="
+          + perm.getGroupAction(), perm.getGroupAction()
+          .implies(FsAction.READ_EXECUTE));
+      assertTrue("cache file is not readable / executable by others: perm="
+          + perm.getOtherAction(), perm.getOtherAction()
+          .implies(FsAction.READ_EXECUTE));
     }
   }
-
+  
+  /**
+   * Verify the ownership for files localized as a public distributed cache
+   * file.
+   * @param fs The Local FileSystem used to get the ownership
+   * @param localCacheFiles THe list of files whose ownership should be
+   * verified
+   * @param owner The owner of the files
+   * @param group The group owner of the files.
+   * @throws IOException
+   */
+  public static void checkPublicFileOwnership(FileSystem fs,
+      Path[] localCacheFiles, String owner, String group)
+      throws IOException {
+    for (Path p: localCacheFiles) {
+      assertEquals(owner, fs.getFileStatus(p).getOwner());
+      assertEquals(group, fs.getFileStatus(p).getGroup());
+    }
+  }
+  
   protected String getJobOwnerName() throws IOException {
     return UserGroupInformation.getLoginUser().getUserName();
   }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=1077299&r1=1077298&r2=1077299&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java Fri Mar  4 04:01:09 2011
@@ -22,13 +22,17 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.List;
+import java.util.ArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.TestTrackerDistributedCacheManager;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -112,6 +116,11 @@ public class ClusterWithLinuxTaskControl
   protected UserGroupInformation taskControllerUser;
   
   protected static String taskTrackerSpecialGroup = null;
+  /**
+   * Primary group of the tasktracker - i.e. the user running the
+   * test.
+   */
+  protected static String taskTrackerPrimaryGroup = null;
   static {
     if (isTaskExecPathPassed()) {
       try {
@@ -123,6 +132,13 @@ public class ClusterWithLinuxTaskControl
         LOG.warn("Could not get group of the binary", e);
         fail("Could not get group of the binary");
       }
+      try {
+        taskTrackerPrimaryGroup = 
+          UserGroupInformation.getCurrentUser().getGroupNames()[0];
+      } catch (IOException ioe) {
+        LOG.warn("Could not get primary group of the current user", ioe);
+        fail("Could not get primary group of the current user");
+      }
     }
   }
 
@@ -300,4 +316,160 @@ public class ClusterWithLinuxTaskControl
           .equals(taskControllerUser.getGroupNames()[0]));
     }
   }
+  
+  /**
+   * Validates permissions of private distcache dir and its contents fully
+   */
+  public static void checkPermissionsOnPrivateDistCache(String[] localDirs,
+      String user, String groupOwner) throws IOException {
+    for (String localDir : localDirs) {
+      File distCacheDir = new File(localDir,
+          TaskTracker.getPrivateDistributedCacheDir(user));
+      if (distCacheDir.exists()) {
+        checkPermissionsOnDir(distCacheDir, user, groupOwner, "dr-xrws---",
+            "-r-xrwx---");
+      }
+    }
+  }
+ 
+  /**
+   * Check that files expected to be localized in distributed cache for a user
+   * are present.
+   * @param localDirs List of mapred local directories.
+   * @param user User against which localization is happening
+   * @param expectedFileNames List of files expected to be localized
+   * @throws IOException
+   */
+  public static void checkPresenceOfPrivateDistCacheFiles(String[] localDirs,
+      String user, String[] expectedFileNames) throws IOException {
+    FileGatherer gatherer = new FileGatherer();
+    for (String localDir : localDirs) {
+      File distCacheDir = new File(localDir,
+          TaskTracker.getPrivateDistributedCacheDir(user));
+      findExpectedFiles(expectedFileNames, distCacheDir, gatherer);
+    }
+    assertEquals("Files expected in private distributed cache were not found",
+        expectedFileNames.length, gatherer.getCount());
+  }
+
+  /**
+   * Validates permissions and ownership of public distcache dir and its 
+   * contents fully in all local dirs
+   */
+  public static void checkPermissionsOnPublicDistCache(FileSystem localFS,
+      String[] localDirs, String owner, String group) throws IOException {
+    for (String localDir : localDirs) {
+      File distCacheDir = new File(localDir,
+          TaskTracker.getPublicDistributedCacheDir());
+
+      if (distCacheDir.exists()) {
+        checkPublicFilePermissions(localFS, distCacheDir, owner, group);
+      }
+    }
+  }
+
+  /**
+   * Checks that files expected to be localized in the public distributed
+   * cache are present
+   * @param localDirs List of mapred local directories
+   * @param expectedFileNames List of expected file names.
+   * @throws IOException
+   */
+  public static void checkPresenceOfPublicDistCacheFiles(String[] localDirs,
+      String[] expectedFileNames) throws IOException {
+    FileGatherer gatherer = new FileGatherer();
+    for (String localDir : localDirs) {
+      File distCacheDir = new File(localDir,
+          TaskTracker.getPublicDistributedCacheDir());
+      findExpectedFiles(expectedFileNames, distCacheDir, gatherer);
+    }
+    assertEquals("Files expected in public distributed cache were not found",
+        expectedFileNames.length, gatherer.getCount());
+  }
+  
+  /**
+   * Validates permissions and ownership on the public distributed cache files
+   */
+  private static void checkPublicFilePermissions(FileSystem localFS, File dir,
+      String owner, String group)
+      throws IOException {
+    Path dirPath = new Path(dir.getAbsolutePath());
+    TestTrackerDistributedCacheManager.checkPublicFilePermissions(localFS, 
+        new Path[] {dirPath});
+    TestTrackerDistributedCacheManager.checkPublicFileOwnership(localFS,
+        new Path[] {dirPath}, owner, group);
+    if (dir.isDirectory()) {
+      File[] files = dir.listFiles();
+      for (File file : files) {
+        checkPublicFilePermissions(localFS, file, owner, group);
+      }
+    }
+  }
+
+  /**
+   * Validates permissions of given dir and its contents fully(i.e. recursively)
+   */
+  private static void checkPermissionsOnDir(File dir, String user,
+      String groupOwner, String expectedDirPermissions,
+      String expectedFilePermissions) throws IOException {
+    TestTaskTrackerLocalization.checkFilePermissions(dir.toString(),
+        expectedDirPermissions, user, groupOwner);
+    File[] files = dir.listFiles();
+    for (File file : files) {
+      if (file.isDirectory()) {
+        checkPermissionsOnDir(file, user, groupOwner, expectedDirPermissions,
+            expectedFilePermissions);
+      } else {
+        TestTaskTrackerLocalization.checkFilePermissions(file.toString(),
+            expectedFilePermissions, user, groupOwner);
+      }
+    }
+  }
+
+  // Check which files among those expected are present in the rootDir
+  // Add those present to the FileGatherer.
+  private static void findExpectedFiles(String[] expectedFileNames,
+      File rootDir, FileGatherer gatherer) {
+    
+    File[] files = rootDir.listFiles();
+    if (files == null) {
+      return;
+    }
+    for (File file : files) {
+      if (file.isDirectory()) {
+        findExpectedFiles(expectedFileNames, file, gatherer);
+      } else {
+        if (isFilePresent(expectedFileNames, file)) {
+          gatherer.addFileName(file.getName());
+        }
+      }
+    }
+    
+  }
+  
+  // Test if the passed file is present in the expected list of files.
+  private static boolean isFilePresent(String[] expectedFileNames, File file) {
+    boolean foundFileName = false;
+    for (String name : expectedFileNames) {
+      if (name.equals(file.getName())) {
+        foundFileName = true;
+        break;
+      }
+    }
+    return foundFileName;
+  }
+  
+  // Helper class to collect a list of file names across multiple
+  // method calls. Wrapper around a collection defined for clarity
+  private static class FileGatherer {
+    List<String> foundFileNames = new ArrayList<String>();
+    
+    void addFileName(String fileName) {
+      foundFileNames.add(fileName);
+    }
+    
+    int getCount() {
+      return foundFileNames.size();
+    }
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=1077299&r1=1077298&r2=1077299&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Fri Mar  4 04:01:09 2011
@@ -264,6 +264,15 @@ public class MiniMRCluster {
     return (taskTrackerList.get(taskTracker)).getLocalDir();
   }
 
+  /**
+   * Get all the local directories for the Nth task tracker
+   * @param taskTracker the index of the task tracker to check
+   * @return array of local dirs
+   */
+  public String[] getTaskTrackerLocalDirs(int taskTracker) {
+    return (taskTrackerList.get(taskTracker)).getLocalDirs();
+  }
+
   public JobTrackerRunner getJobTrackerRunner() {
     return jobTracker;
   }

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java?rev=1077299&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java Fri Mar  4 04:01:09 2011
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Verifies if TaskRunner.SetupWorkDir() is cleaning up files/dirs pointed
+ * to by symlinks under work dir.
+ */
+public class TestSetupWorkDir extends TestCase {
+
+  /**
+   * Creates 1 subdirectory and 1 file under dir2. Creates 1 subdir, 1 file,
+   * 1 symlink to a dir and a symlink to a file under dir1.
+   * Creates dir1/subDir, dir1/file, dir2/subDir, dir2/file,
+   * dir1/symlinkSubDir->dir2/subDir, dir1/symlinkFile->dir2/file.
+   */
+  static void createSubDirsAndSymLinks(JobConf jobConf, Path dir1, Path dir2)
+       throws IOException {
+    FileSystem fs = FileSystem.getLocal(jobConf);
+    createSubDirAndFile(fs, dir1);
+    createSubDirAndFile(fs, dir2);
+    // now create symlinks under dir1 that point to file/dir under dir2
+    FileUtil.symLink(dir2+"/subDir", dir1+"/symlinkSubDir");
+    FileUtil.symLink(dir2+"/file", dir1+"/symlinkFile");
+  }
+
+  static void createSubDirAndFile(FileSystem fs, Path dir) throws IOException {
+    Path subDir = new Path(dir, "subDir");
+    fs.mkdirs(subDir);
+    Path p = new Path(dir, "file");
+    DataOutputStream out = fs.create(p);
+    out.writeBytes("dummy input");
+    out.close();    
+  }
+
+  void createEmptyDir(FileSystem fs, Path dir) throws IOException {
+    if (fs.exists(dir)) {
+      fs.delete(dir, true);
+    }
+    if (!fs.mkdirs(dir)) {
+      throw new IOException("Unable to create directory " + dir);
+    }
+  }
+
+  /**
+   * Validates if TaskRunner.setupWorkDir() is properly cleaning up the
+   * contents of workDir and creating tmp dir under it (even though workDir
+   * contains symlinks to files/directories).
+   */
+  public void testSetupWorkDir() throws IOException {
+    Path rootDir = new Path(System.getProperty("test.build.data",  "/tmp"),
+                            "testSetupWorkDir");
+    Path myWorkDir = new Path(rootDir, "./work");
+    Path myTargetDir = new Path(rootDir, "./tmp");
+    JobConf jConf = new JobConf();
+    FileSystem fs = FileSystem.getLocal(jConf);
+    createEmptyDir(fs, myWorkDir);
+    createEmptyDir(fs, myTargetDir);
+
+    // create subDirs and symlinks under work dir
+    createSubDirsAndSymLinks(jConf, myWorkDir, myTargetDir);
+
+    assertTrue("Did not create symlinks/files/dirs properly. Check "
+        + myWorkDir + " and " + myTargetDir,
+        (fs.listStatus(myWorkDir).length == 4) &&
+        (fs.listStatus(myTargetDir).length == 2));
+
+    // let us disable creation of symlinks in setupWorkDir()
+    jConf.set("mapred.create.symlink", "no");
+
+    // Deletion of myWorkDir should not affect contents of myTargetDir.
+    // myTargetDir is like $user/jobcache/distcache
+    TaskRunner.setupWorkDir(jConf, new File(myWorkDir.toUri().getPath()));
+
+    // Contents of myWorkDir should be cleaned up and a tmp dir should be
+    // created under myWorkDir
+    assertTrue(myWorkDir + " is not cleaned up properly.",
+        fs.exists(myWorkDir) && (fs.listStatus(myWorkDir).length == 1));
+
+    // Make sure that the dir under myWorkDir is tmp
+    assertTrue(fs.listStatus(myWorkDir)[0].getPath().toUri().getPath()
+               .toString().equals(myWorkDir.toString() + "/tmp"));
+
+    // Make sure that myTargetDir is not changed/deleted
+    assertTrue("Dir " + myTargetDir + " seem to be modified.",
+        fs.exists(myTargetDir) && (fs.listStatus(myTargetDir).length == 2));
+
+    // cleanup
+    fs.delete(rootDir, true);
+  }
+}