You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by zs...@apache.org on 2010/04/30 00:42:35 UTC

svn commit: r939505 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/filecache/ src/java/org/apache/hadoop/mapreduce/server/tasktracker/ src/test/mapred/org/apache/hadoop/mapreduce/filecache/

Author: zshao
Date: Thu Apr 29 22:42:35 2010
New Revision: 939505

URL: http://svn.apache.org/viewvc?rev=939505&view=rev
Log:
MAPREDUCE-1568. TrackerDistributedCacheManager should clean up cache in a background thread. (Scott Chen via zshao)

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=939505&r1=939504&r2=939505&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Apr 29 22:42:35 2010
@@ -278,6 +278,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1417. Forrest documentation should be updated to reflect 
     the changes in MAPREDUCE-744. (Ravi Gummadi via vinodkv)
 
+    MAPREDUCE-1568. TrackerDistributedCacheManager should clean up cache
+    in a background thread. (Scott Chen via zshao)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=939505&r1=939504&r2=939505&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Apr 29 22:42:35 2010
@@ -691,6 +691,7 @@ public class TaskTracker 
     this.distributedCacheManager = 
         new TrackerDistributedCacheManager(this.fConf, taskController,
         asyncDiskService);
+    this.distributedCacheManager.startCleanupThread();
 
     this.jobClient = (InterTrackerProtocol) 
     mrOwner.doAs(new PrivilegedExceptionAction<Object>() {
@@ -1218,6 +1219,7 @@ public class TaskTracker 
     this.mapLauncher.interrupt();
     this.reduceLauncher.interrupt();
     
+    this.distributedCacheManager.stopCleanupThread();
     jvmManager.stop();
     
     // shutdown RPC connections

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=939505&r1=939504&r2=939505&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Thu Apr 29 22:42:35 2010
@@ -20,8 +20,10 @@ package org.apache.hadoop.mapreduce.file
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
@@ -60,14 +62,6 @@ public class TrackerDistributedCacheMana
   private TreeMap<String, CacheStatus> cachedArchives = 
     new TreeMap<String, CacheStatus>();
 
-  // For holding the properties of each cache directory
-  static class CacheDir {
-    long size;
-    long subdirs;
-  }
-  private TreeMap<Path, CacheDir> baseDirProperties =
-    new TreeMap<Path, CacheDir>();
-
   // default total cache size (10GB)
   private static final long DEFAULT_CACHE_SIZE = 10737418240L;
   private static final long DEFAULT_CACHE_SUBDIR_LIMIT = 10000;
@@ -88,7 +82,10 @@ public class TrackerDistributedCacheMana
   private Random random = new Random();
 
   private MRAsyncDiskService asyncDiskService;
-  
+
+  BaseDirManager baseDirManager = new BaseDirManager();
+  CleanupThread cleanupThread;
+
   public TrackerDistributedCacheManager(Configuration conf,
       TaskController taskController) throws IOException {
     this.localFs = FileSystem.getLocal(conf);
@@ -101,6 +98,7 @@ public class TrackerDistributedCacheMana
       // setting the cache number of subdirectories limit to a default of 10000
     this.allowedCacheSubdirs = conf.getLong(
           TTConfig.TT_LOCAL_CACHE_SUBDIRS_LIMIT, DEFAULT_CACHE_SUBDIR_LIMIT);
+    this.cleanupThread = new CleanupThread(conf);
   }
 
   /**
@@ -186,26 +184,6 @@ public class TrackerDistributedCacheMana
         createSymlink(conf, cache, lcacheStatus, isArchive, currentWorkDir,
             honorSymLinkConf);
       }
-
-      // try deleting stuff if you can
-      long size = 0;
-      long numberSubdirs = 0;
-      synchronized (lcacheStatus) {
-        synchronized (baseDirProperties) {
-          CacheDir cacheDir = baseDirProperties.get(lcacheStatus.getBaseDir());
-          if (cacheDir != null) {
-            size = cacheDir.size;
-            numberSubdirs = cacheDir.subdirs;
-          } else {
-            LOG.warn("Cannot find size and number of subdirectories of" +
-                     " baseDir: " + lcacheStatus.getBaseDir());
-          }
-        }
-      }
-      if (allowedCacheSize < size || allowedCacheSubdirs < numberSubdirs) {
-        // try some cache deletions
-        deleteCache(conf);
-      }
       initSuccessful = true;
       return localizedPath;
     } finally {
@@ -256,39 +234,6 @@ public class TrackerDistributedCacheMana
     }
   }
 
-  // To delete the caches which have a refcount of zero
-
-  private void deleteCache(Configuration conf) throws IOException {
-    Set<CacheStatus> deleteSet = new HashSet<CacheStatus>();
-    // try deleting cache Status with refcount of zero
-    synchronized (cachedArchives) {
-      for (Iterator<String> it = cachedArchives.keySet().iterator(); 
-          it.hasNext();) {
-        String cacheId = it.next();
-        CacheStatus lcacheStatus = cachedArchives.get(cacheId);
-        
-        // if reference count is zero 
-        // mark the cache for deletion
-        if (lcacheStatus.refcount == 0) {
-          // delete this cache entry from the global list 
-          // and mark the localized file for deletion
-          deleteSet.add(lcacheStatus);
-          it.remove();
-        }
-      }
-    }
-    
-    // do the deletion, after releasing the global lock
-    for (CacheStatus lcacheStatus : deleteSet) {
-      synchronized (lcacheStatus) {
-        deleteLocalPath(asyncDiskService,
-            FileSystem.getLocal(conf), lcacheStatus.getLocalizedUniqueDir());
-        // Update the maps baseDirSize and baseDirNumberSubDir
-        deleteCacheInfoUpdate(lcacheStatus);
-      }
-    }
-  }
-
   /**
    * Delete a local path with asyncDiskService if available,
    * or otherwise synchronously with local file system.
@@ -505,7 +450,7 @@ public class TrackerDistributedCacheMana
     cacheStatus.size = cacheSize;
     // Increase the size and sub directory count of the cache
     // from baseDirSize and baseDirNumberSubDir.
-    addCacheInfoUpdate(cacheStatus);
+    baseDirManager.addCacheUpdate(cacheStatus);
 
     // set proper permissions for the localized directory
     setPermissions(conf, cacheStatus, isPublic);
@@ -606,30 +551,31 @@ public class TrackerDistributedCacheMana
   }
 
   static class CacheStatus {
-    // the local load path of this cache
-    Path localizedLoadPath;
-
-    //the base dir where the cache lies
-    Path localizedBaseDir;
-
-    //the size of this cache
-    long size;
-
-    // number of instances using this cache
-    int refcount;
-
-    // the cache-file modification time
-    long mtime;
-
-    // is it initialized ?
-    boolean inited = false;
-
+    //
+    // This field should be accessed under global cachedArchives lock.
+    //
+    int refcount;           // number of instances using this cache.
+
+    //
+    // The following three fields should be accessed under
+    // individual cacheStatus lock.
+    //
+    long size;              //the size of this cache.
+    long mtime;             // the cache-file modification time
+    boolean inited = false; // is it initialized ?
+
+    //
+    // The following four fields are Immutable.
+    //
     // The sub directory (tasktracker/archive or tasktracker/user/archive),
     // under which the file will be localized
-    Path subDir;
-    
+    final Path subDir;
     // unique string used in the construction of local load path
-    String uniqueString;
+    final String uniqueString;
+    // the local load path of this cache
+    final Path localizedLoadPath;
+    //the base dir where the cache lies
+    final Path localizedBaseDir;
 
     public CacheStatus(Path baseDir, Path localLoadPath, Path subDir,
         String uniqueString) {
@@ -925,48 +871,154 @@ public class TrackerDistributedCacheMana
   }
   
   /**
-   * Decrement the size and sub directory count of the cache from baseDirSize
-   * and baseDirNumberSubDir. Have to lock lcacheStatus before calling this.
-   * @param cacheStatus cache status of the cache is deleted
-   */
-  private void deleteCacheInfoUpdate(CacheStatus cacheStatus) {
-    if (!cacheStatus.inited) {
-      // if it is not created yet, do nothing.
-      return;
-    }
-    // decrement the size of the cache from baseDirSize
-    synchronized (baseDirProperties) {
-      CacheDir cacheDir = baseDirProperties.get(cacheStatus.getBaseDir());
-      if (cacheDir != null) {
-        cacheDir.size -= cacheStatus.size;
-        cacheDir.subdirs--;
-      } else {
-        LOG.warn("Cannot find size and number of subdirectories of" +
-                 " baseDir: " + cacheStatus.getBaseDir());
+   * A thread that periodically runs baseDirManager.checkAndCleanup().
+   */
+  private class CleanupThread extends Thread {
+    // Sleep time in ms between cleanup checks (passed to Thread.sleep).
+    private long cleanUpCheckPeriod = 60000L; // default: 1 minute
+    public CleanupThread(Configuration conf) {
+      cleanUpCheckPeriod =
+        conf.getLong(TTConfig.TT_DISTRIBUTED_CACHE_CHECK_PERIOD,
+                            cleanUpCheckPeriod);
+    }
+    private volatile boolean running = true; // cleared by stopRunning()
+    public void stopRunning() {
+      running = false;
+    }
+    @Override
+    public void run() {
+      while (running) {
+        try {
+          Thread.sleep(cleanUpCheckPeriod);
+          baseDirManager.checkAndCleanup();
+        } catch (Exception e) {
+          LOG.error("Exception in DistributedCache CleanupThread.", e);
+          // Keep looping; NOTE(review): the shutdown interrupt is logged here too.
+        }
       }
     }
   }
-  
+
   /**
-   * Update the maps baseDirSize and baseDirNumberSubDir when adding cache.
-   * Increase the size and sub directory count of the cache from baseDirSize
-   * and baseDirNumberSubDir. Have to lock lcacheStatus before calling this.
-   * @param cacheStatus cache status of the cache is added
+   * Tracks the total size and subdirectory count of each base directory
+   * and cleans up unused (zero-refcount) cache files in those directories.
    */
-  private void addCacheInfoUpdate(CacheStatus cacheStatus) {
-    long cacheSize = cacheStatus.size;
-    // decrement the size of the cache from baseDirSize
-    synchronized (baseDirProperties) {
-      CacheDir cacheDir = baseDirProperties.get(cacheStatus.getBaseDir());
-      if (cacheDir != null) {
-        cacheDir.size += cacheSize;
-        cacheDir.subdirs++;
-      } else {
-        cacheDir = new CacheDir();
-        cacheDir.size = cacheSize;
-        cacheDir.subdirs = 1;
-        baseDirProperties.put(cacheStatus.getBaseDir(), cacheDir);
+  private class BaseDirManager {
+    private class CacheDir { // per-base-dir totals; accessed under properties lock
+      long size;    // summed CacheStatus.size, compared to allowedCacheSize
+      long subdirs; // number of caches counted, compared to allowedCacheSubdirs
+    }
+    private TreeMap<Path, CacheDir> properties = // keyed by CacheStatus.getBaseDir()
+      new TreeMap<Path, CacheDir>();
+
+    private long getDirSize(Path p) { // only called under the properties lock
+      return properties.get(p).size;
+    }
+    private long getDirSubdirs(Path p) { // only called under the properties lock
+      return properties.get(p).subdirs;
+    }
+
+    /**
+     * Find base directories whose size or number of subdirectories exceeds
+     * the limit, then delete every zero-refcount cache under them. The
+     * properties and cachedArchives locks are taken separately, never together.
+     */
+    public void checkAndCleanup() throws IOException {
+      Collection<CacheStatus> toBeDeletedCache = new LinkedList<CacheStatus>();
+      Set<Path> toBeCleanedBaseDir = new HashSet<Path>();
+      synchronized (properties) {
+        for (Path baseDir : properties.keySet()) {
+          if (allowedCacheSize < getDirSize(baseDir) ||
+              allowedCacheSubdirs < getDirSubdirs(baseDir)) {
+            toBeCleanedBaseDir.add(baseDir);
+          }
+        }
+      }
+      synchronized (cachedArchives) {
+        for (Iterator<String> it = cachedArchives.keySet().iterator();
+        it.hasNext();) {
+          String cacheId = it.next();
+          CacheStatus cacheStatus = cachedArchives.get(cacheId);
+          if (toBeCleanedBaseDir.contains(cacheStatus.getBaseDir())) {
+            // if reference count is zero mark the cache for deletion
+            if (cacheStatus.refcount == 0) {
+              // delete this cache entry from the global list 
+              // and mark the localized file for deletion
+              toBeDeletedCache.add(cacheStatus);
+              it.remove();
+            }
+          }
+        }
+      }
+      // do the deletion, after releasing the global lock
+      for (CacheStatus cacheStatus : toBeDeletedCache) {
+        synchronized (cacheStatus) {
+          deleteLocalPath(asyncDiskService, FileSystem.getLocal(trackerConf),
+                          cacheStatus.getLocalizedUniqueDir());
+          // Subtract this cache from its base dir's totals in properties
+          deleteCacheUpdate(cacheStatus);
+        }
+      }
+    }
+    /**
+     * Subtract the cache's size and one subdirectory from its base dir's entry
+     * in properties; does nothing if the cache was never inited. Have to
+     * synchronize cacheStatus before calling this method.
+     * @param cacheStatus status of the cache being deleted
+     */
+    public void deleteCacheUpdate(CacheStatus cacheStatus) {
+      if (!cacheStatus.inited) {
+        // if it is not created yet, do nothing.
+        return;
+      }
+      synchronized (properties) {
+        CacheDir cacheDir = properties.get(cacheStatus.getBaseDir());
+        if (cacheDir != null) {
+          cacheDir.size -= cacheStatus.size;
+          cacheDir.subdirs--;
+        } else {
+          LOG.warn("Cannot find size and number of subdirectories of" +
+              " baseDir: " + cacheStatus.getBaseDir());
+        }
+      }
+    }
+
+    /**
+     * Record a newly added cache in properties: add its size and one
+     * subdirectory to the entry for its base dir, creating the entry if it
+     * does not exist yet. Have to synchronize cacheStatus before calling
+     * this method.
+     * @param cacheStatus status of the cache being added
+     */
+    public void addCacheUpdate(CacheStatus cacheStatus) {
+      long cacheSize = cacheStatus.size;
+      synchronized (properties) {
+        CacheDir cacheDir = properties.get(cacheStatus.getBaseDir());
+        if (cacheDir != null) {
+          cacheDir.size += cacheSize;
+          cacheDir.subdirs++;
+        } else {
+          cacheDir = new CacheDir();
+          cacheDir.size = cacheSize;
+          cacheDir.subdirs = 1;
+          properties.put(cacheStatus.getBaseDir(), cacheDir);
+        }
       }
     }
   }
+
+  /**
+   * Start the background cache cleanup thread. Call at most once.
+   */
+  public void startCleanupThread() {
+    this.cleanupThread.start();
+  }
+
+  /**
+   * Tell the cleanup thread to stop and interrupt it; does not wait for it.
+   */
+  public void stopCleanupThread() {
+    cleanupThread.stopRunning(); // loop exits at its next running check
+    cleanupThread.interrupt();   // wake it from Thread.sleep
+  }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java?rev=939505&r1=939504&r2=939505&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java Thu Apr 29 22:42:35 2010
@@ -94,4 +94,6 @@ public interface TTConfig extends MRConf
     "mapreduce.tasktracker.group";
   public static final String TT_USERLOGCLEANUP_SLEEPTIME = 
     "mapreduce.tasktracker.userlogcleanup.sleeptime";
+  public static final String TT_DISTRIBUTED_CACHE_CHECK_PERIOD =
+    "mapreduce.tasktracker.distributedcache.checkperiod"; // cleanup check period, ms
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=939505&r1=939504&r2=939505&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Thu Apr 29 22:42:35 2010
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 import javax.security.auth.login.LoginException;
 
@@ -441,9 +442,11 @@ public class TestTrackerDistributedCache
     conf2.set(MRConfig.LOCAL_DIR, ROOT_MAPRED_LOCAL_DIR.toString());
     conf2.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, LOCAL_CACHE_LIMIT);
     conf2.setLong(TTConfig.TT_LOCAL_CACHE_SUBDIRS_LIMIT, LOCAL_CACHE_SUBDIR);
+    conf2.setLong(TTConfig.TT_DISTRIBUTED_CACHE_CHECK_PERIOD, 200); // 200 ms
     refreshConf(conf2);
     TrackerDistributedCacheManager manager = 
         new TrackerDistributedCacheManager(conf2, taskController);
+    manager.startCleanupThread();
     FileSystem localfs = FileSystem.getLocal(conf2);
     long now = System.currentTimeMillis();
     String userName = getJobOwnerName();
@@ -463,10 +466,8 @@ public class TestTrackerDistributedCache
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(secondCacheFile), false, 
         System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
-    assertFalse("DistributedCache failed deleting old" + 
-        " cache when the cache store is full.",
-        localfs.exists(localCache));
-    
+    checkCacheDeletion(localfs, localCache, "DistributedCache failed " +
+        "deleting old cache when the cache store is full.");
     // Now we test the number of sub directories limit
     // Create the temporary cache files to be used in the tests.
     Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
@@ -487,12 +488,32 @@ public class TestTrackerDistributedCache
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(fourthCacheFile), false, 
         System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
-    assertFalse("DistributedCache failed deleting old" + 
-        " cache when the cache exceeds the number of sub directories limit.",
-        localfs.exists(thirdLocalCache));
+    checkCacheDeletion(localfs, thirdLocalCache,
+        "DistributedCache failed deleting old" +
+        " cache when the cache exceeds the number of sub directories limit.");
     // Clean up the files created in this test
     new File(thirdCacheFile.toString()).delete();
     new File(fourthCacheFile.toString()).delete();
+    manager.stopCleanupThread();
+  }
+
+  /**
+   * Polls every 100 ms and returns as soon as the given path no longer
+   * exists. Fails the test with msg if it still exists after ~30 seconds.
+   */
+  private void checkCacheDeletion(FileSystem fs, Path cache, String msg)
+    throws Exception {
+    // Check every 100ms to see if the cache is deleted
+    boolean cacheExists = true;
+    for (int i = 0; i < 300; i++) {
+      if (!fs.exists(cache)) {
+        cacheExists = false;
+        break;
+      }
+      TimeUnit.MILLISECONDS.sleep(100L);
+    }
+    // If the cache is still there after ~30 seconds (300 x 100 ms), test fails.
+    assertFalse(msg, cacheExists);
   }
   
   public void testFileSystemOtherThanDefault() throws Exception {