You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:21:35 UTC
svn commit: r1077504 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
Author: omalley
Date: Fri Mar 4 04:21:35 2011
New Revision: 1077504
URL: http://svn.apache.org/viewvc?rev=1077504&view=rev
Log:
commit 32c081c757d8483ee4915e4da2e66abf51b1f038
Author: Arun C Murthy <ac...@apache.org>
Date: Tue Jun 15 11:25:26 2010 -0700
MAPREDUCE-1538. Add a limit on the number of artifacts in the DistributedCache to ensure we cleanup aggressively. Contributed by Dick King.
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1538. Add a limit on the number of artifacts in the
+ DistributedCache to ensure we cleanup aggressively. (Dick King via
+ acmurthy)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1077504&r1=1077503&r2=1077504&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Fri Mar 4 04:21:35 2011
@@ -61,10 +61,19 @@ public class TrackerDistributedCacheMana
private TreeMap<String, CacheStatus> cachedArchives =
new TreeMap<String, CacheStatus>();
- private TreeMap<Path, Long> baseDirSize = new TreeMap<Path, Long>();
+ // Per-directory bookkeeping: total cached size and number of cache subdirectories
+ static class CacheDir {
+ long size;
+ long subdirs;
+ }
+ private TreeMap<Path, CacheDir> baseDirProperties =
+ new TreeMap<Path, CacheDir>();
// default total cache size (10GB)
private static final long DEFAULT_CACHE_SIZE = 10737418240L;
+ private static final long DEFAULT_CACHE_SUBDIR_LIMIT = 10000;
+ private long allowedCacheSize;
+ private long allowedCacheSubdirs;
private static final Log LOG =
LogFactory.getLog(TrackerDistributedCacheManager.class);
@@ -85,6 +94,14 @@ public class TrackerDistributedCacheMana
this.trackerConf = conf;
this.lDirAllocator = new LocalDirAllocator("mapred.local.dir");
this.taskController = taskController;
+
+ // maximum total cache size, read from the configuration (default 10GB)
+ this.allowedCacheSize = conf.getLong
+ ("local.cache.size", DEFAULT_CACHE_SIZE);
+ // maximum number of cache subdirectories, read from the configuration (default 10000)
+ this.allowedCacheSubdirs = conf.getLong
+ ("mapreduce.tasktracker.local.cache.numberdirectories",
+ DEFAULT_CACHE_SUBDIR_LIMIT);
}
/**
@@ -161,20 +178,21 @@ public class TrackerDistributedCacheMana
// try deleting stuff if you can
long size = 0;
+ long numberSubdirs = 0;
synchronized (lcacheStatus) {
- synchronized (baseDirSize) {
- Long get = baseDirSize.get(lcacheStatus.getBaseDir());
- if (get != null) {
- size = get.longValue();
+ synchronized (baseDirProperties) {
+ CacheDir cacheDir = baseDirProperties.get(lcacheStatus.getBaseDir());
+ if (cacheDir != null) {
+ size = cacheDir.size;
+ numberSubdirs = cacheDir.subdirs;
} else {
- LOG.warn("Cannot find size of baseDir: "
- + lcacheStatus.getBaseDir());
+ LOG.warn("Cannot find size and number of subdirectories of" +
+ " baseDir: " + lcacheStatus.getBaseDir());
}
}
}
- // setting the cache size to a default of 10GB
- long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
- if (allowedSize < size) {
+
+ if (allowedCacheSize < size || allowedCacheSubdirs < numberSubdirs) {
// try some cache deletions
deleteCache(conf);
}
@@ -254,18 +272,11 @@ public class TrackerDistributedCacheMana
for (CacheStatus lcacheStatus : deleteSet) {
synchronized (lcacheStatus) {
FileSystem.getLocal(conf).delete(lcacheStatus.localizedLoadPath, true);
+
+ // Update the size and subdirectory count recorded in baseDirProperties
LOG.info("Deleted path " + lcacheStatus.localizedLoadPath);
- // decrement the size of the cache from baseDirSize
- synchronized (baseDirSize) {
- Long dirSize = baseDirSize.get(lcacheStatus.localizedBaseDir);
- if ( dirSize != null ) {
- dirSize -= lcacheStatus.size;
- baseDirSize.put(lcacheStatus.localizedBaseDir, dirSize);
- } else {
- LOG.warn("Cannot find record of the baseDir: " +
- lcacheStatus.localizedBaseDir + " during delete!");
- }
- }
+
+ deleteCacheInfoUpdate(lcacheStatus);
}
}
}
@@ -425,15 +436,10 @@ public class TrackerDistributedCacheMana
long cacheSize =
FileUtil.getDU(new File(parchive.getParent().toString()));
cacheStatus.size = cacheSize;
- synchronized (baseDirSize) {
- Long dirSize = baseDirSize.get(cacheStatus.localizedBaseDir);
- if( dirSize == null ) {
- dirSize = Long.valueOf(cacheSize);
- } else {
- dirSize += cacheSize;
- }
- baseDirSize.put(cacheStatus.localizedBaseDir, dirSize);
- }
+
+ // Add this cache's size and its subdirectory to the totals that
+ // baseDirProperties records for the cache's base directory.
+ addCacheInfoUpdate(cacheStatus);
// set proper permissions for the localized directory
setPermissions(conf, cacheStatus, isPublic);
@@ -845,4 +851,50 @@ public class TrackerDistributedCacheMana
return path;
}
+
+ /**
+ * Decrement the size and subdirectory count that baseDirProperties records
+ * for the cache's base directory. The caller must hold the cacheStatus lock.
+ * @param cacheStatus cache status of the cache that was deleted
+ */
+ private void deleteCacheInfoUpdate(CacheStatus cacheStatus) {
+ if (!cacheStatus.inited) {
+ // if it is not created yet, do nothing.
+ return;
+ }
+ // subtract this cache's size and one subdirectory from its base dir's record
+ synchronized (baseDirProperties) {
+ CacheDir cacheDir = baseDirProperties.get(cacheStatus.getBaseDir());
+ if (cacheDir != null) {
+ cacheDir.size -= cacheStatus.size;
+ cacheDir.subdirs--;
+ } else {
+ LOG.warn("Cannot find size and number of subdirectories of" +
+ " baseDir: " + cacheStatus.getBaseDir());
+ }
+ }
+ }
+
+ /**
+ * Record a newly added cache in baseDirProperties: add its size and one
+ * subdirectory to the entry for its base directory, creating the entry if
+ * none exists yet. The caller must hold the cacheStatus lock.
+ * @param cacheStatus cache status of the cache that was added
+ */
+ private void addCacheInfoUpdate(CacheStatus cacheStatus) {
+ long cacheSize = cacheStatus.size;
+ // add this cache's size and one subdirectory to its base dir's record
+ synchronized (baseDirProperties) {
+ CacheDir cacheDir = baseDirProperties.get(cacheStatus.getBaseDir());
+ if (cacheDir != null) {
+ cacheDir.size += cacheSize;
+ cacheDir.subdirs++;
+ } else {
+ cacheDir = new CacheDir();
+ cacheDir.size = cacheSize;
+ cacheDir.subdirs = 1;
+ baseDirProperties.put(cacheStatus.getBaseDir(), cacheDir);
+ }
+ }
+ }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1077504&r1=1077503&r2=1077504&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Fri Mar 4 04:21:35 2011
@@ -65,6 +65,7 @@ public class TestTrackerDistributedCache
private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
private static final int LOCAL_CACHE_LIMIT = 5 * 1024; //5K
+ private static final int LOCAL_CACHE_SUBDIR = 2;
protected Configuration conf;
protected Path firstCacheFile;
protected Path secondCacheFile;
@@ -479,6 +480,8 @@ public class TestTrackerDistributedCache
Configuration conf2 = new Configuration(conf);
conf2.set("mapred.local.dir", ROOT_MAPRED_LOCAL_DIR.toString());
conf2.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
+ conf2.setLong("mapreduce.tasktracker.local.cache.numberdirectories",
+ LOCAL_CACHE_SUBDIR);
refreshConf(conf2);
TrackerDistributedCacheManager manager =
new TrackerDistributedCacheManager(conf2, taskController);
@@ -487,6 +490,7 @@ public class TestTrackerDistributedCache
String userName = getJobOwnerName();
conf2.set("user.name", userName);
+ // We first test the size limit
Path localCache = manager.getLocalCache(firstCacheFile.toUri(), conf2,
TaskTracker.getPrivateDistributedCacheDir(userName),
fs.getFileStatus(firstCacheFile), false,
@@ -503,6 +507,33 @@ public class TestTrackerDistributedCache
assertFalse("DistributedCache failed deleting old" +
" cache when the cache store is full.",
localfs.exists(localCache));
+
+ // Now we test the number of sub directories limit
+ // Create the temporary cache files to be used in the tests.
+ Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
+ Path fourthCacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
+ // Add two more small files so that the subdirectory-count limit is
+ // triggered but the cache size limit is not.
+ createTempFile(thirdCacheFile, 1);
+ createTempFile(fourthCacheFile, 1);
+ Path thirdLocalCache = manager.getLocalCache(thirdCacheFile.toUri(), conf2,
+ TaskTracker.getPrivateDistributedCacheDir(userName),
+ fs.getFileStatus(thirdCacheFile), false,
+ now, new Path(TEST_ROOT_DIR), false, false);
+ // Release the third cache so that it can be deleted while sweeping
+ manager.releaseCache(thirdCacheFile.toUri(), conf2, now);
+ // Getting the fourth cache makes the number of subdirectories 3, which is
+ // greater than the limit of 2, so the released cache will be deleted.
+ manager.getLocalCache(fourthCacheFile.toUri(), conf2,
+ TaskTracker.getPrivateDistributedCacheDir(userName),
+ fs.getFileStatus(fourthCacheFile), false,
+ System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
+ assertFalse("DistributedCache failed deleting old" +
+ " cache when the cache exceeds the number of sub directories limit.",
+ localfs.exists(thirdLocalCache));
+ // Clean up the files created in this test
+ new File(thirdCacheFile.toString()).delete();
+ new File(fourthCacheFile.toString()).delete();
}
public void testFileSystemOtherThanDefault() throws Exception {
@@ -526,13 +557,17 @@ public class TestTrackerDistributedCache
}
static void createTempFile(Path p) throws IOException {
+ createTempFile(p, TEST_FILE_SIZE);
+ }
+
+ static void createTempFile(Path p, int size) throws IOException {
File f = new File(p.toString());
FileOutputStream os = new FileOutputStream(f);
- byte[] toWrite = new byte[TEST_FILE_SIZE];
+ byte[] toWrite = new byte[size];
new Random().nextBytes(toWrite);
os.write(toWrite);
os.close();
- FileSystem.LOG.info("created: " + p + ", size=" + TEST_FILE_SIZE);
+ FileSystem.LOG.info("created: " + p + ", size=" + size);
}
static void createPublicTempFile(Path p)