You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/06/17 00:28:50 UTC

svn commit: r1136712 - in /hadoop/common/branches/branch-0.20-security-204: CHANGES.txt src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java

Author: omalley
Date: Thu Jun 16 22:28:49 2011
New Revision: 1136712

URL: http://svn.apache.org/viewvc?rev=1136712&view=rev
Log:
MAPREDUCE-2495. exit() the TaskTracker when the distributed cache cleanup
thread dies. (Robert Joseph Evans via cdouglas)

Modified:
    hadoop/common/branches/branch-0.20-security-204/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
    hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java

Modified: hadoop/common/branches/branch-0.20-security-204/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/CHANGES.txt?rev=1136712&r1=1136711&r2=1136712&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security-204/CHANGES.txt Thu Jun 16 22:28:49 2011
@@ -11,6 +11,9 @@ Release 0.20.204.0 - unreleased
 
   BUG FIXES
 
+    MAPREDUCE-2495. exit() the TaskTracker when the distributed cache cleanup
+    thread dies. (Robert Joseph Evans via cdouglas)
+
     MAPREDUCE-2555. Avoid sprious logging from completedtasks. (Thomas Graves
     via cdouglas)
 

Propchange: hadoop/common/branches/branch-0.20-security-204/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jun 16 22:28:49 2011
@@ -1,5 +1,5 @@
 /hadoop/common/branches/branch-0.20/CHANGES.txt:826138,826568,829987,831184,833001,880632,898713,909245,909723,960946,1044225
-/hadoop/common/branches/branch-0.20-security/CHANGES.txt:1097202,1098837,1100336,1131277,1131286,1131290,1131299,1131737,1134140
+/hadoop/common/branches/branch-0.20-security/CHANGES.txt:1097202,1098837,1100336,1127362,1131277,1131286,1131290,1131299,1131737,1134140
 /hadoop/common/branches/branch-0.20-security-203/CHANGES.txt:1096071,1097012-1099333,1102071,1128115
 /hadoop/common/branches/branch-0.20-security-205/CHANGES.txt:1132788,1133133,1133274,1133282
 /hadoop/core/branches/branch-0.18/CHANGES.txt:727226

Modified: hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1136712&r1=1136711&r2=1136712&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Thu Jun 16 22:28:49 2011
@@ -100,6 +100,9 @@ public class TrackerDistributedCacheMana
   private Configuration trackerConf;
   
   private static final Random random = new Random();
+  
+  protected CleanupThread cleanupThread; // periodic cache cleaner; subclasses may replace it
+  protected BaseDirManager baseDirManager = new BaseDirManager(); // per-base-dir cache accounting
 
   public TrackerDistributedCacheManager(Configuration conf,
                                         TaskController controller
@@ -941,6 +944,130 @@ public class TrackerDistributedCacheMana
   }
   
   /**
+   * A thread that periodically checks for and cleans up unused cache files.
+   */
+  protected class CleanupThread extends Thread {
+    // Milliseconds between cleanup passes (default 1 minute).
+    private long cleanUpCheckPeriod;
+    // Cleared by stopRunning() or by an interrupt to end the run() loop.
+    private volatile boolean running = true;
+
+    public CleanupThread(Configuration conf) {
+      cleanUpCheckPeriod = conf.getLong(
+          "mapreduce.tasktracker.distributedcache.checkperiod", 60000L);
+    }
+
+    /** Ask run() to stop; noticed after the current sleep/cleanup pass. */
+    public void stopRunning() {
+      running = false;
+    }
+
+    /** Sleep, then clean up; IOExceptions are logged, other Throwables are fatal. */
+    @Override
+    public void run() {
+      while (running) {
+        try {
+          Thread.sleep(cleanUpCheckPeriod);
+          baseDirManager.checkAndCleanup();
+        } catch (IOException e) {
+          LOG.error("Exception in DistributedCache CleanupThread.", e);
+        } catch (InterruptedException e) {
+          LOG.info("Cleanup...", e);
+          running = false; // exit cleanly, but keep the interrupt visible
+          Thread.currentThread().interrupt();
+        } catch (Throwable t) {
+          exitTaskTracker(t); // may return when overridden (e.g. in tests)
+        }
+      }
+    }
+
+    /** Log the error and terminate the TaskTracker JVM with status -1. */
+    protected void exitTaskTracker(Throwable t) {
+      LOG.fatal("Distributed Cache cleanup thread received runtime exception." +
+          " Exiting the TaskTracker", t);
+      Runtime.getRuntime().exit(-1);
+    }
+  }
+
+  /**
+   * This class holds the properties of each base directory and is responsible
+   * for cleaning up unused cache files in the base directories.
+   */
+  protected class BaseDirManager {
+
+    // For holding the properties of each cache directory
+    private class CacheDir {
+      long size;
+      long subdirs;
+    }
+
+    private TreeMap<Path, BaseDirManager.CacheDir> properties =
+    new TreeMap<Path, BaseDirManager.CacheDir>();
+
+    private long getDirSize(Path p) {
+      return properties.get(p).size;
+    }
+    private long getDirSubdirs(Path p) {
+      return properties.get(p).subdirs;
+    }
+    
+    /**
+     * Delete refcount-0 cache entries in base dirs over allowedCacheSize or
+     * allowedCacheSubdirs; files are removed after cachedArchives is unlocked.
+     * @throws IOException if deletion as the owning user or bookkeeping fails
+     */
+    void checkAndCleanup() throws IOException {
+      Collection<CacheStatus> toBeDeletedCache = new LinkedList<CacheStatus>();
+      Set<Path> toBeCleanedBaseDir = new HashSet<Path>();
+      synchronized (properties) {
+        for (Path baseDir : properties.keySet()) {
+          if (allowedCacheSize < getDirSize(baseDir) ||
+              allowedCacheSubdirs < getDirSubdirs(baseDir)) {
+            toBeCleanedBaseDir.add(baseDir);
+          }
+        }
+      }
+      // Unlink zero-refcount entries that live in an over-limit base dir.
+      synchronized (cachedArchives) {
+        for (Iterator<CacheStatus> it = cachedArchives.values().iterator(); it.hasNext();) {
+          CacheStatus cacheStatus = it.next();
+          if (toBeCleanedBaseDir.contains(cacheStatus.getBaseDir())) {
+            synchronized (cacheStatus) {
+              if (cacheStatus.refcount == 0) {
+                toBeDeletedCache.add(cacheStatus);
+                it.remove();
+              }
+            }
+          }
+        }
+      }
+      // Delete the files now that the global lock has been released.
+      for (CacheStatus cacheStatus : toBeDeletedCache) {
+        synchronized (cacheStatus) {
+          Path localizedDir = cacheStatus.getLocalizedUniqueDir();
+          if (cacheStatus.user == null) {
+            try {
+              localFs.delete(localizedDir, true);
+              LOG.info("Deleted path " + localizedDir); // logged after, not before, the delete
+            } catch (IOException e) {
+              LOG.warn("Could not delete distributed cache empty directory "
+                  + localizedDir, e);
+            }
+          } else {
+            // deleteAsUser is given the path below "<baseDir>/<userDir>/".
+            String base = cacheStatus.getBaseDir().toString();
+            String userDir = TaskTracker.getUserDir(cacheStatus.user);
+            int skip = base.length() + 1 + userDir.length() + 1;
+            String relative = localizedDir.toString().substring(skip);
+            taskController.deleteAsUser(cacheStatus.user, relative);
+            LOG.info("Deleted path " + localizedDir + " as " + cacheStatus.user);
+          }
+          deleteCacheInfoUpdate(cacheStatus);
+        }
+      }
+    }
+
+  /**
    * Decrement the size and sub directory count of the cache from baseDirSize
    * and baseDirNumberSubDir. Have to lock lcacheStatus before calling this.
    * @param cacheStatus cache status of the cache is deleted

Modified: hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1136712&r1=1136711&r2=1136712&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Thu Jun 16 22:28:49 2011
@@ -25,6 +25,8 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import javax.security.auth.login.LoginException;
 
@@ -546,6 +548,62 @@ public class TestTrackerDistributedCache
     }
   }
   
+  /** Manager whose cleanup always throws; failures are recorded, not fatal. */
+  public static class MyTrackerDistributedCacheManager
+      extends TrackerDistributedCacheManager {
+    /** Throwable passed to exitTaskTracker; safe to read once done trips. */
+    public Throwable caught = null;
+    /** Counted down when exitTaskTracker is invoked. */
+    public CountDownLatch done = new CountDownLatch(1);
+
+    public MyTrackerDistributedCacheManager(Configuration conf,
+        TaskController controller) throws IOException {
+      super(conf, controller);
+      // Every cleanup pass fails with an unchecked exception.
+      baseDirManager = new BaseDirManager() {
+        @Override
+        void checkAndCleanup() throws IOException {
+          throw new RuntimeException("This is a test!!!!");
+        }
+      };
+      cleanupThread = new TestCleanupThread(conf);
+    }
+
+    /** Cleanup thread that records fatal errors rather than exiting. */
+    class TestCleanupThread extends CleanupThread {
+      public TestCleanupThread(Configuration conf) {
+        super(conf);
+      }
+
+      @Override
+      protected void exitTaskTracker(Throwable t) {
+        caught = t; // written before countDown() so await() callers see it
+        stopRunning();
+        done.countDown();
+      }
+    }
+  }
+  
+  /** A RuntimeException thrown by cleanup must reach exitTaskTracker. */
+  public void testRuntimeExceptionInCleanup() throws Exception {
+    if (!canRun()) {
+      return;
+    }
+    Configuration conf2 = new Configuration(conf);
+    conf2.set("mapred.local.dir", ROOT_MAPRED_LOCAL_DIR.toString());
+    conf2.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
+    conf2.setLong("mapreduce.tasktracker.distributedcache.checkperiod", 0); // 0 ms (Don't sleep)
+    refreshConf(conf2);
+    MyTrackerDistributedCacheManager manager =
+        new MyTrackerDistributedCacheManager(conf2, taskController);
+    manager.startCleanupThread();
+    // The latch normally trips at once (zero check period); a 200 ms bound
+    // is needlessly flaky on a loaded host, so allow a generous 10 s.
+    assertTrue(manager.done.await(10L, TimeUnit.SECONDS));
+    assertNotNull(manager.caught);
+    assertTrue(manager.caught instanceof RuntimeException);
+  }
+  
   protected String getJobOwnerName() throws IOException {
     return UserGroupInformation.getLoginUser().getUserName();
   }