You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to mapreduce-commits@hadoop.apache.org by dh...@apache.org on 2010/04/17 00:11:37 UTC

svn commit: r935090 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapreduce/filecache/ src/java/org/apache/hadoop/mapreduce/server/tasktracker/ src/test/mapred/org/apache/hadoop/mapreduce/filecache/

Author: dhruba
Date: Fri Apr 16 22:11:36 2010
New Revision: 935090

URL: http://svn.apache.org/viewvc?rev=935090&view=rev
Log:
MAPREDUCE-1538. TrackerDistributedCacheManager manages the
number of files. (Scott Chen via dhruba)


Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=935090&r1=935089&r2=935090&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Apr 16 22:11:36 2010
@@ -248,6 +248,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1466. Record number of files processed in FileInputFormat in the
     Configuration for offline analysis. (Luke Lu and Arun Murthy via cdouglas)
 
+    MAPREDUCE-1538. TrackerDistributedCacheManager manages the
+    number of files. (Scott Chen via dhruba)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=935090&r1=935089&r2=935090&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Fri Apr 16 22:11:36 2010
@@ -60,10 +60,19 @@ public class TrackerDistributedCacheMana
   private TreeMap<String, CacheStatus> cachedArchives = 
     new TreeMap<String, CacheStatus>();
 
-  private TreeMap<Path, Long> baseDirSize = new TreeMap<Path, Long>();
+  // Running totals for one cache base directory (see baseDirProperties)
+  static class CacheDir {
+    long size;     // sum of CacheStatus.size of caches under this dir
+    long subdirs;  // number of localized caches under this dir
+  }
+  private TreeMap<Path, CacheDir> baseDirProperties =
+    new TreeMap<Path, CacheDir>(); // lock this map before any access
 
   // default total cache size (10GB)
   private static final long DEFAULT_CACHE_SIZE = 10737418240L;
+  private static final long DEFAULT_CACHE_SUBDIR_LIMIT = 10000;
+  private long allowedCacheSize;    // per base dir limit on total size
+  private long allowedCacheSubdirs; // per base dir limit on subdir count
 
   private static final Log LOG =
     LogFactory.getLog(TrackerDistributedCacheManager.class);
@@ -86,6 +95,12 @@ public class TrackerDistributedCacheMana
     this.trackerConf = conf;
     this.lDirAllocator = new LocalDirAllocator(TTConfig.LOCAL_DIR);
     this.taskController = taskController;
+    // Max total cache size per base dir, from tracker conf; default 10GB
+    this.allowedCacheSize = conf.getLong(TTConfig.TT_LOCAL_CACHE_SIZE,
+          DEFAULT_CACHE_SIZE);
+    // Max number of cache sub directories per base dir; default 10000
+    this.allowedCacheSubdirs = conf.getLong(
+          TTConfig.TT_LOCAL_CACHE_SUBDIRS_LIMIT, DEFAULT_CACHE_SUBDIR_LIMIT);
   }
 
   /**
@@ -174,21 +189,20 @@ public class TrackerDistributedCacheMana
 
       // try deleting stuff if you can
       long size = 0;
+      long numberSubdirs = 0; // subdir count recorded for this base dir
       synchronized (lcacheStatus) {
-        synchronized (baseDirSize) {
-          Long get = baseDirSize.get(lcacheStatus.getBaseDir());
-          if (get != null) {
-            size = get.longValue();
+        synchronized (baseDirProperties) {
+          CacheDir cacheDir = baseDirProperties.get(lcacheStatus.getBaseDir());
+          if (cacheDir != null) {
+            size = cacheDir.size;
+            numberSubdirs = cacheDir.subdirs;
           } else {
-            LOG.warn("Cannot find size of baseDir: "
-                + lcacheStatus.getBaseDir());
+            LOG.warn("Cannot find size and number of subdirectories of" +
+                     " baseDir: " + lcacheStatus.getBaseDir());
           }
         }
       }
-      // setting the cache size to a default of 10GB
-      long allowedSize = conf.getLong(TTConfig.TT_LOCAL_CACHE_SIZE,
-          DEFAULT_CACHE_SIZE);
-      if (allowedSize < size) {
+      if (allowedCacheSize < size || allowedCacheSubdirs < numberSubdirs) {
         // try some cache deletions
         deleteCache(conf);
       }
@@ -268,18 +282,9 @@ public class TrackerDistributedCacheMana
     for (CacheStatus lcacheStatus : deleteSet) {
       synchronized (lcacheStatus) {
         deleteLocalPath(asyncDiskService,
-            FileSystem.getLocal(conf), lcacheStatus.localizedLoadPath);
-        // decrement the size of the cache from baseDirSize
-        synchronized (baseDirSize) {
-          Long dirSize = baseDirSize.get(lcacheStatus.localizedBaseDir);
-          if ( dirSize != null ) {
-            dirSize -= lcacheStatus.size;
-            baseDirSize.put(lcacheStatus.localizedBaseDir, dirSize);
-          } else {
-            LOG.warn("Cannot find record of the baseDir: " + 
-                     lcacheStatus.localizedBaseDir + " during delete!");
-          }
-        }
+            FileSystem.getLocal(conf), lcacheStatus.getLocalizedUniqueDir());
+        // Subtract this cache from its base dir totals in baseDirProperties
+        deleteCacheInfoUpdate(lcacheStatus);
       }
     }
   }
@@ -498,15 +503,9 @@ public class TrackerDistributedCacheMana
     long cacheSize = 
       FileUtil.getDU(new File(parchive.getParent().toString()));
     cacheStatus.size = cacheSize;
-    synchronized (baseDirSize) {
-      Long dirSize = baseDirSize.get(cacheStatus.localizedBaseDir);
-      if( dirSize == null ) {
-        dirSize = Long.valueOf(cacheSize);
-      } else {
-        dirSize += cacheSize;
-      }
-      baseDirSize.put(cacheStatus.localizedBaseDir, dirSize);
-    }
+    // Add this cache's size and one sub directory to the totals kept
+    // for its base dir in baseDirProperties.
+    addCacheInfoUpdate(cacheStatus);
 
     // set proper permissions for the localized directory
     setPermissions(conf, cacheStatus, isPublic);
@@ -924,4 +923,50 @@ public class TrackerDistributedCacheMana
   static void setLocalFiles(Configuration conf, String str) {
     conf.set(JobContext.CACHE_LOCALFILES, str);
   }
+  
+  /**
+   * Subtract a deleted cache's size and one sub directory from the totals of
+   * its base dir in baseDirProperties. Caller must hold the cacheStatus lock.
+   * @param cacheStatus cache status of the cache being deleted
+   */
+  private void deleteCacheInfoUpdate(CacheStatus cacheStatus) {
+    if (!cacheStatus.inited) {
+      // not created yet (presumably never counted), so do nothing.
+      return;
+    }
+    // decrement size and sub directory count recorded for the base dir
+    synchronized (baseDirProperties) {
+      CacheDir cacheDir = baseDirProperties.get(cacheStatus.getBaseDir());
+      if (cacheDir != null) {
+        cacheDir.size -= cacheStatus.size;
+        cacheDir.subdirs--;
+      } else {
+        LOG.warn("Cannot find size and number of subdirectories of" +
+                 " baseDir: " + cacheStatus.getBaseDir());
+      }
+    }
+  }
+  
+  /**
+   * Add a newly localized cache's size and one sub directory to the totals
+   * of its base dir in baseDirProperties, creating the entry on first use.
+   * Caller must hold the cacheStatus lock.
+   * @param cacheStatus cache status of the cache being added
+   */
+  private void addCacheInfoUpdate(CacheStatus cacheStatus) {
+    long cacheSize = cacheStatus.size;
+    // increment size and sub directory count recorded for the base dir
+    synchronized (baseDirProperties) {
+      CacheDir cacheDir = baseDirProperties.get(cacheStatus.getBaseDir());
+      if (cacheDir != null) {
+        cacheDir.size += cacheSize;
+        cacheDir.subdirs++;
+      } else {
+        cacheDir = new CacheDir();
+        cacheDir.size = cacheSize;
+        cacheDir.subdirs = 1;
+        baseDirProperties.put(cacheStatus.getBaseDir(), cacheDir);
+      }
+    }
+  }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java?rev=935090&r1=935089&r2=935090&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java Fri Apr 16 22:11:36 2010
@@ -81,6 +81,8 @@ public interface TTConfig extends MRConf
     "mapreduce.tasktracker.taskmemorymanager.monitoringinterval";
   public static final String TT_LOCAL_CACHE_SIZE = 
     "mapreduce.tasktracker.cache.local.size";
+  public static final String TT_LOCAL_CACHE_SUBDIRS_LIMIT =
+    "mapreduce.tasktracker.cache.local.numberdirectories";
   public static final String TT_OUTOFBAND_HEARBEAT =
     "mapreduce.tasktracker.outofband.heartbeat";
   public static final String TT_USER_NAME = "mapreduce.tasktracker.user.name";

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=935090&r1=935089&r2=935090&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Fri Apr 16 22:11:36 2010
@@ -66,6 +66,7 @@ public class TestTrackerDistributedCache
 
   private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
   private static final int LOCAL_CACHE_LIMIT = 5 * 1024; //5K
+  private static final int LOCAL_CACHE_SUBDIR = 2; // subdir count limit
   protected Configuration conf;
   protected Path firstCacheFile;
   protected Path secondCacheFile;
@@ -439,6 +440,7 @@ public class TestTrackerDistributedCache
     Configuration conf2 = new Configuration(conf);
     conf2.set(MRConfig.LOCAL_DIR, ROOT_MAPRED_LOCAL_DIR.toString());
     conf2.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, LOCAL_CACHE_LIMIT);
+    conf2.setLong(TTConfig.TT_LOCAL_CACHE_SUBDIRS_LIMIT, LOCAL_CACHE_SUBDIR);
     refreshConf(conf2);
     TrackerDistributedCacheManager manager = 
         new TrackerDistributedCacheManager(conf2, taskController);
@@ -447,6 +449,7 @@ public class TestTrackerDistributedCache
     String userName = getJobOwnerName();
     conf2.set(JobContext.USER_NAME, userName);
 
+    // We first test the size limit
     Path localCache = manager.getLocalCache(firstCacheFile.toUri(), conf2, 
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(firstCacheFile), false,
@@ -463,6 +466,33 @@ public class TestTrackerDistributedCache
     assertFalse("DistributedCache failed deleting old" + 
         " cache when the cache store is full.",
         localfs.exists(localCache));
+    
+    // Now we test the number of sub directories limit
+    // Create the temporary cache files to be used in the tests.
+    Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
+    Path fourthCacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
+    // Two more 1-byte files: enough to exceed the sub directory limit but
+    // not the cache size limit.
+    createTempFile(thirdCacheFile, 1);
+    createTempFile(fourthCacheFile, 1);
+    Path thirdLocalCache = manager.getLocalCache(thirdCacheFile.toUri(), conf2,
+        TaskTracker.getPrivateDistributedCacheDir(userName),
+        fs.getFileStatus(thirdCacheFile), false,
+        now, new Path(TEST_ROOT_DIR), false, false);
+    // Release the third cache so that it can be deleted while sweeping
+    manager.releaseCache(thirdCacheFile.toUri(), conf2, now);
+    // Getting the fourth cache raises the number of sub directories to 3,
+    // which exceeds the limit of 2, so the released third cache is deleted.
+    manager.getLocalCache(fourthCacheFile.toUri(), conf2, 
+        TaskTracker.getPrivateDistributedCacheDir(userName),
+        fs.getFileStatus(fourthCacheFile), false, 
+        System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
+    assertFalse("DistributedCache failed deleting old" + 
+        " cache when the cache exceeds the number of sub directories limit.",
+        localfs.exists(thirdLocalCache));
+    // Clean up the files created in this test
+    new File(thirdCacheFile.toString()).delete();
+    new File(fourthCacheFile.toString()).delete();
   }
   
   public void testFileSystemOtherThanDefault() throws Exception {
@@ -486,13 +516,17 @@ public class TestTrackerDistributedCache
   }
 
   static void createTempFile(Path p) throws IOException {
+    createTempFile(p, TEST_FILE_SIZE);
+  }
+  
+  static void createTempFile(Path p, int size) throws IOException {
     File f = new File(p.toString());
     FileOutputStream os = new FileOutputStream(f);
-    byte[] toWrite = new byte[TEST_FILE_SIZE];
+    byte[] toWrite = new byte[size];
     new Random().nextBytes(toWrite);
     os.write(toWrite);
     os.close();
-    FileSystem.LOG.info("created: " + p + ", size=" + TEST_FILE_SIZE);
+    FileSystem.LOG.info("created: " + p + ", size=" + size);
   }
   
   static void createPublicTempFile(Path p)