You are viewing a plain text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/06/17 00:57:43 UTC

svn commit: r1136726 - in /hadoop/common/branches/branch-0.20-security-204: ./ src/mapred/org/apache/hadoop/filecache/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/filecache/

Author: omalley
Date: Thu Jun 16 22:57:43 2011
New Revision: 1136726

URL: http://svn.apache.org/viewvc?rev=1136726&view=rev
Log:
MAPREDUCE-2479. Move distributed cache cleanup to a background task,
backporting MAPREDUCE-1568. (Robert Joseph Evans via cdouglas)

Modified:
    hadoop/common/branches/branch-0.20-security-204/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
    hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java

Modified: hadoop/common/branches/branch-0.20-security-204/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/CHANGES.txt?rev=1136726&r1=1136725&r2=1136726&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security-204/CHANGES.txt Thu Jun 16 22:57:43 2011
@@ -34,6 +34,9 @@ Release 0.20.204.0 - unreleased
 
     HADOOP-7369. Fix permissions in tarball for sbin/* and libexec/* (omalley)
 
+    MAPREDUCE-2479. Move distributed cache cleanup to a background task,
+    backporting MAPREDUCE-1568. (Robert Joseph Evans via cdouglas)
+
     HADOOP-7356. Fix bin/hadoop scripts (eyang via omalley)
 
     HADOOP-7272. Remove unnecessary security related info logs. (suresh)

Propchange: hadoop/common/branches/branch-0.20-security-204/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jun 16 22:57:43 2011
@@ -1,5 +1,5 @@
 /hadoop/common/branches/branch-0.20/CHANGES.txt:826138,826568,829987,831184,833001,880632,898713,909245,909723,960946,1044225
-/hadoop/common/branches/branch-0.20-security/CHANGES.txt:1097202,1098837,1100336,1125139,1125170,1125587,1125589,1127362,1131277,1131286,1131290,1131299,1131737,1134140
+/hadoop/common/branches/branch-0.20-security/CHANGES.txt:1097202,1098837,1100336,1103940,1125139,1125170,1125587,1125589,1127362,1131277,1131286,1131290,1131299,1131737,1134140
 /hadoop/common/branches/branch-0.20-security-203/CHANGES.txt:1096071,1097012-1099333,1102071,1128115
 /hadoop/common/branches/branch-0.20-security-205/CHANGES.txt:1132788,1133133,1133274,1133282
 /hadoop/core/branches/branch-0.18/CHANGES.txt:727226

Modified: hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1136726&r1=1136725&r2=1136726&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Thu Jun 16 22:57:43 2011
@@ -22,12 +22,12 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.text.DateFormat;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Random;
@@ -76,14 +76,6 @@ public class TrackerDistributedCacheMana
   private static final FsPermission PUBLIC_CACHE_OBJECT_PERM =
     FsPermission.createImmutable((short) 0755);
 
-  // For holding the properties of each cache directory
-  static class CacheDir {
-    long size;
-    long subdirs;
-  }
-  private TreeMap<Path, CacheDir> baseDirProperties =
-    new TreeMap<Path, CacheDir>();
-
   // default total cache size (10GB)
   private static final long DEFAULT_CACHE_SIZE = 10737418240L;
   private static final long DEFAULT_CACHE_SUBDIR_LIMIT = 10000;
@@ -101,8 +93,8 @@ public class TrackerDistributedCacheMana
   
   private static final Random random = new Random();
   
-  protected BaseDirManager baseDirManager = new BaseDirManager();
-  protected CleanupThread cleanupThread;
+  BaseDirManager baseDirManager = new BaseDirManager();
+  CleanupThread cleanupThread;
 
   public TrackerDistributedCacheManager(Configuration conf,
                                         TaskController controller
@@ -119,6 +111,7 @@ public class TrackerDistributedCacheMana
       ("mapreduce.tasktracker.local.cache.numberdirectories",
        DEFAULT_CACHE_SUBDIR_LIMIT);
     this.taskController = controller;
+    this.cleanupThread = new CleanupThread(conf);
   }
 
   /**
@@ -200,7 +193,7 @@ public class TrackerDistributedCacheMana
 
               // Increase the size and sub directory count of the cache
               // from baseDirSize and baseDirNumberSubDir.
-              addCacheInfoUpdate(lcacheStatus);
+              baseDirManager.addCacheInfoUpdate(lcacheStatus);
             }
           }
           lcacheStatus.initComplete();
@@ -209,27 +202,6 @@ public class TrackerDistributedCacheMana
                                                    lcacheStatus, fileStatus, isArchive);            
         }
       }
-
-      // try deleting stuff if you can
-      long size = 0;
-      long numberSubdirs = 0;
-      synchronized (lcacheStatus) {
-        synchronized (baseDirProperties) {
-          CacheDir cacheDir = baseDirProperties.get(lcacheStatus.getBaseDir());
-          if (cacheDir != null) {
-            size = cacheDir.size;
-            numberSubdirs = cacheDir.subdirs;
-          } else {
-            LOG.warn("Cannot find size and number of subdirectories of" +
-                     " baseDir: " + lcacheStatus.getBaseDir());
-          }
-        }
-      }
-
-      if (allowedCacheSize < size || allowedCacheSubdirs < numberSubdirs) {
-        // try some cache deletions
-        compactCache(conf);
-      }
     } catch (IOException ie) {
       synchronized (lcacheStatus) {
         // release this cache
@@ -260,7 +232,7 @@ public class TrackerDistributedCacheMana
     if (size != 0) {
       synchronized (status) {
         status.size = size;
-        addCacheInfoUpdate(status);
+        baseDirManager.addCacheInfoUpdate(status);
       }
     }
   }
@@ -292,54 +264,6 @@ public class TrackerDistributedCacheMana
     return user;
   }
 
-
-  // To delete the caches which have a refcount of zero
-
-  private void compactCache(Configuration conf) throws IOException {
-    List<CacheStatus> deleteList = new LinkedList<CacheStatus>();
-    // try deleting cache Status with refcount of zero
-    synchronized (cachedArchives) {
-      for (Iterator<String> it = cachedArchives.keySet().iterator(); 
-          it.hasNext();) {
-        String cacheId = it.next();
-        CacheStatus lcacheStatus = cachedArchives.get(cacheId);
-        // if reference count is zero 
-        // mark the cache for deletion
-        if (lcacheStatus.refcount == 0) {
-          // delete this cache entry from the global list 
-          // and mark the localized file for deletion
-          deleteList.add(lcacheStatus);
-          it.remove();
-        }
-      }
-    }
-    
-    // do the deletion, after releasing the global lock
-    for (CacheStatus lcacheStatus : deleteList) {
-      synchronized (lcacheStatus) {
-        Path potentialDeletee = lcacheStatus.localizedLoadPath;
-        Path localizedDir = lcacheStatus.getLocalizedUniqueDir();
-        if (lcacheStatus.user == null) {
-          LOG.info("Deleted path " + localizedDir);
-          try {
-            localFs.delete(localizedDir, true);
-          } catch (IOException e) {
-            LOG.warn("Could not delete distributed cache empty directory "
-                     + localizedDir, e);
-          }
-        } else {         
-          LOG.info("Deleted path " + localizedDir + " as " + lcacheStatus.user);
-          String base = lcacheStatus.getBaseDir().toString();
-          String userDir = TaskTracker.getUserDir(lcacheStatus.user);
-          int skip = base.length() + 1 + userDir.length() + 1;
-          String relative = localizedDir.toString().substring(skip);
-          taskController.deleteAsUser(lcacheStatus.user, relative);
-        }
-        deleteCacheInfoUpdate(lcacheStatus);
-      }
-    }
-  }
-
   /*
    * Returns the relative path of the dir this cache will be localized in
    * relative path that this cache will be localized in. For
@@ -541,7 +465,7 @@ public class TrackerDistributedCacheMana
     
     // Increase the size and sub directory count of the cache
     // from baseDirSize and baseDirNumberSubDir.
-    addCacheInfoUpdate(cacheStatus);
+    baseDirManager.addCacheInfoUpdate(cacheStatus);
 
     LOG.info(String.format("Cached %s as %s",
              cache.toString(), cacheStatus.localizedLoadPath));
@@ -617,28 +541,31 @@ public class TrackerDistributedCacheMana
   }
 
   static class CacheStatus {
-    // the local load path of this cache
-    Path localizedLoadPath;
-
-    //the base dir where the cache lies
-    Path localizedBaseDir;
-
-    //the size of this cache
-    long size;
-
-    // number of instances using this cache
-    int refcount;
-
-    // is it initialized ?
-    boolean inited = false;
-
+    //
+    // This field should be accessed under global cachedArchives lock.
+    //
+    int refcount;    // number of instances using this cache
+
+    //
+    // The following two fields should be accessed under
+    // individual cacheStatus lock.
+    //
+    long size;              //the size of this cache.
+    boolean inited = false; // is it initialized ?
+    
+    //
+    // The following five fields are Immutable.
+    //
+    
     // The sub directory (tasktracker/archive or tasktracker/user/archive),
     // under which the file will be localized
     Path subDir;
-    
     // unique string used in the construction of local load path
     String uniqueString;
-    
+    // the local load path of this cache
+    Path localizedLoadPath;
+    //the base dir where the cache lies
+    Path localizedBaseDir;
     // The user that owns the cache entry or null if it is public
     final String user;
 
@@ -943,10 +870,11 @@ public class TrackerDistributedCacheMana
     return path;
   }
   
+  
   /**
    * A thread to check and cleanup the unused files periodically
    */
-  protected class CleanupThread extends Thread {
+  private class CleanupThread extends Thread {
     // How often do we check if we need to clean up cache files?
     private long cleanUpCheckPeriod = 60000L; // 1 minute
     public CleanupThread(Configuration conf) {
@@ -954,7 +882,6 @@ public class TrackerDistributedCacheMana
         conf.getLong("mapreduce.tasktracker.distributedcache.checkperiod",
             cleanUpCheckPeriod);
     }
-
     private volatile boolean running = true;
     
     public void stopRunning() {
@@ -967,33 +894,19 @@ public class TrackerDistributedCacheMana
         try {
           Thread.sleep(cleanUpCheckPeriod);
           baseDirManager.checkAndCleanup();
-        } catch (IOException e) {
+        } catch (Exception e) {
           LOG.error("Exception in DistributedCache CleanupThread.", e);
-        } catch(InterruptedException e) {
-          LOG.info("Cleanup...",e); 
-          //To force us to exit cleanly
-          running = false;
-        } catch (Throwable t) {
-          exitTaskTracker(t);
+          // This thread should keep running and never crash.
         }
       }
     }
-    
-    /**
-     * Exit the task tracker because of a fatal error.
-     */
-    protected void exitTaskTracker(Throwable t) {
-      LOG.fatal("Distributed Cache cleanup thread received runtime exception." +
-      		" Exiting the TaskTracker", t);
-      Runtime.getRuntime().exit(-1);
-    }
   }
 
   /**
    * This class holds properties of each base directories and is responsible
    * for clean up unused cache files in base directories.
    */
-  protected class BaseDirManager {
+  private class BaseDirManager {
 
     // For holding the properties of each cache directory
     private class CacheDir {
@@ -1072,44 +985,60 @@ public class TrackerDistributedCacheMana
    * and baseDirNumberSubDir. Have to lock lcacheStatus before calling this.
    * @param cacheStatus cache status of the cache is deleted
    */
-  private void deleteCacheInfoUpdate(CacheStatus cacheStatus) {
+  public void deleteCacheInfoUpdate(CacheStatus cacheStatus) {
     if (!cacheStatus.inited) {
       // if it is not created yet, do nothing.
       return;
     }
     // decrement the size of the cache from baseDirSize
-    synchronized (baseDirProperties) {
-      CacheDir cacheDir = baseDirProperties.get(cacheStatus.getBaseDir());
+    synchronized (baseDirManager.properties) {
+      BaseDirManager.CacheDir cacheDir = properties.get(cacheStatus.getBaseDir());
       if (cacheDir != null) {
         cacheDir.size -= cacheStatus.size;
         cacheDir.subdirs--;
       } else {
         LOG.warn("Cannot find size and number of subdirectories of" +
-                 " baseDir: " + cacheStatus.getBaseDir());
+            " baseDir: " + cacheStatus.getBaseDir());
       }
     }
   }
-  
+
   /**
    * Update the maps baseDirSize and baseDirNumberSubDir when adding cache.
    * Increase the size and sub directory count of the cache from baseDirSize
    * and baseDirNumberSubDir. Have to lock lcacheStatus before calling this.
    * @param cacheStatus cache status of the cache is added
    */
-  private void addCacheInfoUpdate(CacheStatus cacheStatus) {
+  public void addCacheInfoUpdate(CacheStatus cacheStatus) {
     long cacheSize = cacheStatus.size;
     // decrement the size of the cache from baseDirSize
-    synchronized (baseDirProperties) {
-      CacheDir cacheDir = baseDirProperties.get(cacheStatus.getBaseDir());
+    synchronized (baseDirManager.properties) {
+      BaseDirManager.CacheDir cacheDir = properties.get(cacheStatus.getBaseDir());
       if (cacheDir != null) {
         cacheDir.size += cacheSize;
         cacheDir.subdirs++;
       } else {
-        cacheDir = new CacheDir();
+        cacheDir = new BaseDirManager.CacheDir();
         cacheDir.size = cacheSize;
         cacheDir.subdirs = 1;
-        baseDirProperties.put(cacheStatus.getBaseDir(), cacheDir);
+        properties.put(cacheStatus.getBaseDir(), cacheDir);
       }
     }
   }
+  }
+  
+  /**
+   * Start the background thread that periodically checks for and cleans up unused cache files.
+   */
+  public void startCleanupThread() {
+    this.cleanupThread.start();
+  }
+
+  /**
+   * Ask the cleanup thread to stop, interrupting it so it does not finish its current sleep.
+   */
+  public void stopCleanupThread() {
+    cleanupThread.stopRunning();
+    cleanupThread.interrupt();
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1136726&r1=1136725&r2=1136726&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Thu Jun 16 22:57:43 2011
@@ -812,7 +812,8 @@ public class TaskTracker implements MRCo
     // Initialize DistributedCache
     this.distributedCacheManager = new TrackerDistributedCacheManager(
         this.fConf, taskController);
-
+    this.distributedCacheManager.startCleanupThread();
+    
     this.jobClient = (InterTrackerProtocol) 
     UserGroupInformation.getLoginUser().doAs(
         new PrivilegedExceptionAction<Object>() {
@@ -1365,6 +1366,7 @@ public class TaskTracker implements MRCo
     this.mapLauncher.interrupt();
     this.reduceLauncher.interrupt();
 
+    this.distributedCacheManager.stopCleanupThread();
     jvmManager.stop();
     
     // shutdown RPC connections

Modified: hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1136726&r1=1136725&r2=1136726&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Thu Jun 16 22:57:43 2011
@@ -622,10 +622,13 @@ public class TestTrackerDistributedCache
     Configuration conf2 = new Configuration(conf);
     conf2.set("mapred.local.dir", ROOT_MAPRED_LOCAL_DIR.toString());
     conf2.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
+    conf2.setLong("mapreduce.tasktracker.distributedcache.checkperiod", 200); // 200 ms
     
     refreshConf(conf2);
     TrackerDistributedCacheManager manager = 
         new TrackerDistributedCacheManager(conf2, taskController);
+    manager.startCleanupThread();
+    try {
     FileSystem localfs = FileSystem.getLocal(conf2);
     long now = System.currentTimeMillis();
     String userName = getJobOwnerName();
@@ -659,9 +662,9 @@ public class TestTrackerDistributedCache
         fs.getFileStatus(secondCacheFilePublic), false, 
         fs.getFileStatus(secondCacheFilePublic).getModificationTime(), true,
         cfile2);
-    assertFalse("DistributedCache failed deleting old" + 
-        " cache when the cache store is full.",
-        localfs.exists(firstLocalCache));
+    checkCacheDeletion(localfs, firstLocalCache,
+        "DistributedCache failed deleting old" +
+        " cache when the cache store is full");
 
     // find the root directory of distributed caches
     Path firstCursor = firstLocalCache;
@@ -691,8 +694,12 @@ public class TestTrackerDistributedCache
     conf2.setLong("local.cache.size", LOCAL_CACHE_LIMIT * 10);
     conf2.setLong("mapreduce.tasktracker.local.cache.numberdirectories",
         LOCAL_CACHE_SUBDIR_LIMIT);
+    manager.stopCleanupThread();
+    
     manager = 
       new TrackerDistributedCacheManager(conf2, taskController);
+    manager.startCleanupThread();
+    
     // Now we test the number of sub directories limit
     // Create the temporary cache files to be used in the tests.
     Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
@@ -736,9 +743,9 @@ public class TestTrackerDistributedCache
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(fourthCacheFile), false, 
         fs.getFileStatus(fourthCacheFile).getModificationTime(), false, cfile4);
-    assertFalse("DistributedCache failed deleting old" + 
-        " cache when the cache exceeds the number of sub directories limit.",
-        localfs.exists(thirdLocalCache));
+    checkCacheDeletion(localfs, thirdLocalCache,
+        "DistributedCache failed deleting old" +
+        " cache when the cache exceeds the number of sub directories limit.");
 
     assertFalse
       ("DistributedCache did not delete the gensym'ed distcache "
@@ -749,8 +756,30 @@ public class TestTrackerDistributedCache
     // Clean up the files created in this test
     new File(thirdCacheFile.toString()).delete();
     new File(fourthCacheFile.toString()).delete();
+    } finally {
+      manager.stopCleanupThread();
+    }
   }
   
+  /**
+   * Polls every 100ms until the given path no longer exists, then returns.
+   * Fails the test if the path still exists after roughly 30 seconds (300 polls).
+   */
+  private void checkCacheDeletion(FileSystem fs, Path cache, String msg)
+  throws Exception {
+    // Check every 100ms to see if the cache is deleted
+    boolean cacheExists = true;
+    for (int i = 0; i < 300; i++) {
+      if (!fs.exists(cache)) {
+        cacheExists = false;
+        break;
+      }
+      TimeUnit.MILLISECONDS.sleep(100L);
+    }
+    // If the cache is still there after 300 checks (~30 seconds), the test fails.
+    assertFalse(msg, cacheExists);
+  }
+
   public void testFileSystemOtherThanDefault() throws Exception {
     if (!canRun()) {
       return;