You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dh...@apache.org on 2010/04/17 00:11:37 UTC
svn commit: r935090 - in /hadoop/mapreduce/trunk: ./
src/java/org/apache/hadoop/mapreduce/filecache/
src/java/org/apache/hadoop/mapreduce/server/tasktracker/
src/test/mapred/org/apache/hadoop/mapreduce/filecache/
Author: dhruba
Date: Fri Apr 16 22:11:36 2010
New Revision: 935090
URL: http://svn.apache.org/viewvc?rev=935090&view=rev
Log:
MAPREDUCE-1538. TrackerDistributedCacheManager manages the
number of files. (Scott Chen via dhruba)
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=935090&r1=935089&r2=935090&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Apr 16 22:11:36 2010
@@ -248,6 +248,9 @@ Trunk (unreleased changes)
MAPREDUCE-1466. Record number of files processed in FileInputFormat in the
Configuration for offline analysis. (Luke Lu and Arun Murthy via cdouglas)
+ MAPREDUCE-1538. TrackerDistributedCacheManager manages the
+ number of files. (Scott Chen via dhruba)
+
OPTIMIZATIONS
MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=935090&r1=935089&r2=935090&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Fri Apr 16 22:11:36 2010
@@ -60,10 +60,19 @@ public class TrackerDistributedCacheMana
private TreeMap<String, CacheStatus> cachedArchives =
new TreeMap<String, CacheStatus>();
- private TreeMap<Path, Long> baseDirSize = new TreeMap<Path, Long>();
+ // Holds the bookkeeping totals kept for one cache base directory
+ static class CacheDir {
+ long size; // running sum of the sizes of the caches counted under it
+ long subdirs; // running count of the caches counted under it
+ }
+ private TreeMap<Path, CacheDir> baseDirProperties =
+ new TreeMap<Path, CacheDir>();
// default total cache size (10GB)
private static final long DEFAULT_CACHE_SIZE = 10737418240L;
+ private static final long DEFAULT_CACHE_SUBDIR_LIMIT = 10000;
+ private long allowedCacheSize;
+ private long allowedCacheSubdirs;
private static final Log LOG =
LogFactory.getLog(TrackerDistributedCacheManager.class);
@@ -86,6 +95,12 @@ public class TrackerDistributedCacheMana
this.trackerConf = conf;
this.lDirAllocator = new LocalDirAllocator(TTConfig.LOCAL_DIR);
this.taskController = taskController;
+ // setting the cache size to a default of 10GB
+ this.allowedCacheSize = conf.getLong(TTConfig.TT_LOCAL_CACHE_SIZE,
+ DEFAULT_CACHE_SIZE);
+ // setting the cache number of subdirectories limit to a default of 10000
+ this.allowedCacheSubdirs = conf.getLong(
+ TTConfig.TT_LOCAL_CACHE_SUBDIRS_LIMIT, DEFAULT_CACHE_SUBDIR_LIMIT);
}
/**
@@ -174,21 +189,20 @@ public class TrackerDistributedCacheMana
// try deleting stuff if you can
long size = 0;
+ long numberSubdirs = 0;
synchronized (lcacheStatus) {
- synchronized (baseDirSize) {
- Long get = baseDirSize.get(lcacheStatus.getBaseDir());
- if (get != null) {
- size = get.longValue();
+ synchronized (baseDirProperties) {
+ CacheDir cacheDir = baseDirProperties.get(lcacheStatus.getBaseDir());
+ if (cacheDir != null) {
+ size = cacheDir.size;
+ numberSubdirs = cacheDir.subdirs;
} else {
- LOG.warn("Cannot find size of baseDir: "
- + lcacheStatus.getBaseDir());
+ LOG.warn("Cannot find size and number of subdirectories of" +
+ " baseDir: " + lcacheStatus.getBaseDir());
}
}
}
- // setting the cache size to a default of 10GB
- long allowedSize = conf.getLong(TTConfig.TT_LOCAL_CACHE_SIZE,
- DEFAULT_CACHE_SIZE);
- if (allowedSize < size) {
+ if (allowedCacheSize < size || allowedCacheSubdirs < numberSubdirs) {
// try some cache deletions
deleteCache(conf);
}
@@ -268,18 +282,9 @@ public class TrackerDistributedCacheMana
for (CacheStatus lcacheStatus : deleteSet) {
synchronized (lcacheStatus) {
deleteLocalPath(asyncDiskService,
- FileSystem.getLocal(conf), lcacheStatus.localizedLoadPath);
- // decrement the size of the cache from baseDirSize
- synchronized (baseDirSize) {
- Long dirSize = baseDirSize.get(lcacheStatus.localizedBaseDir);
- if ( dirSize != null ) {
- dirSize -= lcacheStatus.size;
- baseDirSize.put(lcacheStatus.localizedBaseDir, dirSize);
- } else {
- LOG.warn("Cannot find record of the baseDir: " +
- lcacheStatus.localizedBaseDir + " during delete!");
- }
- }
+ FileSystem.getLocal(conf), lcacheStatus.getLocalizedUniqueDir());
+ // Update the size and sub directory count kept in baseDirProperties
+ deleteCacheInfoUpdate(lcacheStatus);
}
}
}
@@ -498,15 +503,9 @@ public class TrackerDistributedCacheMana
long cacheSize =
FileUtil.getDU(new File(parchive.getParent().toString()));
cacheStatus.size = cacheSize;
- synchronized (baseDirSize) {
- Long dirSize = baseDirSize.get(cacheStatus.localizedBaseDir);
- if( dirSize == null ) {
- dirSize = Long.valueOf(cacheSize);
- } else {
- dirSize += cacheSize;
- }
- baseDirSize.put(cacheStatus.localizedBaseDir, dirSize);
- }
+ // Increase the size and sub directory count recorded for this
+ // cache's base directory in baseDirProperties.
+ addCacheInfoUpdate(cacheStatus);
// set proper permissions for the localized directory
setPermissions(conf, cacheStatus, isPublic);
@@ -924,4 +923,50 @@ public class TrackerDistributedCacheMana
static void setLocalFiles(Configuration conf, String str) {
conf.set(JobContext.CACHE_LOCALFILES, str);
}
+
+ /**
+ * Subtract a deleted cache's size and sub directory from the totals kept in
+ * baseDirProperties. Have to lock cacheStatus before calling this.
+ * @param cacheStatus cache status of the cache that is being deleted
+ */
+ private void deleteCacheInfoUpdate(CacheStatus cacheStatus) {
+ if (!cacheStatus.inited) {
+ // if it is not created yet, there is nothing to subtract.
+ return;
+ }
+ // decrement the size and sub directory count of the cache's base dir
+ synchronized (baseDirProperties) {
+ CacheDir cacheDir = baseDirProperties.get(cacheStatus.getBaseDir());
+ if (cacheDir != null) {
+ cacheDir.size -= cacheStatus.size;
+ cacheDir.subdirs--;
+ } else {
+ LOG.warn("Cannot find size and number of subdirectories of" +
+ " baseDir: " + cacheStatus.getBaseDir());
+ }
+ }
+ }
+
+ /**
+ * Update baseDirProperties when adding a cache: add the cache's size to the
+ * total of its base directory and count one more sub directory, creating
+ * the entry if none exists. Have to lock cacheStatus before calling this.
+ * @param cacheStatus cache status of the cache that is being added
+ */
+ private void addCacheInfoUpdate(CacheStatus cacheStatus) {
+ long cacheSize = cacheStatus.size;
+ // increment the size and sub directory count of the cache's base dir
+ synchronized (baseDirProperties) {
+ CacheDir cacheDir = baseDirProperties.get(cacheStatus.getBaseDir());
+ if (cacheDir != null) {
+ cacheDir.size += cacheSize;
+ cacheDir.subdirs++;
+ } else { // no entry for this base dir yet: start one with this cache
+ cacheDir = new CacheDir();
+ cacheDir.size = cacheSize;
+ cacheDir.subdirs = 1;
+ baseDirProperties.put(cacheStatus.getBaseDir(), cacheDir);
+ }
+ }
+ }
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java?rev=935090&r1=935089&r2=935090&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java Fri Apr 16 22:11:36 2010
@@ -81,6 +81,8 @@ public interface TTConfig extends MRConf
"mapreduce.tasktracker.taskmemorymanager.monitoringinterval";
public static final String TT_LOCAL_CACHE_SIZE =
"mapreduce.tasktracker.cache.local.size";
+ public static final String TT_LOCAL_CACHE_SUBDIRS_LIMIT =
+ "mapreduce.tasktracker.cache.local.numberdirectories";
public static final String TT_OUTOFBAND_HEARBEAT =
"mapreduce.tasktracker.outofband.heartbeat";
public static final String TT_USER_NAME = "mapreduce.tasktracker.user.name";
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=935090&r1=935089&r2=935090&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Fri Apr 16 22:11:36 2010
@@ -66,6 +66,7 @@ public class TestTrackerDistributedCache
private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
private static final int LOCAL_CACHE_LIMIT = 5 * 1024; //5K
+ private static final int LOCAL_CACHE_SUBDIR = 2;
protected Configuration conf;
protected Path firstCacheFile;
protected Path secondCacheFile;
@@ -439,6 +440,7 @@ public class TestTrackerDistributedCache
Configuration conf2 = new Configuration(conf);
conf2.set(MRConfig.LOCAL_DIR, ROOT_MAPRED_LOCAL_DIR.toString());
conf2.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, LOCAL_CACHE_LIMIT);
+ conf2.setLong(TTConfig.TT_LOCAL_CACHE_SUBDIRS_LIMIT, LOCAL_CACHE_SUBDIR);
refreshConf(conf2);
TrackerDistributedCacheManager manager =
new TrackerDistributedCacheManager(conf2, taskController);
@@ -447,6 +449,7 @@ public class TestTrackerDistributedCache
String userName = getJobOwnerName();
conf2.set(JobContext.USER_NAME, userName);
+ // We first test the size limit
Path localCache = manager.getLocalCache(firstCacheFile.toUri(), conf2,
TaskTracker.getPrivateDistributedCacheDir(userName),
fs.getFileStatus(firstCacheFile), false,
@@ -463,6 +466,33 @@ public class TestTrackerDistributedCache
assertFalse("DistributedCache failed deleting old" +
" cache when the cache store is full.",
localfs.exists(localCache));
+
+ // Now we test the number of sub directories limit
+ // Create the temporary cache files to be used in the tests.
+ Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
+ Path fourthCacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
+ // Adding two more small files, so it triggers the number of sub directory
+ // limit but does not trigger the file size limit.
+ createTempFile(thirdCacheFile, 1);
+ createTempFile(fourthCacheFile, 1);
+ Path thirdLocalCache = manager.getLocalCache(thirdCacheFile.toUri(), conf2,
+ TaskTracker.getPrivateDistributedCacheDir(userName),
+ fs.getFileStatus(thirdCacheFile), false,
+ now, new Path(TEST_ROOT_DIR), false, false);
+ // Release the third cache so that it can be deleted while sweeping
+ manager.releaseCache(thirdCacheFile.toUri(), conf2, now);
+ // Getting the fourth cache will make the number of sub directories become
+ // 3, which is greater than 2. So the released cache will be deleted.
+ manager.getLocalCache(fourthCacheFile.toUri(), conf2,
+ TaskTracker.getPrivateDistributedCacheDir(userName),
+ fs.getFileStatus(fourthCacheFile), false,
+ System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
+ assertFalse("DistributedCache failed deleting old" +
+ " cache when the cache exceeds the number of sub directories limit.",
+ localfs.exists(thirdLocalCache));
+ // Clean up the files created in this test
+ new File(thirdCacheFile.toString()).delete();
+ new File(fourthCacheFile.toString()).delete();
}
public void testFileSystemOtherThanDefault() throws Exception {
@@ -486,13 +516,17 @@ public class TestTrackerDistributedCache
}
static void createTempFile(Path p) throws IOException {
+ createTempFile(p, TEST_FILE_SIZE);
+ }
+
+ static void createTempFile(Path p, int size) throws IOException {
File f = new File(p.toString());
FileOutputStream os = new FileOutputStream(f);
- byte[] toWrite = new byte[TEST_FILE_SIZE];
+ byte[] toWrite = new byte[size];
new Random().nextBytes(toWrite);
os.write(toWrite);
os.close();
- FileSystem.LOG.info("created: " + p + ", size=" + TEST_FILE_SIZE);
+ FileSystem.LOG.info("created: " + p + ", size=" + size);
}
static void createPublicTempFile(Path p)