You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/06/17 00:57:43 UTC
svn commit: r1136726 - in /hadoop/common/branches/branch-0.20-security-204:
./ src/mapred/org/apache/hadoop/filecache/
src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/filecache/
Author: omalley
Date: Thu Jun 16 22:57:43 2011
New Revision: 1136726
URL: http://svn.apache.org/viewvc?rev=1136726&view=rev
Log:
MAPREDUCE-2479. Move distributed cache cleanup to a background task,
backporting MAPREDUCE-1568. (Robert Joseph Evans via cdouglas)
Modified:
hadoop/common/branches/branch-0.20-security-204/CHANGES.txt (contents, props changed)
hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
Modified: hadoop/common/branches/branch-0.20-security-204/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/CHANGES.txt?rev=1136726&r1=1136725&r2=1136726&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security-204/CHANGES.txt Thu Jun 16 22:57:43 2011
@@ -34,6 +34,9 @@ Release 0.20.204.0 - unreleased
HADOOP-7369. Fix permissions in tarball for sbin/* and libexec/* (omalley)
+ MAPREDUCE-2479. Move distributed cache cleanup to a background task,
+ backporting MAPREDUCE-1568. (Robert Joseph Evans via cdouglas)
+
HADOOP-7356. Fix bin/hadoop scripts (eyang via omalley)
HADOOP-7272. Remove unnecessary security related info logs. (suresh)
Propchange: hadoop/common/branches/branch-0.20-security-204/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jun 16 22:57:43 2011
@@ -1,5 +1,5 @@
/hadoop/common/branches/branch-0.20/CHANGES.txt:826138,826568,829987,831184,833001,880632,898713,909245,909723,960946,1044225
-/hadoop/common/branches/branch-0.20-security/CHANGES.txt:1097202,1098837,1100336,1125139,1125170,1125587,1125589,1127362,1131277,1131286,1131290,1131299,1131737,1134140
+/hadoop/common/branches/branch-0.20-security/CHANGES.txt:1097202,1098837,1100336,1103940,1125139,1125170,1125587,1125589,1127362,1131277,1131286,1131290,1131299,1131737,1134140
/hadoop/common/branches/branch-0.20-security-203/CHANGES.txt:1096071,1097012-1099333,1102071,1128115
/hadoop/common/branches/branch-0.20-security-205/CHANGES.txt:1132788,1133133,1133274,1133282
/hadoop/core/branches/branch-0.18/CHANGES.txt:727226
Modified: hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1136726&r1=1136725&r2=1136726&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Thu Jun 16 22:57:43 2011
@@ -22,12 +22,12 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.DateFormat;
+import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
@@ -76,14 +76,6 @@ public class TrackerDistributedCacheMana
private static final FsPermission PUBLIC_CACHE_OBJECT_PERM =
FsPermission.createImmutable((short) 0755);
- // For holding the properties of each cache directory
- static class CacheDir {
- long size;
- long subdirs;
- }
- private TreeMap<Path, CacheDir> baseDirProperties =
- new TreeMap<Path, CacheDir>();
-
// default total cache size (10GB)
private static final long DEFAULT_CACHE_SIZE = 10737418240L;
private static final long DEFAULT_CACHE_SUBDIR_LIMIT = 10000;
@@ -101,8 +93,8 @@ public class TrackerDistributedCacheMana
private static final Random random = new Random();
- protected BaseDirManager baseDirManager = new BaseDirManager();
- protected CleanupThread cleanupThread;
+ BaseDirManager baseDirManager = new BaseDirManager();
+ CleanupThread cleanupThread;
public TrackerDistributedCacheManager(Configuration conf,
TaskController controller
@@ -119,6 +111,7 @@ public class TrackerDistributedCacheMana
("mapreduce.tasktracker.local.cache.numberdirectories",
DEFAULT_CACHE_SUBDIR_LIMIT);
this.taskController = controller;
+ this.cleanupThread = new CleanupThread(conf);
}
/**
@@ -200,7 +193,7 @@ public class TrackerDistributedCacheMana
// Increase the size and sub directory count of the cache
// from baseDirSize and baseDirNumberSubDir.
- addCacheInfoUpdate(lcacheStatus);
+ baseDirManager.addCacheInfoUpdate(lcacheStatus);
}
}
lcacheStatus.initComplete();
@@ -209,27 +202,6 @@ public class TrackerDistributedCacheMana
lcacheStatus, fileStatus, isArchive);
}
}
-
- // try deleting stuff if you can
- long size = 0;
- long numberSubdirs = 0;
- synchronized (lcacheStatus) {
- synchronized (baseDirProperties) {
- CacheDir cacheDir = baseDirProperties.get(lcacheStatus.getBaseDir());
- if (cacheDir != null) {
- size = cacheDir.size;
- numberSubdirs = cacheDir.subdirs;
- } else {
- LOG.warn("Cannot find size and number of subdirectories of" +
- " baseDir: " + lcacheStatus.getBaseDir());
- }
- }
- }
-
- if (allowedCacheSize < size || allowedCacheSubdirs < numberSubdirs) {
- // try some cache deletions
- compactCache(conf);
- }
} catch (IOException ie) {
synchronized (lcacheStatus) {
// release this cache
@@ -260,7 +232,7 @@ public class TrackerDistributedCacheMana
if (size != 0) {
synchronized (status) {
status.size = size;
- addCacheInfoUpdate(status);
+ baseDirManager.addCacheInfoUpdate(status);
}
}
}
@@ -292,54 +264,6 @@ public class TrackerDistributedCacheMana
return user;
}
-
- // To delete the caches which have a refcount of zero
-
- private void compactCache(Configuration conf) throws IOException {
- List<CacheStatus> deleteList = new LinkedList<CacheStatus>();
- // try deleting cache Status with refcount of zero
- synchronized (cachedArchives) {
- for (Iterator<String> it = cachedArchives.keySet().iterator();
- it.hasNext();) {
- String cacheId = it.next();
- CacheStatus lcacheStatus = cachedArchives.get(cacheId);
- // if reference count is zero
- // mark the cache for deletion
- if (lcacheStatus.refcount == 0) {
- // delete this cache entry from the global list
- // and mark the localized file for deletion
- deleteList.add(lcacheStatus);
- it.remove();
- }
- }
- }
-
- // do the deletion, after releasing the global lock
- for (CacheStatus lcacheStatus : deleteList) {
- synchronized (lcacheStatus) {
- Path potentialDeletee = lcacheStatus.localizedLoadPath;
- Path localizedDir = lcacheStatus.getLocalizedUniqueDir();
- if (lcacheStatus.user == null) {
- LOG.info("Deleted path " + localizedDir);
- try {
- localFs.delete(localizedDir, true);
- } catch (IOException e) {
- LOG.warn("Could not delete distributed cache empty directory "
- + localizedDir, e);
- }
- } else {
- LOG.info("Deleted path " + localizedDir + " as " + lcacheStatus.user);
- String base = lcacheStatus.getBaseDir().toString();
- String userDir = TaskTracker.getUserDir(lcacheStatus.user);
- int skip = base.length() + 1 + userDir.length() + 1;
- String relative = localizedDir.toString().substring(skip);
- taskController.deleteAsUser(lcacheStatus.user, relative);
- }
- deleteCacheInfoUpdate(lcacheStatus);
- }
- }
- }
-
/*
* Returns the relative path of the dir this cache will be localized in
* relative path that this cache will be localized in. For
@@ -541,7 +465,7 @@ public class TrackerDistributedCacheMana
// Increase the size and sub directory count of the cache
// from baseDirSize and baseDirNumberSubDir.
- addCacheInfoUpdate(cacheStatus);
+ baseDirManager.addCacheInfoUpdate(cacheStatus);
LOG.info(String.format("Cached %s as %s",
cache.toString(), cacheStatus.localizedLoadPath));
@@ -617,28 +541,31 @@ public class TrackerDistributedCacheMana
}
static class CacheStatus {
- // the local load path of this cache
- Path localizedLoadPath;
-
- //the base dir where the cache lies
- Path localizedBaseDir;
-
- //the size of this cache
- long size;
-
- // number of instances using this cache
- int refcount;
-
- // is it initialized ?
- boolean inited = false;
-
+ //
+ // This field should be accessed under global cachedArchives lock.
+ //
+ int refcount; // number of instances using this cache
+
+ //
+ // The following two fields should be accessed under
+ // individual cacheStatus lock.
+ //
+ long size; //the size of this cache.
+ boolean inited = false; // is it initialized ?
+
+ //
+ // The following five fields are Immutable.
+ //
+
// The sub directory (tasktracker/archive or tasktracker/user/archive),
// under which the file will be localized
Path subDir;
-
// unique string used in the construction of local load path
String uniqueString;
-
+ // the local load path of this cache
+ Path localizedLoadPath;
+ //the base dir where the cache lies
+ Path localizedBaseDir;
// The user that owns the cache entry or null if it is public
final String user;
@@ -943,10 +870,11 @@ public class TrackerDistributedCacheMana
return path;
}
+
/**
* A thread to check and cleanup the unused files periodically
*/
- protected class CleanupThread extends Thread {
+ private class CleanupThread extends Thread {
// How often do we check if we need to clean up cache files?
private long cleanUpCheckPeriod = 60000L; // 1 minute
public CleanupThread(Configuration conf) {
@@ -954,7 +882,6 @@ public class TrackerDistributedCacheMana
conf.getLong("mapreduce.tasktracker.distributedcache.checkperiod",
cleanUpCheckPeriod);
}
-
private volatile boolean running = true;
public void stopRunning() {
@@ -967,33 +894,19 @@ public class TrackerDistributedCacheMana
try {
Thread.sleep(cleanUpCheckPeriod);
baseDirManager.checkAndCleanup();
- } catch (IOException e) {
+ } catch (Exception e) {
LOG.error("Exception in DistributedCache CleanupThread.", e);
- } catch(InterruptedException e) {
- LOG.info("Cleanup...",e);
- //To force us to exit cleanly
- running = false;
- } catch (Throwable t) {
- exitTaskTracker(t);
+ // This thread should keep running and never crash.
}
}
}
-
- /**
- * Exit the task tracker because of a fatal error.
- */
- protected void exitTaskTracker(Throwable t) {
- LOG.fatal("Distributed Cache cleanup thread received runtime exception." +
- " Exiting the TaskTracker", t);
- Runtime.getRuntime().exit(-1);
- }
}
/**
* This class holds properties of each base directories and is responsible
* for clean up unused cache files in base directories.
*/
- protected class BaseDirManager {
+ private class BaseDirManager {
// For holding the properties of each cache directory
private class CacheDir {
@@ -1072,44 +985,60 @@ public class TrackerDistributedCacheMana
* and baseDirNumberSubDir. Have to lock lcacheStatus before calling this.
* @param cacheStatus cache status of the cache is deleted
*/
- private void deleteCacheInfoUpdate(CacheStatus cacheStatus) {
+ public void deleteCacheInfoUpdate(CacheStatus cacheStatus) {
if (!cacheStatus.inited) {
// if it is not created yet, do nothing.
return;
}
// decrement the size of the cache from baseDirSize
- synchronized (baseDirProperties) {
- CacheDir cacheDir = baseDirProperties.get(cacheStatus.getBaseDir());
+ synchronized (baseDirManager.properties) {
+ BaseDirManager.CacheDir cacheDir = properties.get(cacheStatus.getBaseDir());
if (cacheDir != null) {
cacheDir.size -= cacheStatus.size;
cacheDir.subdirs--;
} else {
LOG.warn("Cannot find size and number of subdirectories of" +
- " baseDir: " + cacheStatus.getBaseDir());
+ " baseDir: " + cacheStatus.getBaseDir());
}
}
}
-
+
/**
* Update the maps baseDirSize and baseDirNumberSubDir when adding cache.
* Increase the size and sub directory count of the cache from baseDirSize
* and baseDirNumberSubDir. Have to lock lcacheStatus before calling this.
* @param cacheStatus cache status of the cache is added
*/
- private void addCacheInfoUpdate(CacheStatus cacheStatus) {
+ public void addCacheInfoUpdate(CacheStatus cacheStatus) {
long cacheSize = cacheStatus.size;
// decrement the size of the cache from baseDirSize
- synchronized (baseDirProperties) {
- CacheDir cacheDir = baseDirProperties.get(cacheStatus.getBaseDir());
+ synchronized (baseDirManager.properties) {
+ BaseDirManager.CacheDir cacheDir = properties.get(cacheStatus.getBaseDir());
if (cacheDir != null) {
cacheDir.size += cacheSize;
cacheDir.subdirs++;
} else {
- cacheDir = new CacheDir();
+ cacheDir = new BaseDirManager.CacheDir();
cacheDir.size = cacheSize;
cacheDir.subdirs = 1;
- baseDirProperties.put(cacheStatus.getBaseDir(), cacheDir);
+ properties.put(cacheStatus.getBaseDir(), cacheDir);
}
}
}
+ }
+
+ /**
+ * Start the background cleanup thread created in the constructor. That
+ * thread periodically checks for and cleans up unused distributed cache
+ * files. Call at most once per manager: CleanupThread extends Thread, and
+ * a Java thread cannot be started twice.
+ */
+ public void startCleanupThread() {
+ this.cleanupThread.start();
+ }
+
+ /**
+ * Stop the background cleanup thread. It first asks the thread to stop
+ * via stopRunning(), then interrupts it so that a pending
+ * Thread.sleep(cleanUpCheckPeriod) is cut short.
+ * This method does not join the thread, so the thread may still be
+ * running briefly after this call returns.
+ * NOTE(review): the interrupt is caught by the thread's broad
+ * catch (Exception e), which logs it at ERROR level, so a normal shutdown
+ * produces an error log entry.
+ */
+ public void stopCleanupThread() {
+ cleanupThread.stopRunning();
+ cleanupThread.interrupt();
+ }
}
Modified: hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1136726&r1=1136725&r2=1136726&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Thu Jun 16 22:57:43 2011
@@ -812,7 +812,8 @@ public class TaskTracker implements MRCo
// Initialize DistributedCache
this.distributedCacheManager = new TrackerDistributedCacheManager(
this.fConf, taskController);
-
+ this.distributedCacheManager.startCleanupThread();
+
this.jobClient = (InterTrackerProtocol)
UserGroupInformation.getLoginUser().doAs(
new PrivilegedExceptionAction<Object>() {
@@ -1365,6 +1366,7 @@ public class TaskTracker implements MRCo
this.mapLauncher.interrupt();
this.reduceLauncher.interrupt();
+ this.distributedCacheManager.stopCleanupThread();
jvmManager.stop();
// shutdown RPC connections
Modified: hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1136726&r1=1136725&r2=1136726&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Thu Jun 16 22:57:43 2011
@@ -622,10 +622,13 @@ public class TestTrackerDistributedCache
Configuration conf2 = new Configuration(conf);
conf2.set("mapred.local.dir", ROOT_MAPRED_LOCAL_DIR.toString());
conf2.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
+ conf2.setLong("mapreduce.tasktracker.distributedcache.checkperiod", 200); // 200 ms
refreshConf(conf2);
TrackerDistributedCacheManager manager =
new TrackerDistributedCacheManager(conf2, taskController);
+ manager.startCleanupThread();
+ try {
FileSystem localfs = FileSystem.getLocal(conf2);
long now = System.currentTimeMillis();
String userName = getJobOwnerName();
@@ -659,9 +662,9 @@ public class TestTrackerDistributedCache
fs.getFileStatus(secondCacheFilePublic), false,
fs.getFileStatus(secondCacheFilePublic).getModificationTime(), true,
cfile2);
- assertFalse("DistributedCache failed deleting old" +
- " cache when the cache store is full.",
- localfs.exists(firstLocalCache));
+ checkCacheDeletion(localfs, firstLocalCache,
+ "DistributedCache failed deleting old" +
+ " cache when the cache store is full");
// find the root directory of distributed caches
Path firstCursor = firstLocalCache;
@@ -691,8 +694,12 @@ public class TestTrackerDistributedCache
conf2.setLong("local.cache.size", LOCAL_CACHE_LIMIT * 10);
conf2.setLong("mapreduce.tasktracker.local.cache.numberdirectories",
LOCAL_CACHE_SUBDIR_LIMIT);
+ manager.stopCleanupThread();
+
manager =
new TrackerDistributedCacheManager(conf2, taskController);
+ manager.startCleanupThread();
+
// Now we test the number of sub directories limit
// Create the temporary cache files to be used in the tests.
Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
@@ -736,9 +743,9 @@ public class TestTrackerDistributedCache
TaskTracker.getPrivateDistributedCacheDir(userName),
fs.getFileStatus(fourthCacheFile), false,
fs.getFileStatus(fourthCacheFile).getModificationTime(), false, cfile4);
- assertFalse("DistributedCache failed deleting old" +
- " cache when the cache exceeds the number of sub directories limit.",
- localfs.exists(thirdLocalCache));
+ checkCacheDeletion(localfs, thirdLocalCache,
+ "DistributedCache failed deleting old" +
+ " cache when the cache exceeds the number of sub directories limit.");
assertFalse
("DistributedCache did not delete the gensym'ed distcache "
@@ -749,8 +756,30 @@ public class TestTrackerDistributedCache
// Clean up the files created in this test
new File(thirdCacheFile.toString()).delete();
new File(fourthCacheFile.toString()).delete();
+ } finally {
+ manager.stopCleanupThread();
+ }
}
+ /**
+ * Polls until the given cache path no longer exists.
+ * Checks every 100 ms and returns as soon as the path is gone; fails the
+ * test if the path still exists after 30 seconds.
+ *
+ * @param fs file system used to check whether the cache path exists
+ * @param cache localized cache path that is expected to be deleted
+ * @param msg assertion message reported if the path is never deleted
+ */
+ private void checkCacheDeletion(FileSystem fs, Path cache, String msg)
+ throws Exception {
+ // Total time to wait for the cache to disappear, and the polling interval.
+ // 30000 ms / 100 ms = 300 checks.
+ final long timeoutMillis = 30000L;
+ final long pollMillis = 100L;
+ boolean cacheExists = true;
+ for (long waited = 0; waited < timeoutMillis; waited += pollMillis) {
+ if (!fs.exists(cache)) {
+ cacheExists = false;
+ break;
+ }
+ TimeUnit.MILLISECONDS.sleep(pollMillis);
+ }
+ // If the cache is still there after the timeout (30 seconds), the test fails.
+ assertFalse(msg, cacheExists);
+ }
+
public void testFileSystemOtherThanDefault() throws Exception {
if (!canRun()) {
return;