You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ma...@apache.org on 2012/02/20 00:41:18 UTC

svn commit: r1291093 - in /hadoop/common/branches/branch-1: ./ src/mapred/ src/mapred/org/apache/hadoop/filecache/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/filecache/

Author: mattf
Date: Sun Feb 19 23:41:18 2012
New Revision: 1291093

URL: http://svn.apache.org/viewvc?rev=1291093&view=rev
Log:
MAPREDUCE-3824. Distributed caches are not removed properly. Contributed by Thomas Graves.

Modified:
    hadoop/common/branches/branch-1/   (props changed)
    hadoop/common/branches/branch-1/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/branch-1/src/mapred/   (props changed)
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobLocalizer.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java

Propchange: hadoop/common/branches/branch-1/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Feb 19 23:41:18 2012
@@ -3,6 +3,6 @@
 /hadoop/common/branches/branch-0.20-security-203:1096071,1097011,1097249,1097269,1097281,1097966,1098816,1098819,1098823,1098827,1098832,1098839,1098854,1098863,1099088,1099191,1099324,1099330,1099333,1102071,1128115
 /hadoop/common/branches/branch-0.20-security-204:1128390,1147228,1148069,1149316,1154413
 /hadoop/common/branches/branch-0.20-security-205:1174370,1174917,1176042,1176248,1176638,1176645,1202378
-/hadoop/common/branches/branch-1.0:1214410
+/hadoop/common/branches/branch-1.0:1214410,1291091
 /hadoop/core/branches/branch-0.19:713112
 /hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274,746338,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426,755790,755905,755938,755960,755986,755998,756352,757448,757624,757849,758156,758180,759398,759932,760502,760783,761046,761482,761632,762216,762879,763107,763502,764967,765016,765809,765951,771607,771661,772844,772876,772884,772920,773889,776638,778962,778966,779893,781720,784661,785046,785569

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1291093&r1=1291092&r2=1291093&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Sun Feb 19 23:41:18 2012
@@ -200,6 +200,8 @@ Release 1.0.1 - 2012.02.19
 
     HADOOP-8050. Deadlock in metrics. (Kihwal Lee via mattf)
 
+    MAPREDUCE-3824. Distributed caches are not removed properly. (Thomas Graves
+    via mattf)
 
 Release 1.0.0 - 2011.12.15
 

Propchange: hadoop/common/branches/branch-1/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Feb 19 23:41:18 2012
@@ -3,7 +3,7 @@
 /hadoop/common/branches/branch-0.20-security-203/CHANGES.txt:1096071,1097011,1097249,1097269,1097281,1097966,1098816,1098819,1098823,1098827,1098832,1098839,1098854,1098863,1099088,1099191,1099324,1099330,1099333,1102071,1128115
 /hadoop/common/branches/branch-0.20-security-204/CHANGES.txt:1128390,1147228,1148069,1149316,1152887,1154413,1159730,1161741
 /hadoop/common/branches/branch-0.20-security-205/CHANGES.txt:1170696,1171234,1171294,1174368,1174370,1174917,1176042,1176248,1176638,1176645,1200050,1200078,1200199,1202378,1203419,1205530
-/hadoop/common/branches/branch-1.0/CHANGES.txt:1208934,1209370,1211431,1211738,1214655,1214658,1214661,1214675,1238997,1242072,1243204,1243785,1244142,1291078
+/hadoop/common/branches/branch-1.0/CHANGES.txt:1208934,1209370,1211431,1211738,1214655,1214658,1214661,1214675,1238997,1242072,1243204,1243785,1244142,1291078,1291091
 /hadoop/core/branches/branch-0.18/CHANGES.txt:727226
 /hadoop/core/branches/branch-0.19/CHANGES.txt:713112
 /hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752514,752555,752590,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426,755790,755905,755938,755986,755998,756352,757448,757624,757849,758156,758180,759398,759932,760502,760783,761046,761482,761632,762216,762879,763107,763502,764967,765016,765809,765951,771607,772844,772876,772884,772920,773889,776638,778962,778966,779893,781720,784661,785046,785569

Propchange: hadoop/common/branches/branch-1/src/mapred/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Feb 19 23:41:18 2012
@@ -3,7 +3,7 @@
 /hadoop/common/branches/branch-0.20-security-203/src/mapred:1096071,1097011,1097249,1097269,1097281,1097966,1098816,1098819,1098823,1098827,1098832,1098839,1098854,1098863,1099088,1099191,1099324,1099330,1099333,1128115
 /hadoop/common/branches/branch-0.20-security-204/src/mapred:1128390
 /hadoop/common/branches/branch-0.20-security-205/src/mapred:1174370,1176042,1176248,1176638,1176645,1202378
-/hadoop/common/branches/branch-1.0/src/mapred:1214410
+/hadoop/common/branches/branch-1.0/src/mapred:1214410,1291091
 /hadoop/core/branches/branch-0.19/src/mapred:713112
 /hadoop/core/trunk/src/mapred:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274,746338,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426,755790,755905,755938,755960,755986,755998,756352,757448,757624,757849,758156,758180,759398,759932,760502,760783,761046,761482,761632,762216,762879,763107,763502,764967,765016,765809,765951,771607,771661,772844,772876,772884,772920,773889,776638,778962,778966,779893,781720,784661,785046,785569
 /hadoop/mapreduce/trunk/src/java:808650

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java?rev=1291093&r1=1291092&r2=1291093&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java Sun Feb 19 23:41:18 2012
@@ -259,10 +259,10 @@ public class TaskDistributedCacheManager
   public void setSizes(long[] sizes) throws IOException {
     int i = 0;
     for (CacheFile c: cacheFiles) {
-      if (!c.isPublic && c.type == CacheFile.FileType.ARCHIVE && 
-    	  c.status != null) {
-        distributedCacheManager.setSize(c.status, sizes[i++]);
+      if (!c.isPublic && c.status != null) {
+        distributedCacheManager.setSize(c.status, sizes[i]);
       }
+      i++;
     }
   }
 

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1291093&r1=1291092&r2=1291093&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Sun Feb 19 23:41:18 2012
@@ -34,6 +34,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -557,7 +558,7 @@ public class TrackerDistributedCacheMana
     //
     // This field should be accessed under global cachedArchives lock.
     //
-    private int refcount;    // number of instances using this cache
+    private AtomicInteger refcount;    // number of instances using this cache
 
     //
     // The following two fields should be accessed under
@@ -588,7 +589,7 @@ public class TrackerDistributedCacheMana
                        String uniqueString, String user, String key) {
       super();
       this.localizedLoadPath = localLoadPath;
-      this.refcount = 0;
+      this.refcount = new AtomicInteger();
       this.localizedBaseDir = baseDir;
       this.size = 0;
       this.subDir = subDir;
@@ -598,14 +599,16 @@ public class TrackerDistributedCacheMana
     }
     
     public synchronized void incRefCount() {
-      refcount += 1;
+      refcount.incrementAndGet() ;
+      LOG.debug(localizedLoadPath + ": refcount=" + refcount.get());
     }
 
     public void decRefCount() {
       synchronized (cachedArchives) {
         synchronized (this) {
-          refcount -= 1;
-          if(refcount <= 0) {
+          refcount.decrementAndGet() ;
+          LOG.debug(localizedLoadPath + ": refcount=" + refcount.get());
+          if(refcount.get() <= 0) {
             String key = this.key;
             cachedArchives.remove(key);
             cachedArchives.put(key, this);
@@ -615,11 +618,12 @@ public class TrackerDistributedCacheMana
     }
 
     public int getRefCount() {
-      return refcount;
+      return refcount.get();
     }
 
     public synchronized boolean isUsed() {
-      return refcount > 0;
+      LOG.debug(localizedLoadPath + ": refcount=" + refcount.get());
+      return refcount.get() > 0;
     }
 
     Path getBaseDir(){
@@ -652,7 +656,8 @@ public class TrackerDistributedCacheMana
         try {
           localFs.delete(f.getValue().localizedLoadPath, true);
         } catch (IOException ie) {
-          LOG.debug("Error cleaning up cache", ie);
+          LOG.debug("Error cleaning up cache (" + 
+              f.getValue().localizedLoadPath + ")", ie);
         }
       }
       cachedArchives.clear();
@@ -668,6 +673,10 @@ public class TrackerDistributedCacheMana
     return result;
   }
 
+  /**
+   * Set the sizes for any archives, files, or directories in the private
+   * distributed cache.
+   */
   public void setArchiveSizes(JobID jobId, long[] sizes) throws IOException {
     TaskDistributedCacheManager mgr = jobArchives.get(jobId);
     if (mgr != null) {
@@ -989,8 +998,13 @@ public class TrackerDistributedCacheMana
       HashMap<Path, CacheDir> toBeCleanedBaseDir = 
         new HashMap<Path, CacheDir>();
       synchronized (properties) {
+        LOG.debug("checkAndCleanup: Allowed Cache Size test");
         for (Map.Entry<Path, CacheDir> baseDir : properties.entrySet()) {
           CacheDir baseDirCounts = baseDir.getValue();
+          LOG.debug(baseDir.getKey() + ": allowedCacheSize=" + allowedCacheSize +
+              ",baseDirCounts.size=" + baseDirCounts.size +
+              ",allowedCacheSubdirs=" + allowedCacheSubdirs + 
+              ",baseDirCounts.subdirs=" + baseDirCounts.subdirs);
           if (allowedCacheSize < baseDirCounts.size ||
               allowedCacheSubdirs < baseDirCounts.subdirs) {
             CacheDir tcc = new CacheDir();
@@ -1002,6 +1016,7 @@ public class TrackerDistributedCacheMana
       }
       // try deleting cache Status with refcount of zero
       synchronized (cachedArchives) {
+        LOG.debug("checkAndCleanup: Global Cache Size Check");
         for(
             Iterator<Map.Entry<String, CacheStatus>> it 
             = cachedArchives.entrySet().iterator();
@@ -1010,11 +1025,16 @@ public class TrackerDistributedCacheMana
           String cacheId = entry.getKey();
           CacheStatus cacheStatus = cachedArchives.get(cacheId);
           CacheDir leftToClean = toBeCleanedBaseDir.get(cacheStatus.getBaseDir());
+
           if (leftToClean != null && (leftToClean.size > 0 || leftToClean.subdirs > 0)) {
             synchronized (cacheStatus) {
               // if reference count is zero mark the cache for deletion
-              if (!cacheStatus.isUsed()) {
-                leftToClean.size -= cacheStatus.size;
+              boolean isUsed = cacheStatus.isUsed();
+              long cacheSize = cacheStatus.size; 
+              LOG.debug(cacheStatus.getLocalizedUniqueDir() + ": isUsed=" + isUsed + 
+                  " size=" + cacheSize + " leftToClean.size=" + leftToClean.size);
+              if (!isUsed) {
+                leftToClean.size -= cacheSize;
                 leftToClean.subdirs--;
                 // delete this cache entry from the global list 
                 // and mark the localized file for deletion

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobLocalizer.java?rev=1291093&r1=1291092&r2=1291093&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobLocalizer.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobLocalizer.java Sun Feb 19 23:41:18 2012
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -339,21 +340,25 @@ public class JobLocalizer {
    * @return the size of the archive objects
    */
   public static long[] downloadPrivateCache(Configuration conf) throws IOException {
-    downloadPrivateCacheObjects(conf,
+    long[] fileSizes = downloadPrivateCacheObjects(conf,
                                 DistributedCache.getCacheFiles(conf),
                                 DistributedCache.getLocalCacheFiles(conf),
                                 DistributedCache.getFileTimestamps(conf),
                                 TrackerDistributedCacheManager.
                                   getFileVisibilities(conf),
                                 false);
-    return 
-      downloadPrivateCacheObjects(conf,
+
+    long[] archiveSizes = downloadPrivateCacheObjects(conf,
                                   DistributedCache.getCacheArchives(conf),
                                   DistributedCache.getLocalCacheArchives(conf),
                                   DistributedCache.getArchiveTimestamps(conf),
                                   TrackerDistributedCacheManager.
                                     getArchiveVisibilities(conf),
                                   true);
+
+    // The order here matters - it has to match order of cache files
+    // in TaskDistributedCacheManager.
+    return ArrayUtils.addAll(fileSizes, archiveSizes);
   }
 
   public void localizeJobFiles(JobID jobid, JobConf jConf,

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=1291093&r1=1291092&r2=1291093&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Sun Feb 19 23:41:18 2012
@@ -173,7 +173,7 @@ public interface TaskUmbilicalProtocol e
 
   /**
    * The job initializer needs to report the sizes of the archive
-   * objects in the private distributed cache.
+   * objects, files and directories in the private distributed cache.
    * @param jobId the job to update
    * @param sizes the array of sizes that were computed
    * @throws IOException

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1291093&r1=1291092&r2=1291093&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Sun Feb 19 23:41:18 2012
@@ -77,6 +77,10 @@ public class TestTrackerDistributedCache
   protected Path firstCacheFilePublic;
   protected Path secondCacheFile;
   protected Path secondCacheFilePublic;
+  protected Path firstCacheDirPublic;
+  protected Path firstCacheDirPrivate;
+  protected Path firstCacheFileInDirPublic;
+  protected Path firstCacheFileInDirPrivate;
   private FileSystem fs;
 
   protected LocalDirAllocator localDirAllocator = 
@@ -136,6 +140,15 @@ public class TestTrackerDistributedCache
     createPublicTempFile(secondCacheFilePublic);
     createPrivateTempFile(firstCacheFile);
     createPrivateTempFile(secondCacheFile);
+
+    firstCacheDirPublic = new Path(TEST_ROOT_DIR, "firstcachedirPublic");
+    firstCacheDirPrivate = new Path(TEST_ROOT_DIR, "firstcachedirPrivate");
+    firstCacheFileInDirPublic = new Path(firstCacheDirPublic, "firstcacheFileinDirPublic.txt");
+    firstCacheFileInDirPrivate = new Path(firstCacheDirPrivate, "firstcacheFileinDirPrivate.txt");
+    createPublicTempDir(firstCacheDirPublic);
+    createPrivateTempDir(firstCacheDirPrivate);
+    createPublicTempFile(firstCacheFileInDirPublic);
+    createPrivateTempFile(firstCacheFileInDirPrivate);
   }
   
   protected void refreshConf(Configuration conf) throws IOException {
@@ -263,41 +276,79 @@ public class TestTrackerDistributedCache
     TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
 
     // Task localizing for first job
+    JobID jobId = new JobID("jt", 1);
     TaskDistributedCacheManager handle = manager
-        .newTaskDistributedCacheManager(new JobID("jt", 1), conf1);
+        .newTaskDistributedCacheManager(jobId, conf1);
     handle.setupCache(conf1, TaskTracker.getPublicDistributedCacheDir(), 
         TaskTracker.getPrivateDistributedCacheDir(userName));
-    JobLocalizer.downloadPrivateCache(conf1);
+    long[] sizes = JobLocalizer.downloadPrivateCache(conf1);
+    if (sizes != null) {
+      manager.setArchiveSizes(jobId, sizes);
+    }
+    handle.release();
+    for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
+      assertEquals(0, manager.getReferenceCount(c.getStatus()));
+      long filesize = FileUtil.getDU(new File(c.getStatus().localizedLoadPath.getParent().toString()));
+      assertTrue("filesize is not greater than 0", filesize > 0);
+      assertEquals(filesize, c.getStatus().size);
+    }
+
+    // Test specifying directories to go into distributed cache and make
+    // sure their sizes are calculated properly.
+    Job job2 = new Job(conf);
+    Configuration conf2 = job2.getConfiguration();
+    conf1.set("user.name", userName);
+    DistributedCache.addCacheFile(firstCacheDirPublic.toUri(), conf2);
+    DistributedCache.addCacheFile(firstCacheDirPrivate.toUri(), conf2);
+
+    TrackerDistributedCacheManager.determineTimestamps(conf2);
+    TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
+
+    // Task localizing for second job
+    JobID job2Id = new JobID("jt", 2);
+    handle = manager.newTaskDistributedCacheManager(job2Id, conf2);
+    handle.setupCache(conf2, TaskTracker.getPublicDistributedCacheDir(),
+        TaskTracker.getPrivateDistributedCacheDir(userName));
+    long[] sizes2 = JobLocalizer.downloadPrivateCache(conf2);
+    for (int j=0; j > sizes2.length; j++) {
+      LOG.info("size is: " + sizes2[j]);
+    }
+    if (sizes2 != null) {
+      manager.setArchiveSizes(job2Id, sizes2);
+    }
     handle.release();
     for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
       assertEquals(0, manager.getReferenceCount(c.getStatus()));
+      long filesize = FileUtil.getDU(new File(c.getStatus().localizedLoadPath.getParent().toString()));
+      assertTrue("filesize is not greater than 0", filesize > 0);
+      assertEquals(filesize, c.getStatus().size);
     }
     
     Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
     createPrivateTempFile(thirdCacheFile);
     
     // Configures another job with three regular files.
-    Job job2 = new Job(conf);
-    Configuration conf2 = job2.getConfiguration();
-    conf2.set("user.name", userName);
+    Job job3 = new Job(conf);
+    Configuration conf3 = job3.getConfiguration();
+    conf3.set("user.name", userName);
     // add a file that would get failed to localize
-    DistributedCache.addCacheFile(firstCacheFilePublic.toUri(), conf2);
+    DistributedCache.addCacheFile(firstCacheFilePublic.toUri(), conf3);
     // add a file that is already localized by different job
-    DistributedCache.addCacheFile(secondCacheFile.toUri(), conf2);
+    DistributedCache.addCacheFile(secondCacheFile.toUri(), conf3);
     // add a file that is never localized
-    DistributedCache.addCacheFile(thirdCacheFile.toUri(), conf2);
+    DistributedCache.addCacheFile(thirdCacheFile.toUri(), conf3);
     
-    TrackerDistributedCacheManager.determineTimestamps(conf2);
-    TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
+    TrackerDistributedCacheManager.determineTimestamps(conf3);
+    TrackerDistributedCacheManager.determineCacheVisibilities(conf3);
 
-    // Task localizing for second job
+    // Task localizing for third job
     // localization for the "firstCacheFile" will fail.
-    handle = manager.newTaskDistributedCacheManager(new JobID("jt", 2), conf2);
+    handle = manager.newTaskDistributedCacheManager(new JobID("jt", 3), conf3);
     Throwable th = null;
     try {
-      handle.setupCache(conf2, TaskTracker.getPublicDistributedCacheDir(),
+      handle.setupCache(conf3, TaskTracker.getPublicDistributedCacheDir(),
           TaskTracker.getPrivateDistributedCacheDir(userName));
-      JobLocalizer.downloadPrivateCache(conf2);
+      JobLocalizer.downloadPrivateCache(conf3);
     } catch (IOException e) {
       th = e;
       LOG.info("Exception during setup", e);
@@ -949,6 +1000,13 @@ public class TestTrackerDistributedCache
     createTempFile(p, TEST_FILE_SIZE);
   }
   
+  static void createTempDir(Path p) throws IOException {
+    File dir = new File(p.toString());
+    dir.mkdirs();
+    FileSystem.LOG.info("created temp directory: " + p);
+
+  }
+
   static void createTempFile(Path p, int size) throws IOException {
     File f = new File(p.toString());
     FileOutputStream os = new FileOutputStream(f);
@@ -971,12 +1029,30 @@ public class TestTrackerDistributedCache
     FileUtil.chmod(p.toString(), "0770",true);
   }
 
+  static void createPublicTempDir(Path p)
+  throws IOException, InterruptedException {
+    createTempDir(p);
+    FileUtil.chmod(p.toString(), "0777",true);
+  }
+
+  static void createPrivateTempDir(Path p)
+  throws IOException, InterruptedException {
+    createTempDir(p);
+    FileUtil.chmod(p.toString(), "0770",true);
+  }
+
   @Override
   protected void tearDown() throws IOException {
     new File(firstCacheFile.toString()).delete();
     new File(secondCacheFile.toString()).delete();
     new File(firstCacheFilePublic.toString()).delete();
     new File(secondCacheFilePublic.toString()).delete();
+
+    new File(firstCacheFileInDirPublic.toString()).delete();
+    new File(firstCacheFileInDirPrivate.toString()).delete();
+    new File(firstCacheDirPrivate.toString()).delete();
+    new File(firstCacheDirPublic.toString()).delete();
+
     FileUtil.fullyDelete(new File(TEST_ROOT_DIR));
   }