You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:21:35 UTC

svn commit: r1077504 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java

Author: omalley
Date: Fri Mar  4 04:21:35 2011
New Revision: 1077504

URL: http://svn.apache.org/viewvc?rev=1077504&view=rev
Log:
commit 32c081c757d8483ee4915e4da2e66abf51b1f038
Author: Arun C Murthy <ac...@apache.org>
Date:   Tue Jun 15 11:25:26 2010 -0700

    MAPREDUCE-1538. Add a limit on the number of artifacts in the DistributedCache to ensure we cleanup aggressively. Contributed by Dick King.
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1538. Add a limit on the number of artifacts in the
    +    DistributedCache to ensure we cleanup aggressively. (Dick King via
    +    acmurthy)
    +

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1077504&r1=1077503&r2=1077504&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Fri Mar  4 04:21:35 2011
@@ -61,10 +61,19 @@ public class TrackerDistributedCacheMana
   private TreeMap<String, CacheStatus> cachedArchives = 
     new TreeMap<String, CacheStatus>();
 
-  private TreeMap<Path, Long> baseDirSize = new TreeMap<Path, Long>();
+  // For holding the properties of each cache directory
+  static class CacheDir {
+    long size;
+    long subdirs;
+  }
+  private TreeMap<Path, CacheDir> baseDirProperties =
+    new TreeMap<Path, CacheDir>();
 
   // default total cache size (10GB)
   private static final long DEFAULT_CACHE_SIZE = 10737418240L;
+  private static final long DEFAULT_CACHE_SUBDIR_LIMIT = 10000;
+  private long allowedCacheSize;
+  private long allowedCacheSubdirs;
 
   private static final Log LOG =
     LogFactory.getLog(TrackerDistributedCacheManager.class);
@@ -85,6 +94,14 @@ public class TrackerDistributedCacheMana
     this.trackerConf = conf;
     this.lDirAllocator = new LocalDirAllocator("mapred.local.dir");
     this.taskController = taskController;
+
+    // setting the cache size to a default of 10GB
+    this.allowedCacheSize = conf.getLong
+      ("local.cache.size", DEFAULT_CACHE_SIZE);
+    // setting the cache number of subdirectories limit to a default of 10000
+    this.allowedCacheSubdirs = conf.getLong
+      ("mapreduce.tasktracker.local.cache.numberdirectories",
+       DEFAULT_CACHE_SUBDIR_LIMIT);
   }
 
   /**
@@ -161,20 +178,21 @@ public class TrackerDistributedCacheMana
 
       // try deleting stuff if you can
       long size = 0;
+      long numberSubdirs = 0;
       synchronized (lcacheStatus) {
-        synchronized (baseDirSize) {
-          Long get = baseDirSize.get(lcacheStatus.getBaseDir());
-          if (get != null) {
-            size = get.longValue();
+        synchronized (baseDirProperties) {
+          CacheDir cacheDir = baseDirProperties.get(lcacheStatus.getBaseDir());
+          if (cacheDir != null) {
+            size = cacheDir.size;
+            numberSubdirs = cacheDir.subdirs;
           } else {
-            LOG.warn("Cannot find size of baseDir: "
-                + lcacheStatus.getBaseDir());
+            LOG.warn("Cannot find size and number of subdirectories of" +
+                     " baseDir: " + lcacheStatus.getBaseDir());
           }
         }
       }
-      // setting the cache size to a default of 10GB
-      long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
-      if (allowedSize < size) {
+
+      if (allowedCacheSize < size || allowedCacheSubdirs < numberSubdirs) {
         // try some cache deletions
         deleteCache(conf);
       }
@@ -254,18 +272,11 @@ public class TrackerDistributedCacheMana
     for (CacheStatus lcacheStatus : deleteSet) {
       synchronized (lcacheStatus) {
         FileSystem.getLocal(conf).delete(lcacheStatus.localizedLoadPath, true);
+
+        // Update the maps baseDirSize and baseDirNumberSubDir
         LOG.info("Deleted path " + lcacheStatus.localizedLoadPath);
-        // decrement the size of the cache from baseDirSize
-        synchronized (baseDirSize) {
-          Long dirSize = baseDirSize.get(lcacheStatus.localizedBaseDir);
-          if ( dirSize != null ) {
-            dirSize -= lcacheStatus.size;
-            baseDirSize.put(lcacheStatus.localizedBaseDir, dirSize);
-          } else {
-            LOG.warn("Cannot find record of the baseDir: " + 
-                     lcacheStatus.localizedBaseDir + " during delete!");
-          }
-        }
+
+        deleteCacheInfoUpdate(lcacheStatus);
       }
     }
   }
@@ -425,15 +436,10 @@ public class TrackerDistributedCacheMana
     long cacheSize =
       FileUtil.getDU(new File(parchive.getParent().toString()));
     cacheStatus.size = cacheSize;
-    synchronized (baseDirSize) {
-      Long dirSize = baseDirSize.get(cacheStatus.localizedBaseDir);
-      if( dirSize == null ) {
-        dirSize = Long.valueOf(cacheSize);
-      } else {
-        dirSize += cacheSize;
-      }
-      baseDirSize.put(cacheStatus.localizedBaseDir, dirSize);
-    }
+    
+    // Increase the size and sub directory count of the cache
+    // from baseDirSize and baseDirNumberSubDir.
+    addCacheInfoUpdate(cacheStatus);
 
     // set proper permissions for the localized directory
     setPermissions(conf, cacheStatus, isPublic);
@@ -845,4 +851,50 @@ public class TrackerDistributedCacheMana
 
     return path;
   }
+  
+  /**
+   * Decrement the size and sub directory count of the cache from baseDirSize
+   * and baseDirNumberSubDir. Have to lock lcacheStatus before calling this.
+   * @param cacheStatus cache status of the cache is deleted
+   */
+  private void deleteCacheInfoUpdate(CacheStatus cacheStatus) {
+    if (!cacheStatus.inited) {
+      // if it is not created yet, do nothing.
+      return;
+    }
+    // decrement the size of the cache from baseDirSize
+    synchronized (baseDirProperties) {
+      CacheDir cacheDir = baseDirProperties.get(cacheStatus.getBaseDir());
+      if (cacheDir != null) {
+        cacheDir.size -= cacheStatus.size;
+        cacheDir.subdirs--;
+      } else {
+        LOG.warn("Cannot find size and number of subdirectories of" +
+                 " baseDir: " + cacheStatus.getBaseDir());
+      }
+    }
+  }
+  
+  /**
+   * Update the maps baseDirSize and baseDirNumberSubDir when adding cache.
+   * Increase the size and sub directory count of the cache from baseDirSize
+   * and baseDirNumberSubDir. Have to lock lcacheStatus before calling this.
+   * @param cacheStatus cache status of the cache is added
+   */
+  private void addCacheInfoUpdate(CacheStatus cacheStatus) {
+    long cacheSize = cacheStatus.size;
+    // decrement the size of the cache from baseDirSize
+    synchronized (baseDirProperties) {
+      CacheDir cacheDir = baseDirProperties.get(cacheStatus.getBaseDir());
+      if (cacheDir != null) {
+        cacheDir.size += cacheSize;
+        cacheDir.subdirs++;
+      } else {
+        cacheDir = new CacheDir();
+        cacheDir.size = cacheSize;
+        cacheDir.subdirs = 1;
+        baseDirProperties.put(cacheStatus.getBaseDir(), cacheDir);
+      }
+    }
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1077504&r1=1077503&r2=1077504&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Fri Mar  4 04:21:35 2011
@@ -65,6 +65,7 @@ public class TestTrackerDistributedCache
 
   private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
   private static final int LOCAL_CACHE_LIMIT = 5 * 1024; //5K
+  private static final int LOCAL_CACHE_SUBDIR = 2;
   protected Configuration conf;
   protected Path firstCacheFile;
   protected Path secondCacheFile;
@@ -479,6 +480,8 @@ public class TestTrackerDistributedCache
     Configuration conf2 = new Configuration(conf);
     conf2.set("mapred.local.dir", ROOT_MAPRED_LOCAL_DIR.toString());
     conf2.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
+    conf2.setLong("mapreduce.tasktracker.local.cache.numberdirectories",
+                   LOCAL_CACHE_SUBDIR);
     refreshConf(conf2);
     TrackerDistributedCacheManager manager = 
         new TrackerDistributedCacheManager(conf2, taskController);
@@ -487,6 +490,7 @@ public class TestTrackerDistributedCache
     String userName = getJobOwnerName();
     conf2.set("user.name", userName);
 
+    // We first test the size limit
     Path localCache = manager.getLocalCache(firstCacheFile.toUri(), conf2, 
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(firstCacheFile), false,
@@ -503,6 +507,33 @@ public class TestTrackerDistributedCache
     assertFalse("DistributedCache failed deleting old" + 
         " cache when the cache store is full.",
         localfs.exists(localCache));
+    
+    // Now we test the number of sub directories limit
+    // Create the temporary cache files to be used in the tests.
+    Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
+    Path fourthCacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
+    // Adding two more small files, so it triggers the number of sub directory
+    // limit but does not trigger the file size limit.
+    createTempFile(thirdCacheFile, 1);
+    createTempFile(fourthCacheFile, 1);
+    Path thirdLocalCache = manager.getLocalCache(thirdCacheFile.toUri(), conf2,
+        TaskTracker.getPrivateDistributedCacheDir(userName),
+        fs.getFileStatus(thirdCacheFile), false,
+        now, new Path(TEST_ROOT_DIR), false, false);
+    // Release the third cache so that it can be deleted while sweeping
+    manager.releaseCache(thirdCacheFile.toUri(), conf2, now);
+    // Getting the fourth cache will make the number of sub directories becomes
+    // 3 which is greater than 2. So the released cache will be deleted.
+    manager.getLocalCache(fourthCacheFile.toUri(), conf2, 
+        TaskTracker.getPrivateDistributedCacheDir(userName),
+        fs.getFileStatus(fourthCacheFile), false, 
+        System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
+    assertFalse("DistributedCache failed deleting old" + 
+        " cache when the cache exceeds the number of sub directories limit.",
+        localfs.exists(thirdLocalCache));
+    // Clean up the files created in this test
+    new File(thirdCacheFile.toString()).delete();
+    new File(fourthCacheFile.toString()).delete();
   }
   
   public void testFileSystemOtherThanDefault() throws Exception {
@@ -526,13 +557,17 @@ public class TestTrackerDistributedCache
   }
 
   static void createTempFile(Path p) throws IOException {
+    createTempFile(p, TEST_FILE_SIZE);
+  }
+  
+  static void createTempFile(Path p, int size) throws IOException {
     File f = new File(p.toString());
     FileOutputStream os = new FileOutputStream(f);
-    byte[] toWrite = new byte[TEST_FILE_SIZE];
+    byte[] toWrite = new byte[size];
     new Random().nextBytes(toWrite);
     os.write(toWrite);
     os.close();
-    FileSystem.LOG.info("created: " + p + ", size=" + TEST_FILE_SIZE);
+    FileSystem.LOG.info("created: " + p + ", size=" + size);
   }
   
   static void createPublicTempFile(Path p)