You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ma...@apache.org on 2011/08/01 23:41:22 UTC

svn commit: r1152941 - in /hadoop/common/branches/branch-0.20-security: CHANGES.txt src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java

Author: mahadev
Date: Mon Aug  1 21:41:20 2011
New Revision: 1152941

URL: http://svn.apache.org/viewvc?rev=1152941&view=rev
Log:
MAPREDUCE-2494. Make the distributed cache delete entries using LRU priority. (Robert Joseph Evans via mahadev)

Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java

Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1152941&r1=1152940&r2=1152941&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Mon Aug  1 21:41:20 2011
@@ -35,6 +35,9 @@ Release 0.20.205.0 - unreleased
     HADOOP-7314. Add support for throwing UnknownHostException when a host
     doesn't resolve. Needed for MAPREDUCE-2489. (Jeffrey Naisbitt via mattf)
 
+    MAPREDUCE-2494. Make the distributed cache delete entries using LRU priority
+    (Robert Joseph Evans via mahadev)
+
 Release 0.20.204.0 - unreleased
 
   NEW FEATURES

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1152941&r1=1152940&r2=1152941&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Mon Aug  1 21:41:20 2011
@@ -28,6 +28,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Random;
@@ -67,8 +68,8 @@ import org.apache.hadoop.mapreduce.secur
  */
 public class TrackerDistributedCacheManager {
   // cacheID to cacheStatus mapping
-  private TreeMap<String, CacheStatus> cachedArchives = 
-    new TreeMap<String, CacheStatus>();
+  private LinkedHashMap<String, CacheStatus> cachedArchives = 
+    new LinkedHashMap<String, CacheStatus>();
   private Map<JobID, TaskDistributedCacheManager> jobArchives =
     Collections.synchronizedMap(
         new HashMap<JobID, TaskDistributedCacheManager>());
@@ -79,8 +80,11 @@ public class TrackerDistributedCacheMana
   // default total cache size (10GB)
   private static final long DEFAULT_CACHE_SIZE = 10737418240L;
   private static final long DEFAULT_CACHE_SUBDIR_LIMIT = 10000;
+  private static final float DEFAULT_CACHE_KEEP_AROUND_PCT = 0.95f;
   private long allowedCacheSize;
   private long allowedCacheSubdirs;
+  private long allowedCacheSizeCleanupGoal;
+  private long allowedCacheSubdirsCleanupGoal;
 
   private static final Log LOG =
     LogFactory.getLog(TrackerDistributedCacheManager.class);
@@ -110,6 +114,13 @@ public class TrackerDistributedCacheMana
     this.allowedCacheSubdirs = conf.getLong
       ("mapreduce.tasktracker.local.cache.numberdirectories",
        DEFAULT_CACHE_SUBDIR_LIMIT);
+    double cleanupPct = conf.getFloat("mapreduce.tasktracker.cache.local.keep.pct",
+        DEFAULT_CACHE_KEEP_AROUND_PCT);
+    this.allowedCacheSizeCleanupGoal = 
+      (long)(this.allowedCacheSize * cleanupPct);
+    this.allowedCacheSubdirsCleanupGoal = 
+      (long)(this.allowedCacheSubdirs * cleanupPct);
+
     this.taskController = controller;
     this.cleanupThread = new CleanupThread(conf);
   }
@@ -162,15 +173,13 @@ public class TrackerDistributedCacheMana
         lcacheStatus = 
           new CacheStatus(new Path(localPath.toString().replace(cachePath, "")), 
                           localPath, new Path(subDir), uniqueString, 
-                          isPublic ? null : user);
+                          isPublic ? null : user, key);
         cachedArchives.put(key, lcacheStatus);
       }
 
       //mark the cache for use.
       file.setStatus(lcacheStatus);
-      synchronized (lcacheStatus) {
-        lcacheStatus.refcount++;
-      }
+      lcacheStatus.incRefCount();
     }
     
     try {
@@ -203,11 +212,8 @@ public class TrackerDistributedCacheMana
         }
       }
     } catch (IOException ie) {
-      synchronized (lcacheStatus) {
-        // release this cache
-        lcacheStatus.refcount -= 1;
-        throw ie;
-      }
+      lcacheStatus.decRefCount();
+      throw ie;
     }
     return localizedPath;
   }
@@ -223,9 +229,7 @@ public class TrackerDistributedCacheMana
    * @throws IOException
    */
   void releaseCache(CacheStatus status) throws IOException {
-	synchronized (status) {
-      status.refcount--;
-    }
+    status.decRefCount();
   }
 
   void setSize(CacheStatus status, long size) throws IOException {
@@ -241,9 +245,7 @@ public class TrackerDistributedCacheMana
    * This method is called from unit tests. 
    */
   int getReferenceCount(CacheStatus status) throws IOException {
-	synchronized (status) {
-	  return status.refcount;
-	}
+    return status.getRefCount();
   }
 
   /**
@@ -540,11 +542,11 @@ public class TrackerDistributedCacheMana
     }
   }
 
-  static class CacheStatus {
+  class CacheStatus {
     //
     // This field should be accessed under global cachedArchives lock.
     //
-    int refcount;    // number of instances using this cache
+    private int refcount;    // number of instances using this cache
 
     //
     // The following two fields should be accessed under
@@ -568,9 +570,11 @@ public class TrackerDistributedCacheMana
     Path localizedBaseDir;
     // The user that owns the cache entry or null if it is public
     final String user;
+    //The key of this in the cachedArchives.
+    private final String key;
 
     public CacheStatus(Path baseDir, Path localLoadPath, Path subDir,
-                       String uniqueString, String user) {
+                       String uniqueString, String user, String key) {
       super();
       this.localizedLoadPath = localLoadPath;
       this.refcount = 0;
@@ -579,8 +583,34 @@ public class TrackerDistributedCacheMana
       this.subDir = subDir;
       this.uniqueString = uniqueString;
       this.user = user;
+      this.key = key;
     }
     
+    public synchronized void incRefCount() {
+      refcount += 1;
+    }
+
+    public void decRefCount() {
+      synchronized (cachedArchives) {
+        synchronized (this) {
+          refcount -= 1;
+          if(refcount <= 0) {
+            String key = this.key;
+            cachedArchives.remove(key);
+            cachedArchives.put(key, this);
+          }
+        }
+      }
+    }
+
+    public int getRefCount() {
+      return refcount;
+    }
+
+    public synchronized boolean isUsed() {
+      return refcount > 0;
+    }
+
     Path getBaseDir(){
       return this.localizedBaseDir;
     }
@@ -917,49 +947,52 @@ public class TrackerDistributedCacheMana
     }
   }
 
+  // For holding the properties of each cache directory
+  private static class CacheDir {
+    long size;
+    long subdirs;
+  }
+  
   /**
    * This class holds properties of each base directories and is responsible
    * for clean up unused cache files in base directories.
    */
   protected class BaseDirManager {
-
-    // For holding the properties of each cache directory
-    private class CacheDir {
-      long size;
-      long subdirs;
-    }
-
-    private TreeMap<Path, BaseDirManager.CacheDir> properties =
-    new TreeMap<Path, BaseDirManager.CacheDir>();
-
-    private long getDirSize(Path p) {
-      return properties.get(p).size;
-    }
-    private long getDirSubdirs(Path p) {
-      return properties.get(p).subdirs;
-    }
+    private TreeMap<Path, CacheDir> properties =
+    new TreeMap<Path, CacheDir>();
     
     void checkAndCleanup() throws IOException {
       Collection<CacheStatus> toBeDeletedCache = new LinkedList<CacheStatus>();
-      Set<Path> toBeCleanedBaseDir = new HashSet<Path>();
+      HashMap<Path, CacheDir> toBeCleanedBaseDir = 
+        new HashMap<Path, CacheDir>();
       synchronized (properties) {
-        for (Path baseDir : properties.keySet()) {
-          if (allowedCacheSize < getDirSize(baseDir) ||
-              allowedCacheSubdirs < getDirSubdirs(baseDir)) {
-            toBeCleanedBaseDir.add(baseDir);
+        for (Map.Entry<Path, CacheDir> baseDir : properties.entrySet()) {
+          CacheDir baseDirCounts = baseDir.getValue();
+          if (allowedCacheSize < baseDirCounts.size ||
+              allowedCacheSubdirs < baseDirCounts.subdirs) {
+            CacheDir tcc = new CacheDir();
+            tcc.size = baseDirCounts.size - allowedCacheSizeCleanupGoal;
+            tcc.subdirs = baseDirCounts.subdirs - allowedCacheSubdirsCleanupGoal;
+            toBeCleanedBaseDir.put(baseDir.getKey(), tcc);
           }
         }
       }
       // try deleting cache Status with refcount of zero
       synchronized (cachedArchives) {
-        for (Iterator<String> it = cachedArchives.keySet().iterator(); 
-            it.hasNext();) {
-          String cacheId = it.next();
+        for(
+            Iterator<Map.Entry<String, CacheStatus>> it 
+            = cachedArchives.entrySet().iterator();
+            it.hasNext(); ) {
+          Map.Entry<String, CacheStatus> entry = it.next();
+          String cacheId = entry.getKey();
           CacheStatus cacheStatus = cachedArchives.get(cacheId);
-          if (toBeCleanedBaseDir.contains(cacheStatus.getBaseDir())) {
+          CacheDir leftToClean = toBeCleanedBaseDir.get(cacheStatus.getBaseDir());
+          if (leftToClean != null && (leftToClean.size > 0 || leftToClean.subdirs > 0)) {
             synchronized (cacheStatus) {
               // if reference count is zero mark the cache for deletion
-              if (cacheStatus.refcount == 0) {
+              if (!cacheStatus.isUsed()) {
+                leftToClean.size -= cacheStatus.size;
+                leftToClean.subdirs--;
                 // delete this cache entry from the global list 
                 // and mark the localized file for deletion
                 toBeDeletedCache.add(cacheStatus);
@@ -1007,7 +1040,7 @@ public class TrackerDistributedCacheMana
     }
     // decrement the size of the cache from baseDirSize
     synchronized (baseDirManager.properties) {
-      BaseDirManager.CacheDir cacheDir = properties.get(cacheStatus.getBaseDir());
+      CacheDir cacheDir = properties.get(cacheStatus.getBaseDir());
       if (cacheDir != null) {
         cacheDir.size -= cacheStatus.size;
         cacheDir.subdirs--;
@@ -1028,12 +1061,12 @@ public class TrackerDistributedCacheMana
     long cacheSize = cacheStatus.size;
     // decrement the size of the cache from baseDirSize
     synchronized (baseDirManager.properties) {
-      BaseDirManager.CacheDir cacheDir = properties.get(cacheStatus.getBaseDir());
+      CacheDir cacheDir = properties.get(cacheStatus.getBaseDir());
       if (cacheDir != null) {
         cacheDir.size += cacheSize;
         cacheDir.subdirs++;
       } else {
-        cacheDir = new BaseDirManager.CacheDir();
+        cacheDir = new CacheDir();
         cacheDir.size = cacheSize;
         cacheDir.subdirs = 1;
         properties.put(cacheStatus.getBaseDir(), cacheDir);

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1152941&r1=1152940&r2=1152941&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Mon Aug  1 21:41:20 2011
@@ -44,7 +44,6 @@ import org.apache.hadoop.mapred.TaskCont
 import org.apache.hadoop.mapred.TaskTracker;
 import org.apache.hadoop.mapred.TaskTracker.LocalStorage;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -608,6 +607,130 @@ public class TestTrackerDistributedCache
     return UserGroupInformation.getLoginUser().getUserName();
   }
 
+  public static final long CACHE_DELETE_PERIOD_MS = 100l;
+  
+  /** test delete cache */
+  public void testLRUDeleteCache() throws Exception {
+    if (!canRun()) {
+      return;
+    }
+    // This test needs MRConfig.LOCAL_DIR to be single directory
+    // instead of four, because it assumes that both 
+    // firstcachefile and secondcachefile will be localized on same directory 
+    // so that second localization triggers deleteCache.
+    // If MRConfig.LOCAL_DIR is four directories, second localization might not 
+    // trigger deleteCache, if it is localized in different directory.
+    Configuration conf2 = new Configuration(conf);
+    conf2.set("mapred.local.dir", ROOT_MAPRED_LOCAL_DIR.toString());
+    //Make it larger then expected
+    conf2.setLong("local.cache.size", 21 * 1024l);
+    conf2.setLong("mapreduce.tasktracker.local.cache.numberdirectories", 3);
+    //The goal is to get down to 15.75K and 2 dirs
+    conf2.setFloat("mapreduce.tasktracker.cache.local.keep.pct", 0.75f); 
+    conf2.setLong("mapreduce.tasktracker.distributedcache.checkperiod", CACHE_DELETE_PERIOD_MS);
+    refreshConf(conf2);
+    TrackerDistributedCacheManager manager = 
+      new TrackerDistributedCacheManager(conf2, taskController);
+    try {
+      manager.startCleanupThread();
+      FileSystem localfs = FileSystem.getLocal(conf2);
+      String userName = getJobOwnerName();
+      conf2.set("user.name", userName);
+
+      //Here we are testing the LRU.  In this case we will add in 4 cache entries
+      // 2 of them are 8k each and 2 of them are very small.  We want to verify
+      // That they are deleted in LRU order.
+      // So what we will do is add in the two large files first, 1 then 2, and
+      // then one of the small ones 3.  We will then release them in opposite
+      // order 3, 2, 1.
+      //
+      // Finally we will add in the last small file.  This last file should push
+      // us over the 3 entry limit to trigger a cleanup.  So LRU order is 3, 2, 1
+      // And we will only delete 2 entries so that should leave 1 un touched
+      // but 3 and 2 deleted
+
+      Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
+      Path fourthCacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
+      // Adding two more small files, so it triggers the number of sub directory
+      // limit but does not trigger the file size limit.
+      createTempFile(thirdCacheFile, 1);
+      createTempFile(fourthCacheFile, 1);
+
+      FileStatus stat = fs.getFileStatus(firstCacheFilePublic);
+      CacheFile cfile1 = new CacheFile(firstCacheFilePublic.toUri(), 
+          CacheFile.FileType.REGULAR, true, 
+          stat.getModificationTime(),
+          true); 
+
+      Path firstLocalCache = manager.getLocalCache(firstCacheFilePublic.toUri(), conf2, 
+          TaskTracker.getPrivateDistributedCacheDir(userName),
+          fs.getFileStatus(firstCacheFilePublic), false,
+          fs.getFileStatus(firstCacheFilePublic).getModificationTime(), true,
+          cfile1);
+
+      stat = fs.getFileStatus(secondCacheFilePublic);
+      CacheFile cfile2 = new CacheFile(secondCacheFilePublic.toUri(), 
+          CacheFile.FileType.REGULAR, true, 
+          stat.getModificationTime(),
+          true); 
+
+      Path secondLocalCache = manager.getLocalCache(secondCacheFilePublic.toUri(), conf2, 
+          TaskTracker.getPrivateDistributedCacheDir(userName),
+          fs.getFileStatus(secondCacheFilePublic), false,
+          fs.getFileStatus(secondCacheFilePublic).getModificationTime(), true,
+          cfile2);
+
+      stat = fs.getFileStatus(thirdCacheFile);
+      CacheFile cfile3 = new CacheFile(thirdCacheFile.toUri(), 
+          CacheFile.FileType.REGULAR, true, 
+          stat.getModificationTime(),
+          true); 
+
+      Path thirdLocalCache = manager.getLocalCache(thirdCacheFile.toUri(), conf2, 
+          TaskTracker.getPrivateDistributedCacheDir(userName),
+          fs.getFileStatus(thirdCacheFile), false,
+          fs.getFileStatus(thirdCacheFile).getModificationTime(), true,
+          cfile3);
+
+      manager.releaseCache(cfile3.getStatus());
+      manager.releaseCache(cfile2.getStatus());
+      manager.releaseCache(cfile1.getStatus());
+
+      // Getting the fourth cache will make the number of sub directories becomes
+      // 4 which is greater than 3. So the released cache will be deleted.
+      stat = fs.getFileStatus(fourthCacheFile);
+      CacheFile cfile4 = new CacheFile(fourthCacheFile.toUri(), 
+          CacheFile.FileType.REGULAR, true, 
+          stat.getModificationTime(),
+          true); 
+
+      Path fourthLocalCache = manager.getLocalCache(fourthCacheFile.toUri(), conf2, 
+          TaskTracker.getPrivateDistributedCacheDir(userName),
+          fs.getFileStatus(fourthCacheFile), false,
+          fs.getFileStatus(fourthCacheFile).getModificationTime(), true,
+          cfile4);
+
+      checkCacheDeletion(localfs, secondLocalCache, "DistributedCache failed " +
+          "deleting second cache LRU order");
+
+      checkCacheDeletion(localfs, thirdLocalCache,
+          "DistributedCache failed deleting third" +
+      " cache LRU order.");
+
+      checkCacheNOTDeletion(localfs, firstLocalCache, "DistributedCache failed " +
+      "Deleted first cache LRU order.");
+
+      checkCacheNOTDeletion(localfs, fourthCacheFile, "DistributedCache failed " +
+      "Deleted fourth cache LRU order.");
+      // Clean up the files created in this test
+      new File(thirdCacheFile.toString()).delete();
+      new File(fourthCacheFile.toString()).delete();
+    } finally {
+      manager.stopCleanupThread();
+    }
+  }
+
+  
   /** test delete cache */
   public void testDeleteCache() throws Exception {
     if (!canRun()) {
@@ -622,7 +745,7 @@ public class TestTrackerDistributedCache
     Configuration conf2 = new Configuration(conf);
     conf2.set("mapred.local.dir", ROOT_MAPRED_LOCAL_DIR.toString());
     conf2.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
-    conf2.setLong("mapreduce.tasktracker.distributedcache.checkperiod", 200); // 200 ms
+    conf2.setLong("mapreduce.tasktracker.distributedcache.checkperiod", CACHE_DELETE_PERIOD_MS);
     
     refreshConf(conf2);
     TrackerDistributedCacheManager manager = 
@@ -762,6 +885,15 @@ public class TestTrackerDistributedCache
   }
   
   /**
+   * Do a simple check to see if the file has NOT been deleted.
+   */
+  private void checkCacheNOTDeletion(FileSystem fs, Path cache, String msg)
+  throws Exception {
+    TimeUnit.MILLISECONDS.sleep(3 * CACHE_DELETE_PERIOD_MS);
+    assertTrue(msg, fs.exists(cache));
+  }
+  
+  /**
    * Periodically checks if a file is there, return if the file is no longer
    * there. Fails the test if a files is there for 30 seconds.
    */
@@ -774,7 +906,7 @@ public class TestTrackerDistributedCache
         cacheExists = false;
         break;
       }
-      TimeUnit.MILLISECONDS.sleep(100L);
+      TimeUnit.MILLISECONDS.sleep(CACHE_DELETE_PERIOD_MS);
     }
     // If the cache is still there after 5 minutes, test fails.
     assertFalse(msg, cacheExists);