You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2009/03/26 23:31:29 UTC

svn commit: r758898 - in /hadoop/core/branches/branch-0.19: CHANGES.txt src/core/org/apache/hadoop/filecache/DistributedCache.java src/test/org/apache/hadoop/filecache/ src/test/org/apache/hadoop/filecache/TestDistributedCache.java

Author: cdouglas
Date: Thu Mar 26 22:31:29 2009
New Revision: 758898

URL: http://svn.apache.org/viewvc?rev=758898&view=rev
Log:
HADOOP-4780. Cache the size of directories in DistributedCache, avoiding
long delays in recalculating it. Contributed by He Yongqiang

Added:
    hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/filecache/
    hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/filecache/TestDistributedCache.java
Modified:
    hadoop/core/branches/branch-0.19/CHANGES.txt
    hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/filecache/DistributedCache.java

Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=758898&r1=758897&r2=758898&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Thu Mar 26 22:31:29 2009
@@ -95,6 +95,9 @@
     HADOOP-5374. Fixes a NPE problem in getTasksToSave method.
     (Amareshwari Sriramadasu via ddas)
 
+    HADOOP-4780. Cache the size of directories in DistributedCache, avoiding
+    long delays in recalculating it. (He Yongqiang via cdouglas)
+
 Release 0.19.1 - 2009-02-23 
 
     HADOOP-5225. Workaround for tmp file handling in HDFS. sync() is 

Modified: hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/filecache/DistributedCache.java?rev=758898&r1=758897&r2=758898&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/filecache/DistributedCache.java Thu Mar 26 22:31:29 2009
@@ -116,6 +116,8 @@
   // cacheID to cacheStatus mapping
   private static TreeMap<String, CacheStatus> cachedArchives = new TreeMap<String, CacheStatus>();
   
+  private static TreeMap<Path, Long> baseDirSize = new TreeMap<Path, Long>();
+  
   // default total cache size
   private static final long DEFAULT_CACHE_SIZE = 1048576L;
 
@@ -195,7 +197,7 @@
       lcacheStatus = cachedArchives.get(cacheId);
       if (lcacheStatus == null) {
         // was never localized
-        lcacheStatus = new CacheStatus(new Path(baseDir, new Path(cacheId)));
+        lcacheStatus = new CacheStatus(baseDir, new Path(baseDir, new Path(cacheId)));
         cachedArchives.put(cacheId, lcacheStatus);
       }
 
@@ -207,7 +209,13 @@
     }
 
     // try deleting stuff if you can
-    long size = FileUtil.getDU(new File(baseDir.toString()));
+    long size = 0;
+    synchronized (baseDirSize) {
+      Long get = baseDirSize.get(baseDir);
+      if ( get != null ) {
+    	size = get.longValue();
+      }
+    }
     // setting the cache size to a default of 1MB
     long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
     if (allowedSize < size) {
@@ -285,6 +293,13 @@
           if (lcacheStatus.refcount == 0) {
             // delete this cache entry
             FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
+            synchronized (baseDirSize) {
+              Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
+              if ( dirSize != null ) {
+            	dirSize -= lcacheStatus.size;
+            	baseDirSize.put(lcacheStatus.baseDir, dirSize);
+              }
+            }
             it.remove();
           }
         }
@@ -367,6 +382,13 @@
       
       FileSystem localFs = FileSystem.getLocal(conf);
       localFs.delete(cacheStatus.localLoadPath, true);
+      synchronized (baseDirSize) {
+    	Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+    	if ( dirSize != null ) {
+    	  dirSize -= cacheStatus.size;
+    	  baseDirSize.put(cacheStatus.baseDir, dirSize);
+    	}
+      }
       Path parchive = new Path(cacheStatus.localLoadPath,
                                new Path(cacheStatus.localLoadPath.getName()));
       
@@ -392,6 +414,18 @@
         // and copy the file into the dir as it is
       }
       
+      long cacheSize = FileUtil.getDU(new File(parchive.getParent().toString()));
+      cacheStatus.size = cacheSize;
+      synchronized (baseDirSize) {
+      	Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+      	if( dirSize == null ) {
+      	  dirSize = Long.valueOf(cacheSize);
+      	} else {
+      	  dirSize += cacheSize;
+      	}
+      	baseDirSize.put(cacheStatus.baseDir, dirSize);
+      }
+      
       // do chmod here 
       try {
     	FileUtil.chmod(parchive.toString(), "+x");
@@ -811,6 +845,12 @@
 
     // the local load path of this cache
     Path localLoadPath;
+    
+    //the base dir where the cache lies
+    Path baseDir;
+    
+    //the size of this cache
+    long size;
 
     // number of instances using this cache
     int refcount;
@@ -818,12 +858,14 @@
     // the cache-file modification time
     long mtime;
 
-    public CacheStatus(Path localLoadPath) {
+    public CacheStatus(Path baseDir, Path localLoadPath) {
       super();
       this.currentStatus = false;
       this.localLoadPath = localLoadPath;
       this.refcount = 0;
       this.mtime = -1;
+      this.baseDir = baseDir;
+      this.size = 0;
     }
   }
 

Added: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/filecache/TestDistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/filecache/TestDistributedCache.java?rev=758898&view=auto
==============================================================================
--- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/filecache/TestDistributedCache.java (added)
+++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/filecache/TestDistributedCache.java Thu Mar 26 22:31:29 2009
@@ -0,0 +1,77 @@
+package org.apache.hadoop.filecache;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import junit.framework.TestCase;
+
+public class TestDistributedCache extends TestCase {
+  
+  static final URI LOCAL_FS = URI.create("file:///");
+  private static String TEST_CACHE_BASE_DIR =
+    new Path(System.getProperty("test.build.data","/tmp/cachebasedir"))
+    .toString().replace(' ', '+'); // passed to getLocalCache as the cache base dir; spaces are replaced by plus signs
+  private static String TEST_ROOT_DIR =
+    System.getProperty("test.build.data", "/tmp/distributedcache"); // reads the same test.build.data property as TEST_CACHE_BASE_DIR, with a different fallback
+  private static final int TEST_FILE_SIZE = 4 * 1024; // 4K, number of bytes written to each file by createTempFile
+  private static final int LOCAL_CACHE_LIMIT = 5 * 1024; //5K, value stored under local.cache.size in setUp
+  private Configuration conf;
+  private Path firstCacheFile;
+  private Path secondCacheFile;
+  private FileSystem localfs;
+  
+  /** Sets local.cache.size to LOCAL_CACHE_LIMIT and creates both test files on the local file system.
+   * @see TestCase#setUp()
+   */
+  @Override
+  protected void setUp() throws IOException {
+    conf = new Configuration();
+    conf.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
+    localfs = FileSystem.get(LOCAL_FS, conf);
+    firstCacheFile = new Path(TEST_ROOT_DIR+"/firstcachefile");
+    secondCacheFile = new Path(TEST_ROOT_DIR+"/secondcachefile");
+    createTempFile(localfs, firstCacheFile);
+    createTempFile(localfs, secondCacheFile);
+  }
+  
+  /** Localizes and releases a first file, localizes a second one, then checks the listing of the cache base dir. */
+  public void testDeleteCache() throws Exception {
+    DistributedCache.getLocalCache(firstCacheFile.toUri(), conf, new Path(TEST_CACHE_BASE_DIR), 
+        false, System.currentTimeMillis(), new Path(TEST_ROOT_DIR));
+    DistributedCache.releaseCache(firstCacheFile.toUri(), conf);
+    //The code above localized a 4K file and then released it, so that cache entry becomes eligible for
+    //deletion once the 5K limit is exceeded. The code below localizes a second 4K file, which is meant
+    //to push the cache over that limit and sweep away the first entry.
+    DistributedCache.getLocalCache(secondCacheFile.toUri(), conf, new Path(TEST_CACHE_BASE_DIR), 
+        false, System.currentTimeMillis(), new Path(TEST_ROOT_DIR));
+    FileStatus[] dirStatuses = localfs.listStatus(new Path(TEST_CACHE_BASE_DIR));
+    assertTrue("DistributedCache failed deleting old cache when the cache store is full.",
+        dirStatuses.length > 1); // NOTE(review): passes when the base dir lists more than one entry; confirm this condition matches the failure message
+  }
+
+  private void createTempFile(FileSystem fs, Path p) throws IOException { // writes TEST_FILE_SIZE random bytes to p on fs
+    FSDataOutputStream out = fs.create(p);
+    byte[] toWrite = new byte[TEST_FILE_SIZE];
+    new Random().nextBytes(toWrite);
+    out.write(toWrite);
+    out.close();
+    FileSystem.LOG.info("created: " + p + ", size=" + TEST_FILE_SIZE);
+  }
+  
+  /** Deletes both test files and closes the local file system.
+   * @see TestCase#tearDown()
+   */
+  @Override
+  protected void tearDown() throws IOException {
+    localfs.delete(firstCacheFile, true);
+    localfs.delete(secondCacheFile, true);
+    localfs.close();
+  }
+}