You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ma...@apache.org on 2011/08/01 23:41:22 UTC
svn commit: r1152941 - in /hadoop/common/branches/branch-0.20-security:
CHANGES.txt
src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
Author: mahadev
Date: Mon Aug 1 21:41:20 2011
New Revision: 1152941
URL: http://svn.apache.org/viewvc?rev=1152941&view=rev
Log:
MAPREDUCE-2494. Make the distributed cache delete entires using LRU priority. (Robert Joseph Evans via mahadev)
Modified:
hadoop/common/branches/branch-0.20-security/CHANGES.txt
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1152941&r1=1152940&r2=1152941&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Mon Aug 1 21:41:20 2011
@@ -35,6 +35,9 @@ Release 0.20.205.0 - unreleased
HADOOP-7314. Add support for throwing UnknownHostException when a host
doesn't resolve. Needed for MAPREDUCE-2489. (Jeffrey Naisbitt via mattf)
+ MAPREDUCE-2494. Make the distributed cache delete entires using LRU priority
+ (Robert Joseph Evans via mahadev)
+
Release 0.20.204.0 - unreleased
NEW FEATURES
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1152941&r1=1152940&r2=1152941&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Mon Aug 1 21:41:20 2011
@@ -28,6 +28,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
@@ -67,8 +68,8 @@ import org.apache.hadoop.mapreduce.secur
*/
public class TrackerDistributedCacheManager {
// cacheID to cacheStatus mapping
- private TreeMap<String, CacheStatus> cachedArchives =
- new TreeMap<String, CacheStatus>();
+ private LinkedHashMap<String, CacheStatus> cachedArchives =
+ new LinkedHashMap<String, CacheStatus>();
private Map<JobID, TaskDistributedCacheManager> jobArchives =
Collections.synchronizedMap(
new HashMap<JobID, TaskDistributedCacheManager>());
@@ -79,8 +80,11 @@ public class TrackerDistributedCacheMana
// default total cache size (10GB)
private static final long DEFAULT_CACHE_SIZE = 10737418240L;
private static final long DEFAULT_CACHE_SUBDIR_LIMIT = 10000;
+ private static final float DEFAULT_CACHE_KEEP_AROUND_PCT = 0.95f;
private long allowedCacheSize;
private long allowedCacheSubdirs;
+ private long allowedCacheSizeCleanupGoal;
+ private long allowedCacheSubdirsCleanupGoal;
private static final Log LOG =
LogFactory.getLog(TrackerDistributedCacheManager.class);
@@ -110,6 +114,13 @@ public class TrackerDistributedCacheMana
this.allowedCacheSubdirs = conf.getLong
("mapreduce.tasktracker.local.cache.numberdirectories",
DEFAULT_CACHE_SUBDIR_LIMIT);
+ double cleanupPct = conf.getFloat("mapreduce.tasktracker.cache.local.keep.pct",
+ DEFAULT_CACHE_KEEP_AROUND_PCT);
+ this.allowedCacheSizeCleanupGoal =
+ (long)(this.allowedCacheSize * cleanupPct);
+ this.allowedCacheSubdirsCleanupGoal =
+ (long)(this.allowedCacheSubdirs * cleanupPct);
+
this.taskController = controller;
this.cleanupThread = new CleanupThread(conf);
}
@@ -162,15 +173,13 @@ public class TrackerDistributedCacheMana
lcacheStatus =
new CacheStatus(new Path(localPath.toString().replace(cachePath, "")),
localPath, new Path(subDir), uniqueString,
- isPublic ? null : user);
+ isPublic ? null : user, key);
cachedArchives.put(key, lcacheStatus);
}
//mark the cache for use.
file.setStatus(lcacheStatus);
- synchronized (lcacheStatus) {
- lcacheStatus.refcount++;
- }
+ lcacheStatus.incRefCount();
}
try {
@@ -203,11 +212,8 @@ public class TrackerDistributedCacheMana
}
}
} catch (IOException ie) {
- synchronized (lcacheStatus) {
- // release this cache
- lcacheStatus.refcount -= 1;
- throw ie;
- }
+ lcacheStatus.decRefCount();
+ throw ie;
}
return localizedPath;
}
@@ -223,9 +229,7 @@ public class TrackerDistributedCacheMana
* @throws IOException
*/
void releaseCache(CacheStatus status) throws IOException {
- synchronized (status) {
- status.refcount--;
- }
+ status.decRefCount();
}
void setSize(CacheStatus status, long size) throws IOException {
@@ -241,9 +245,7 @@ public class TrackerDistributedCacheMana
* This method is called from unit tests.
*/
int getReferenceCount(CacheStatus status) throws IOException {
- synchronized (status) {
- return status.refcount;
- }
+ return status.getRefCount();
}
/**
@@ -540,11 +542,11 @@ public class TrackerDistributedCacheMana
}
}
- static class CacheStatus {
+ class CacheStatus {
//
// This field should be accessed under global cachedArchives lock.
//
- int refcount; // number of instances using this cache
+ private int refcount; // number of instances using this cache
//
// The following two fields should be accessed under
@@ -568,9 +570,11 @@ public class TrackerDistributedCacheMana
Path localizedBaseDir;
// The user that owns the cache entry or null if it is public
final String user;
+ //The key of this in the cachedArchives.
+ private final String key;
public CacheStatus(Path baseDir, Path localLoadPath, Path subDir,
- String uniqueString, String user) {
+ String uniqueString, String user, String key) {
super();
this.localizedLoadPath = localLoadPath;
this.refcount = 0;
@@ -579,8 +583,34 @@ public class TrackerDistributedCacheMana
this.subDir = subDir;
this.uniqueString = uniqueString;
this.user = user;
+ this.key = key;
}
+ public synchronized void incRefCount() {
+ refcount += 1;
+ }
+
+ public void decRefCount() {
+ synchronized (cachedArchives) {
+ synchronized (this) {
+ refcount -= 1;
+ if(refcount <= 0) {
+ String key = this.key;
+ cachedArchives.remove(key);
+ cachedArchives.put(key, this);
+ }
+ }
+ }
+ }
+
+ public int getRefCount() {
+ return refcount;
+ }
+
+ public synchronized boolean isUsed() {
+ return refcount > 0;
+ }
+
Path getBaseDir(){
return this.localizedBaseDir;
}
@@ -917,49 +947,52 @@ public class TrackerDistributedCacheMana
}
}
+ // For holding the properties of each cache directory
+ private static class CacheDir {
+ long size;
+ long subdirs;
+ }
+
/**
* This class holds properties of each base directories and is responsible
* for clean up unused cache files in base directories.
*/
protected class BaseDirManager {
-
- // For holding the properties of each cache directory
- private class CacheDir {
- long size;
- long subdirs;
- }
-
- private TreeMap<Path, BaseDirManager.CacheDir> properties =
- new TreeMap<Path, BaseDirManager.CacheDir>();
-
- private long getDirSize(Path p) {
- return properties.get(p).size;
- }
- private long getDirSubdirs(Path p) {
- return properties.get(p).subdirs;
- }
+ private TreeMap<Path, CacheDir> properties =
+ new TreeMap<Path, CacheDir>();
void checkAndCleanup() throws IOException {
Collection<CacheStatus> toBeDeletedCache = new LinkedList<CacheStatus>();
- Set<Path> toBeCleanedBaseDir = new HashSet<Path>();
+ HashMap<Path, CacheDir> toBeCleanedBaseDir =
+ new HashMap<Path, CacheDir>();
synchronized (properties) {
- for (Path baseDir : properties.keySet()) {
- if (allowedCacheSize < getDirSize(baseDir) ||
- allowedCacheSubdirs < getDirSubdirs(baseDir)) {
- toBeCleanedBaseDir.add(baseDir);
+ for (Map.Entry<Path, CacheDir> baseDir : properties.entrySet()) {
+ CacheDir baseDirCounts = baseDir.getValue();
+ if (allowedCacheSize < baseDirCounts.size ||
+ allowedCacheSubdirs < baseDirCounts.subdirs) {
+ CacheDir tcc = new CacheDir();
+ tcc.size = baseDirCounts.size - allowedCacheSizeCleanupGoal;
+ tcc.subdirs = baseDirCounts.subdirs - allowedCacheSubdirsCleanupGoal;
+ toBeCleanedBaseDir.put(baseDir.getKey(), tcc);
}
}
}
// try deleting cache Status with refcount of zero
synchronized (cachedArchives) {
- for (Iterator<String> it = cachedArchives.keySet().iterator();
- it.hasNext();) {
- String cacheId = it.next();
+ for(
+ Iterator<Map.Entry<String, CacheStatus>> it
+ = cachedArchives.entrySet().iterator();
+ it.hasNext(); ) {
+ Map.Entry<String, CacheStatus> entry = it.next();
+ String cacheId = entry.getKey();
CacheStatus cacheStatus = cachedArchives.get(cacheId);
- if (toBeCleanedBaseDir.contains(cacheStatus.getBaseDir())) {
+ CacheDir leftToClean = toBeCleanedBaseDir.get(cacheStatus.getBaseDir());
+ if (leftToClean != null && (leftToClean.size > 0 || leftToClean.subdirs > 0)) {
synchronized (cacheStatus) {
// if reference count is zero mark the cache for deletion
- if (cacheStatus.refcount == 0) {
+ if (!cacheStatus.isUsed()) {
+ leftToClean.size -= cacheStatus.size;
+ leftToClean.subdirs--;
// delete this cache entry from the global list
// and mark the localized file for deletion
toBeDeletedCache.add(cacheStatus);
@@ -1007,7 +1040,7 @@ public class TrackerDistributedCacheMana
}
// decrement the size of the cache from baseDirSize
synchronized (baseDirManager.properties) {
- BaseDirManager.CacheDir cacheDir = properties.get(cacheStatus.getBaseDir());
+ CacheDir cacheDir = properties.get(cacheStatus.getBaseDir());
if (cacheDir != null) {
cacheDir.size -= cacheStatus.size;
cacheDir.subdirs--;
@@ -1028,12 +1061,12 @@ public class TrackerDistributedCacheMana
long cacheSize = cacheStatus.size;
// decrement the size of the cache from baseDirSize
synchronized (baseDirManager.properties) {
- BaseDirManager.CacheDir cacheDir = properties.get(cacheStatus.getBaseDir());
+ CacheDir cacheDir = properties.get(cacheStatus.getBaseDir());
if (cacheDir != null) {
cacheDir.size += cacheSize;
cacheDir.subdirs++;
} else {
- cacheDir = new BaseDirManager.CacheDir();
+ cacheDir = new CacheDir();
cacheDir.size = cacheSize;
cacheDir.subdirs = 1;
properties.put(cacheStatus.getBaseDir(), cacheDir);
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1152941&r1=1152940&r2=1152941&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Mon Aug 1 21:41:20 2011
@@ -44,7 +44,6 @@ import org.apache.hadoop.mapred.TaskCont
import org.apache.hadoop.mapred.TaskTracker;
import org.apache.hadoop.mapred.TaskTracker.LocalStorage;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
@@ -608,6 +607,130 @@ public class TestTrackerDistributedCache
return UserGroupInformation.getLoginUser().getUserName();
}
+ public static final long CACHE_DELETE_PERIOD_MS = 100l;
+
+ /** test delete cache */
+ public void testLRUDeleteCache() throws Exception {
+ if (!canRun()) {
+ return;
+ }
+ // This test needs MRConfig.LOCAL_DIR to be single directory
+ // instead of four, because it assumes that both
+ // firstcachefile and secondcachefile will be localized on same directory
+ // so that second localization triggers deleteCache.
+ // If MRConfig.LOCAL_DIR is four directories, second localization might not
+ // trigger deleteCache, if it is localized in different directory.
+ Configuration conf2 = new Configuration(conf);
+ conf2.set("mapred.local.dir", ROOT_MAPRED_LOCAL_DIR.toString());
+ //Make it larger then expected
+ conf2.setLong("local.cache.size", 21 * 1024l);
+ conf2.setLong("mapreduce.tasktracker.local.cache.numberdirectories", 3);
+ //The goal is to get down to 15.75K and 2 dirs
+ conf2.setFloat("mapreduce.tasktracker.cache.local.keep.pct", 0.75f);
+ conf2.setLong("mapreduce.tasktracker.distributedcache.checkperiod", CACHE_DELETE_PERIOD_MS);
+ refreshConf(conf2);
+ TrackerDistributedCacheManager manager =
+ new TrackerDistributedCacheManager(conf2, taskController);
+ try {
+ manager.startCleanupThread();
+ FileSystem localfs = FileSystem.getLocal(conf2);
+ String userName = getJobOwnerName();
+ conf2.set("user.name", userName);
+
+ //Here we are testing the LRU. In this case we will add in 4 cache entries
+ // 2 of them are 8k each and 2 of them are very small. We want to verify
+ // That they are deleted in LRU order.
+ // So what we will do is add in the two large files first, 1 then 2, and
+ // then one of the small ones 3. We will then release them in opposite
+ // order 3, 2, 1.
+ //
+ // Finally we will add in the last small file. This last file should push
+ // us over the 3 entry limit to trigger a cleanup. So LRU order is 3, 2, 1
+ // And we will only delete 2 entries so that should leave 1 un touched
+ // but 3 and 2 deleted
+
+ Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
+ Path fourthCacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
+ // Adding two more small files, so it triggers the number of sub directory
+ // limit but does not trigger the file size limit.
+ createTempFile(thirdCacheFile, 1);
+ createTempFile(fourthCacheFile, 1);
+
+ FileStatus stat = fs.getFileStatus(firstCacheFilePublic);
+ CacheFile cfile1 = new CacheFile(firstCacheFilePublic.toUri(),
+ CacheFile.FileType.REGULAR, true,
+ stat.getModificationTime(),
+ true);
+
+ Path firstLocalCache = manager.getLocalCache(firstCacheFilePublic.toUri(), conf2,
+ TaskTracker.getPrivateDistributedCacheDir(userName),
+ fs.getFileStatus(firstCacheFilePublic), false,
+ fs.getFileStatus(firstCacheFilePublic).getModificationTime(), true,
+ cfile1);
+
+ stat = fs.getFileStatus(secondCacheFilePublic);
+ CacheFile cfile2 = new CacheFile(secondCacheFilePublic.toUri(),
+ CacheFile.FileType.REGULAR, true,
+ stat.getModificationTime(),
+ true);
+
+ Path secondLocalCache = manager.getLocalCache(secondCacheFilePublic.toUri(), conf2,
+ TaskTracker.getPrivateDistributedCacheDir(userName),
+ fs.getFileStatus(secondCacheFilePublic), false,
+ fs.getFileStatus(secondCacheFilePublic).getModificationTime(), true,
+ cfile2);
+
+ stat = fs.getFileStatus(thirdCacheFile);
+ CacheFile cfile3 = new CacheFile(thirdCacheFile.toUri(),
+ CacheFile.FileType.REGULAR, true,
+ stat.getModificationTime(),
+ true);
+
+ Path thirdLocalCache = manager.getLocalCache(thirdCacheFile.toUri(), conf2,
+ TaskTracker.getPrivateDistributedCacheDir(userName),
+ fs.getFileStatus(thirdCacheFile), false,
+ fs.getFileStatus(thirdCacheFile).getModificationTime(), true,
+ cfile3);
+
+ manager.releaseCache(cfile3.getStatus());
+ manager.releaseCache(cfile2.getStatus());
+ manager.releaseCache(cfile1.getStatus());
+
+ // Getting the fourth cache will make the number of sub directories becomes
+ // 4 which is greater than 3. So the released cache will be deleted.
+ stat = fs.getFileStatus(fourthCacheFile);
+ CacheFile cfile4 = new CacheFile(fourthCacheFile.toUri(),
+ CacheFile.FileType.REGULAR, true,
+ stat.getModificationTime(),
+ true);
+
+ Path fourthLocalCache = manager.getLocalCache(fourthCacheFile.toUri(), conf2,
+ TaskTracker.getPrivateDistributedCacheDir(userName),
+ fs.getFileStatus(fourthCacheFile), false,
+ fs.getFileStatus(fourthCacheFile).getModificationTime(), true,
+ cfile4);
+
+ checkCacheDeletion(localfs, secondLocalCache, "DistributedCache failed " +
+ "deleting second cache LRU order");
+
+ checkCacheDeletion(localfs, thirdLocalCache,
+ "DistributedCache failed deleting third" +
+ " cache LRU order.");
+
+ checkCacheNOTDeletion(localfs, firstLocalCache, "DistributedCache failed " +
+ "Deleted first cache LRU order.");
+
+ checkCacheNOTDeletion(localfs, fourthCacheFile, "DistributedCache failed " +
+ "Deleted fourth cache LRU order.");
+ // Clean up the files created in this test
+ new File(thirdCacheFile.toString()).delete();
+ new File(fourthCacheFile.toString()).delete();
+ } finally {
+ manager.stopCleanupThread();
+ }
+ }
+
+
/** test delete cache */
public void testDeleteCache() throws Exception {
if (!canRun()) {
@@ -622,7 +745,7 @@ public class TestTrackerDistributedCache
Configuration conf2 = new Configuration(conf);
conf2.set("mapred.local.dir", ROOT_MAPRED_LOCAL_DIR.toString());
conf2.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
- conf2.setLong("mapreduce.tasktracker.distributedcache.checkperiod", 200); // 200 ms
+ conf2.setLong("mapreduce.tasktracker.distributedcache.checkperiod", CACHE_DELETE_PERIOD_MS);
refreshConf(conf2);
TrackerDistributedCacheManager manager =
@@ -762,6 +885,15 @@ public class TestTrackerDistributedCache
}
/**
+ * Do a simple check to see if the file has NOT been deleted.
+ */
+ private void checkCacheNOTDeletion(FileSystem fs, Path cache, String msg)
+ throws Exception {
+ TimeUnit.MILLISECONDS.sleep(3 * CACHE_DELETE_PERIOD_MS);
+ assertTrue(msg, fs.exists(cache));
+ }
+
+ /**
* Periodically checks if a file is there, return if the file is no longer
* there. Fails the test if a files is there for 30 seconds.
*/
@@ -774,7 +906,7 @@ public class TestTrackerDistributedCache
cacheExists = false;
break;
}
- TimeUnit.MILLISECONDS.sleep(100L);
+ TimeUnit.MILLISECONDS.sleep(CACHE_DELETE_PERIOD_MS);
}
// If the cache is still there after 5 minutes, test fails.
assertFalse(msg, cacheExists);