You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by zs...@apache.org on 2010/04/30 00:42:35 UTC
svn commit: r939505 - in /hadoop/mapreduce/trunk: ./
src/java/org/apache/hadoop/mapred/
src/java/org/apache/hadoop/mapreduce/filecache/
src/java/org/apache/hadoop/mapreduce/server/tasktracker/
src/test/mapred/org/apache/hadoop/mapreduce/filecache/
Author: zshao
Date: Thu Apr 29 22:42:35 2010
New Revision: 939505
URL: http://svn.apache.org/viewvc?rev=939505&view=rev
Log:
MAPREDUCE-1568. TrackerDistributedCacheManager should clean up cache in a background thread. (Scott Chen via zshao)
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=939505&r1=939504&r2=939505&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Apr 29 22:42:35 2010
@@ -278,6 +278,9 @@ Trunk (unreleased changes)
MAPREDUCE-1417. Forrest documentation should be updated to reflect
the changes in MAPREDUCE-744. (Ravi Gummadi via vinodkv)
+ MAPREDUCE-1568. TrackerDistributedCacheManager should clean up cache
+ in a background thread. (Scott Chen via zshao)
+
OPTIMIZATIONS
MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=939505&r1=939504&r2=939505&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Apr 29 22:42:35 2010
@@ -691,6 +691,7 @@ public class TaskTracker
this.distributedCacheManager =
new TrackerDistributedCacheManager(this.fConf, taskController,
asyncDiskService);
+ this.distributedCacheManager.startCleanupThread();
this.jobClient = (InterTrackerProtocol)
mrOwner.doAs(new PrivilegedExceptionAction<Object>() {
@@ -1218,6 +1219,7 @@ public class TaskTracker
this.mapLauncher.interrupt();
this.reduceLauncher.interrupt();
+ this.distributedCacheManager.stopCleanupThread();
jvmManager.stop();
// shutdown RPC connections
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=939505&r1=939504&r2=939505&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Thu Apr 29 22:42:35 2010
@@ -20,8 +20,10 @@ package org.apache.hadoop.mapreduce.file
import java.io.File;
import java.io.IOException;
import java.net.URI;
+import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
import java.util.Set;
@@ -60,14 +62,6 @@ public class TrackerDistributedCacheMana
private TreeMap<String, CacheStatus> cachedArchives =
new TreeMap<String, CacheStatus>();
- // For holding the properties of each cache directory
- static class CacheDir {
- long size;
- long subdirs;
- }
- private TreeMap<Path, CacheDir> baseDirProperties =
- new TreeMap<Path, CacheDir>();
-
// default total cache size (10GB)
private static final long DEFAULT_CACHE_SIZE = 10737418240L;
private static final long DEFAULT_CACHE_SUBDIR_LIMIT = 10000;
@@ -88,7 +82,10 @@ public class TrackerDistributedCacheMana
private Random random = new Random();
private MRAsyncDiskService asyncDiskService;
-
+
+ BaseDirManager baseDirManager = new BaseDirManager();
+ CleanupThread cleanupThread;
+
public TrackerDistributedCacheManager(Configuration conf,
TaskController taskController) throws IOException {
this.localFs = FileSystem.getLocal(conf);
@@ -101,6 +98,7 @@ public class TrackerDistributedCacheMana
// setting the cache number of subdirectories limit to a default of 10000
this.allowedCacheSubdirs = conf.getLong(
TTConfig.TT_LOCAL_CACHE_SUBDIRS_LIMIT, DEFAULT_CACHE_SUBDIR_LIMIT);
+ this.cleanupThread = new CleanupThread(conf);
}
/**
@@ -186,26 +184,6 @@ public class TrackerDistributedCacheMana
createSymlink(conf, cache, lcacheStatus, isArchive, currentWorkDir,
honorSymLinkConf);
}
-
- // try deleting stuff if you can
- long size = 0;
- long numberSubdirs = 0;
- synchronized (lcacheStatus) {
- synchronized (baseDirProperties) {
- CacheDir cacheDir = baseDirProperties.get(lcacheStatus.getBaseDir());
- if (cacheDir != null) {
- size = cacheDir.size;
- numberSubdirs = cacheDir.subdirs;
- } else {
- LOG.warn("Cannot find size and number of subdirectories of" +
- " baseDir: " + lcacheStatus.getBaseDir());
- }
- }
- }
- if (allowedCacheSize < size || allowedCacheSubdirs < numberSubdirs) {
- // try some cache deletions
- deleteCache(conf);
- }
initSuccessful = true;
return localizedPath;
} finally {
@@ -256,39 +234,6 @@ public class TrackerDistributedCacheMana
}
}
- // To delete the caches which have a refcount of zero
-
- private void deleteCache(Configuration conf) throws IOException {
- Set<CacheStatus> deleteSet = new HashSet<CacheStatus>();
- // try deleting cache Status with refcount of zero
- synchronized (cachedArchives) {
- for (Iterator<String> it = cachedArchives.keySet().iterator();
- it.hasNext();) {
- String cacheId = it.next();
- CacheStatus lcacheStatus = cachedArchives.get(cacheId);
-
- // if reference count is zero
- // mark the cache for deletion
- if (lcacheStatus.refcount == 0) {
- // delete this cache entry from the global list
- // and mark the localized file for deletion
- deleteSet.add(lcacheStatus);
- it.remove();
- }
- }
- }
-
- // do the deletion, after releasing the global lock
- for (CacheStatus lcacheStatus : deleteSet) {
- synchronized (lcacheStatus) {
- deleteLocalPath(asyncDiskService,
- FileSystem.getLocal(conf), lcacheStatus.getLocalizedUniqueDir());
- // Update the maps baseDirSize and baseDirNumberSubDir
- deleteCacheInfoUpdate(lcacheStatus);
- }
- }
- }
-
/**
* Delete a local path with asyncDiskService if available,
* or otherwise synchronously with local file system.
@@ -505,7 +450,7 @@ public class TrackerDistributedCacheMana
cacheStatus.size = cacheSize;
// Increase the size and sub directory count of the cache
// from baseDirSize and baseDirNumberSubDir.
- addCacheInfoUpdate(cacheStatus);
+ baseDirManager.addCacheUpdate(cacheStatus);
// set proper permissions for the localized directory
setPermissions(conf, cacheStatus, isPublic);
@@ -606,30 +551,31 @@ public class TrackerDistributedCacheMana
}
static class CacheStatus {
- // the local load path of this cache
- Path localizedLoadPath;
-
- //the base dir where the cache lies
- Path localizedBaseDir;
-
- //the size of this cache
- long size;
-
- // number of instances using this cache
- int refcount;
-
- // the cache-file modification time
- long mtime;
-
- // is it initialized ?
- boolean inited = false;
-
+ //
+ // This field should be accessed under global cachedArchives lock.
+ //
+ int refcount; // number of instances using this cache.
+
+ //
+ // The following three fields should be accessed under
+ // individual cacheStatus lock.
+ //
+ long size; //the size of this cache.
+ long mtime; // the cache-file modification time
+ boolean inited = false; // is it initialized ?
+
+ //
+ // The following four fields are Immutable.
+ //
// The sub directory (tasktracker/archive or tasktracker/user/archive),
// under which the file will be localized
- Path subDir;
-
+ final Path subDir;
// unique string used in the construction of local load path
- String uniqueString;
+ final String uniqueString;
+ // the local load path of this cache
+ final Path localizedLoadPath;
+ //the base dir where the cache lies
+ final Path localizedBaseDir;
public CacheStatus(Path baseDir, Path localLoadPath, Path subDir,
String uniqueString) {
@@ -925,48 +871,154 @@ public class TrackerDistributedCacheMana
}
/**
- * Decrement the size and sub directory count of the cache from baseDirSize
- * and baseDirNumberSubDir. Have to lock lcacheStatus before calling this.
- * @param cacheStatus cache status of the cache is deleted
- */
- private void deleteCacheInfoUpdate(CacheStatus cacheStatus) {
- if (!cacheStatus.inited) {
- // if it is not created yet, do nothing.
- return;
- }
- // decrement the size of the cache from baseDirSize
- synchronized (baseDirProperties) {
- CacheDir cacheDir = baseDirProperties.get(cacheStatus.getBaseDir());
- if (cacheDir != null) {
- cacheDir.size -= cacheStatus.size;
- cacheDir.subdirs--;
- } else {
- LOG.warn("Cannot find size and number of subdirectories of" +
- " baseDir: " + cacheStatus.getBaseDir());
+ * A thread that periodically checks for and cleans up unused cache files.
+ */
+  private class CleanupThread extends Thread {
+    // How often do we check if we need to clean up cache files?
+    private long cleanUpCheckPeriod = 60000L; // 1 minute
+    // Cleared by stopRunning(); volatile so run() observes the change.
+    private volatile boolean running = true;
+
+    /**
+     * Creates the cleanup thread.
+     * @param conf configuration providing
+     *        TTConfig.TT_DISTRIBUTED_CACHE_CHECK_PERIOD in milliseconds; a
+     *        non-positive value is ignored and the 1 minute default is kept.
+     */
+    public CleanupThread(Configuration conf) {
+      super("DistributedCache CleanupThread");
+      // Background housekeeping must not keep the JVM alive on its own,
+      // e.g. if stopCleanupThread() is never reached.
+      setDaemon(true);
+      long period = conf.getLong(TTConfig.TT_DISTRIBUTED_CACHE_CHECK_PERIOD,
+          cleanUpCheckPeriod);
+      if (period > 0) {
+        cleanUpCheckPeriod = period;
+      } else {
+        // sleep(0) would busy-spin and a negative value makes sleep() throw
+        // on every iteration, so fall back to the default period.
+        LOG.warn("Ignoring non-positive " +
+            TTConfig.TT_DISTRIBUTED_CACHE_CHECK_PERIOD + " = " + period +
+            "; using " + cleanUpCheckPeriod + " ms.");
+      }
+    }
+
+    /**
+     * Asks the thread to exit at its next loop check. Callers should also
+     * interrupt the thread so that a pending sleep ends immediately.
+     */
+    public void stopRunning() {
+      running = false;
+    }
+
+    /**
+     * Sleeps for the check period, then asks the BaseDirManager to clean up
+     * over-limit base directories, until stopRunning() has been called.
+     */
+    @Override
+    public void run() {
+      while (running) {
+        try {
+          Thread.sleep(cleanUpCheckPeriod);
+          baseDirManager.checkAndCleanup();
+        } catch (InterruptedException e) {
+          // Expected on shutdown: stopCleanupThread() clears the flag and
+          // then interrupts the sleep. Only an interrupt that arrives while
+          // still running is unusual; the loop condition decides whether to
+          // exit.
+          if (running) {
+            LOG.warn("DistributedCache CleanupThread interrupted while" +
+                " still running; continuing.");
+          }
+        } catch (Exception e) {
+          LOG.error("Exception in DistributedCache CleanupThread.", e);
+          // This thread should keep running and never crash.
+        }
       }
     }
   }
-
+
/**
- * Update the maps baseDirSize and baseDirNumberSubDir when adding cache.
- * Increase the size and sub directory count of the cache from baseDirSize
- * and baseDirNumberSubDir. Have to lock lcacheStatus before calling this.
- * @param cacheStatus cache status of the cache is added
+ * This class holds the properties of each base directory and is responsible
+ * for cleaning up unused cache files in the base directories.
*/
- private void addCacheInfoUpdate(CacheStatus cacheStatus) {
- long cacheSize = cacheStatus.size;
- // decrement the size of the cache from baseDirSize
- synchronized (baseDirProperties) {
- CacheDir cacheDir = baseDirProperties.get(cacheStatus.getBaseDir());
- if (cacheDir != null) {
- cacheDir.size += cacheSize;
- cacheDir.subdirs++;
- } else {
- cacheDir = new CacheDir();
- cacheDir.size = cacheSize;
- cacheDir.subdirs = 1;
- baseDirProperties.put(cacheStatus.getBaseDir(), cacheDir);
+  private class BaseDirManager {
+    /** Running totals for a single base directory. */
+    private class CacheDir {
+      long size;
+      long subdirs;
+    }
+    // Totals per base directory; every access must hold the properties lock.
+    private TreeMap<Path, CacheDir> properties =
+      new TreeMap<Path, CacheDir>();
+
+    // Callers must hold the properties lock and pass a known base dir.
+    private long getDirSize(Path p) {
+      return properties.get(p).size;
+    }
+    // Callers must hold the properties lock and pass a known base dir.
+    private long getDirSubdirs(Path p) {
+      return properties.get(p).subdirs;
+    }
+
+    /**
+     * Check each base directory to see whether its total size or number of
+     * subdirectories exceeds the configured limit. For every base directory
+     * over a limit, remove all caches under it with a zero reference count
+     * from cachedArchives and delete their localized directories.
+     *
+     * The properties and cachedArchives locks are taken one after the other,
+     * never together, and files are deleted only after the cachedArchives
+     * lock has been released.
+     *
+     * @throws IOException if deleting a localized cache fails. The remaining
+     *         caches are still processed; the first failure is rethrown at
+     *         the end.
+     */
+    public void checkAndCleanup() throws IOException {
+      Set<Path> toBeCleanedBaseDir = new HashSet<Path>();
+      synchronized (properties) {
+        for (Map.Entry<Path, CacheDir> entry : properties.entrySet()) {
+          CacheDir dir = entry.getValue();
+          if (allowedCacheSize < dir.size ||
+              allowedCacheSubdirs < dir.subdirs) {
+            toBeCleanedBaseDir.add(entry.getKey());
+          }
+        }
+      }
+      if (toBeCleanedBaseDir.isEmpty()) {
+        // Nothing is over a limit: skip the global cachedArchives lock that
+        // localization also contends on.
+        return;
+      }
+
+      Collection<CacheStatus> toBeDeletedCache = new LinkedList<CacheStatus>();
+      synchronized (cachedArchives) {
+        for (Iterator<CacheStatus> it = cachedArchives.values().iterator();
+             it.hasNext();) {
+          CacheStatus cacheStatus = it.next();
+          // Only unused caches (refcount 0) under an over-limit base dir are
+          // removed from the global list and marked for deletion.
+          if (cacheStatus.refcount == 0 &&
+              toBeCleanedBaseDir.contains(cacheStatus.getBaseDir())) {
+            toBeDeletedCache.add(cacheStatus);
+            it.remove();
+          }
+        }
+      }
+
+      // Do the deletion after releasing the global lock.
+      IOException firstFailure = null;
+      for (CacheStatus cacheStatus : toBeDeletedCache) {
+        synchronized (cacheStatus) {
+          try {
+            deleteLocalPath(asyncDiskService, FileSystem.getLocal(trackerConf),
+                cacheStatus.getLocalizedUniqueDir());
+            // Update the per-base-directory size and subdirectory counts.
+            deleteCacheUpdate(cacheStatus);
+          } catch (IOException e) {
+            // These entries are already gone from cachedArchives; aborting
+            // here would leave every remaining one undeleted and still
+            // counted against its base directory.
+            LOG.warn("Failed to delete DistributedCache directory " +
+                cacheStatus.getLocalizedUniqueDir(), e);
+            if (firstFailure == null) {
+              firstFailure = e;
+            }
+          }
+        }
+      }
+      if (firstFailure != null) {
+        throw firstFailure;
+      }
+    }
+
+    /**
+     * Decrement the size and subdirectory count of the cache's base
+     * directory. Does nothing if the cache was never initialized. The caller
+     * must hold the cacheStatus lock.
+     * @param cacheStatus cache status of the cache that was deleted
+     */
+    public void deleteCacheUpdate(CacheStatus cacheStatus) {
+      if (!cacheStatus.inited) {
+        // if it is not created yet, do nothing.
+        return;
+      }
+      synchronized (properties) {
+        CacheDir cacheDir = properties.get(cacheStatus.getBaseDir());
+        if (cacheDir != null) {
+          cacheDir.size -= cacheStatus.size;
+          cacheDir.subdirs--;
+        } else {
+          LOG.warn("Cannot find size and number of subdirectories of" +
+                   " baseDir: " + cacheStatus.getBaseDir());
+        }
+      }
+    }
+
+    /**
+     * Increase the size and subdirectory count of the cache's base directory,
+     * creating the entry on first use. The caller must hold the cacheStatus
+     * lock.
+     * @param cacheStatus cache status of the cache that was added
+     */
+    public void addCacheUpdate(CacheStatus cacheStatus) {
+      long cacheSize = cacheStatus.size;
+      synchronized (properties) {
+        CacheDir cacheDir = properties.get(cacheStatus.getBaseDir());
+        if (cacheDir != null) {
+          cacheDir.size += cacheSize;
+          cacheDir.subdirs++;
+        } else {
+          cacheDir = new CacheDir();
+          cacheDir.size = cacheSize;
+          cacheDir.subdirs = 1;
+          properties.put(cacheStatus.getBaseDir(), cacheDir);
+        }
       }
     }
   }
+
+  /**
+   * Launches the background thread that periodically removes unused
+   * distributed cache files. The thread object is created once in the
+   * constructor and a Thread can only be started once, so call this at
+   * most once per instance.
+   */
+  public void startCleanupThread() {
+    cleanupThread.start();
+  }
+
+  /**
+   * Signals the background cleanup thread to finish. The running flag is
+   * cleared before the interrupt, so a thread woken from its sleep already
+   * sees that it should stop. This method does not wait for the thread to
+   * terminate.
+   */
+  public void stopCleanupThread() {
+    CleanupThread worker = this.cleanupThread;
+    worker.stopRunning();
+    worker.interrupt();
+  }
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java?rev=939505&r1=939504&r2=939505&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java Thu Apr 29 22:42:35 2010
@@ -94,4 +94,6 @@ public interface TTConfig extends MRConf
"mapreduce.tasktracker.group";
public static final String TT_USERLOGCLEANUP_SLEEPTIME =
"mapreduce.tasktracker.userlogcleanup.sleeptime";
+ public static final String TT_DISTRIBUTED_CACHE_CHECK_PERIOD =
+ "mapreduce.tasktracker.distributedcache.checkperiod";
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=939505&r1=939504&r2=939505&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Thu Apr 29 22:42:35 2010
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Random;
+import java.util.concurrent.TimeUnit;
import javax.security.auth.login.LoginException;
@@ -441,9 +442,11 @@ public class TestTrackerDistributedCache
conf2.set(MRConfig.LOCAL_DIR, ROOT_MAPRED_LOCAL_DIR.toString());
conf2.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, LOCAL_CACHE_LIMIT);
conf2.setLong(TTConfig.TT_LOCAL_CACHE_SUBDIRS_LIMIT, LOCAL_CACHE_SUBDIR);
+ conf2.setLong(TTConfig.TT_DISTRIBUTED_CACHE_CHECK_PERIOD, 200); // 200 ms
refreshConf(conf2);
TrackerDistributedCacheManager manager =
new TrackerDistributedCacheManager(conf2, taskController);
+ manager.startCleanupThread();
FileSystem localfs = FileSystem.getLocal(conf2);
long now = System.currentTimeMillis();
String userName = getJobOwnerName();
@@ -463,10 +466,8 @@ public class TestTrackerDistributedCache
TaskTracker.getPrivateDistributedCacheDir(userName),
fs.getFileStatus(secondCacheFile), false,
System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
- assertFalse("DistributedCache failed deleting old" +
- " cache when the cache store is full.",
- localfs.exists(localCache));
-
+ checkCacheDeletion(localfs, localCache, "DistributedCache failed " +
+ "deleting old cache when the cache store is full.");
// Now we test the number of sub directories limit
// Create the temporary cache files to be used in the tests.
Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
@@ -487,12 +488,32 @@ public class TestTrackerDistributedCache
TaskTracker.getPrivateDistributedCacheDir(userName),
fs.getFileStatus(fourthCacheFile), false,
System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
- assertFalse("DistributedCache failed deleting old" +
- " cache when the cache exceeds the number of sub directories limit.",
- localfs.exists(thirdLocalCache));
+ checkCacheDeletion(localfs, thirdLocalCache,
+ "DistributedCache failed deleting old" +
+ " cache when the cache exceeds the number of sub directories limit.");
// Clean up the files created in this test
new File(thirdCacheFile.toString()).delete();
new File(fourthCacheFile.toString()).delete();
+ manager.stopCleanupThread();
+ }
+
+  /**
+   * Waits for the background cleanup thread to delete a cache path. Polls
+   * every 100 ms and returns as soon as the path no longer exists; fails the
+   * test with the given message if the path is still there after 30 seconds.
+   *
+   * @param fs the file system that holds the localized cache
+   * @param cache the localized cache path that is expected to be deleted
+   * @param msg the assertion message used when the path is never deleted
+   */
+  private void checkCacheDeletion(FileSystem fs, Path cache, String msg)
+      throws Exception {
+    final long pollIntervalMs = 100L;
+    final int maxPolls = 300; // 300 polls * 100 ms = 30 seconds in total
+    // Deletion happens asynchronously in the cleanup thread, so poll rather
+    // than asserting once.
+    for (int i = 0; i < maxPolls; i++) {
+      if (!fs.exists(cache)) {
+        return;
+      }
+      TimeUnit.MILLISECONDS.sleep(pollIntervalMs);
+    }
+    // Still present after the full 30 second timeout: fail the test.
+    assertFalse(msg, fs.exists(cache));
   }
public void testFileSystemOtherThanDefault() throws Exception {