You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/06/17 00:28:50 UTC
svn commit: r1136712 - in /hadoop/common/branches/branch-0.20-security-204:
CHANGES.txt
src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
Author: omalley
Date: Thu Jun 16 22:28:49 2011
New Revision: 1136712
URL: http://svn.apache.org/viewvc?rev=1136712&view=rev
Log:
MAPREDUCE-2495. exit() the TaskTracker when the distributed cache cleanup
thread dies. (Robert Joseph Evans via cdouglas)
Modified:
hadoop/common/branches/branch-0.20-security-204/CHANGES.txt (contents, props changed)
hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
Modified: hadoop/common/branches/branch-0.20-security-204/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/CHANGES.txt?rev=1136712&r1=1136711&r2=1136712&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security-204/CHANGES.txt Thu Jun 16 22:28:49 2011
@@ -11,6 +11,9 @@ Release 0.20.204.0 - unreleased
BUG FIXES
+ MAPREDUCE-2495. exit() the TaskTracker when the distributed cache cleanup
+ thread dies. (Robert Joseph Evans via cdouglas)
+
MAPREDUCE-2555. Avoid sprious logging from completedtasks. (Thomas Graves
via cdouglas)
Propchange: hadoop/common/branches/branch-0.20-security-204/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jun 16 22:28:49 2011
@@ -1,5 +1,5 @@
/hadoop/common/branches/branch-0.20/CHANGES.txt:826138,826568,829987,831184,833001,880632,898713,909245,909723,960946,1044225
-/hadoop/common/branches/branch-0.20-security/CHANGES.txt:1097202,1098837,1100336,1131277,1131286,1131290,1131299,1131737,1134140
+/hadoop/common/branches/branch-0.20-security/CHANGES.txt:1097202,1098837,1100336,1127362,1131277,1131286,1131290,1131299,1131737,1134140
/hadoop/common/branches/branch-0.20-security-203/CHANGES.txt:1096071,1097012-1099333,1102071,1128115
/hadoop/common/branches/branch-0.20-security-205/CHANGES.txt:1132788,1133133,1133274,1133282
/hadoop/core/branches/branch-0.18/CHANGES.txt:727226
Modified: hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1136712&r1=1136711&r2=1136712&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Thu Jun 16 22:28:49 2011
@@ -100,6 +100,9 @@ public class TrackerDistributedCacheMana
private Configuration trackerConf;
private static final Random random = new Random();
+
+ protected BaseDirManager baseDirManager = new BaseDirManager();
+ protected CleanupThread cleanupThread;
public TrackerDistributedCacheManager(Configuration conf,
TaskController controller
@@ -941,6 +944,130 @@ public class TrackerDistributedCacheMana
}
/**
+ * A thread that periodically checks for and cleans up unused cache files.
+ */
+ protected class CleanupThread extends Thread {
+ // How often do we check if we need to clean up cache files?
+ private long cleanUpCheckPeriod = 60000L; // 1 minute
+ public CleanupThread(Configuration conf) {
+ cleanUpCheckPeriod =
+ conf.getLong("mapreduce.tasktracker.distributedcache.checkperiod",
+ cleanUpCheckPeriod);
+ }
+
+ private volatile boolean running = true;
+
+ public void stopRunning() {
+ running = false;
+ }
+
+ @Override
+ public void run() {
+ while (running) {
+ try {
+ Thread.sleep(cleanUpCheckPeriod);
+ baseDirManager.checkAndCleanup();
+ } catch (IOException e) {
+ LOG.error("Exception in DistributedCache CleanupThread.", e);
+ } catch(InterruptedException e) {
+ LOG.info("Cleanup...",e);
+ //To force us to exit cleanly
+ running = false;
+ } catch (Throwable t) {
+ exitTaskTracker(t);
+ }
+ }
+ }
+
+ /**
+ * Exit the task tracker because of a fatal error.
+ */
+ protected void exitTaskTracker(Throwable t) {
+ LOG.fatal("Distributed Cache cleanup thread received runtime exception." +
+ " Exiting the TaskTracker", t);
+ Runtime.getRuntime().exit(-1);
+ }
+ }
+
+ /**
+ * This class holds the properties of each base directory and is responsible
+ * for cleaning up unused cache files in the base directories.
+ */
+ protected class BaseDirManager {
+
+ // For holding the properties of each cache directory
+ private class CacheDir {
+ long size;
+ long subdirs;
+ }
+
+ private TreeMap<Path, BaseDirManager.CacheDir> properties =
+ new TreeMap<Path, BaseDirManager.CacheDir>();
+
+ private long getDirSize(Path p) {
+ return properties.get(p).size;
+ }
+ private long getDirSubdirs(Path p) {
+ return properties.get(p).subdirs;
+ }
+
+ void checkAndCleanup() throws IOException {
+ Collection<CacheStatus> toBeDeletedCache = new LinkedList<CacheStatus>();
+ Set<Path> toBeCleanedBaseDir = new HashSet<Path>();
+ synchronized (properties) {
+ for (Path baseDir : properties.keySet()) {
+ if (allowedCacheSize < getDirSize(baseDir) ||
+ allowedCacheSubdirs < getDirSubdirs(baseDir)) {
+ toBeCleanedBaseDir.add(baseDir);
+ }
+ }
+ }
+ // try deleting cache Status with refcount of zero
+ synchronized (cachedArchives) {
+ for (Iterator<String> it = cachedArchives.keySet().iterator();
+ it.hasNext();) {
+ String cacheId = it.next();
+ CacheStatus cacheStatus = cachedArchives.get(cacheId);
+ if (toBeCleanedBaseDir.contains(cacheStatus.getBaseDir())) {
+ synchronized (cacheStatus) {
+ // if reference count is zero mark the cache for deletion
+ if (cacheStatus.refcount == 0) {
+ // delete this cache entry from the global list
+ // and mark the localized file for deletion
+ toBeDeletedCache.add(cacheStatus);
+ it.remove();
+ }
+ }
+ }
+ }
+ }
+
+ // do the deletion, after releasing the global lock
+ for (CacheStatus cacheStatus : toBeDeletedCache) {
+ synchronized (cacheStatus) {
+ Path localizedDir = cacheStatus.getLocalizedUniqueDir();
+ if (cacheStatus.user == null) {
+ TrackerDistributedCacheManager.LOG.info("Deleted path " + localizedDir);
+ try {
+ localFs.delete(localizedDir, true);
+ } catch (IOException e) {
+ TrackerDistributedCacheManager.LOG.warn("Could not delete distributed cache empty directory "
+ + localizedDir, e);
+ }
+ } else {
+ TrackerDistributedCacheManager.LOG.info("Deleted path " + localizedDir + " as " + cacheStatus.user);
+ String base = cacheStatus.getBaseDir().toString();
+ String userDir = TaskTracker.getUserDir(cacheStatus.user);
+ int skip = base.length() + 1 + userDir.length() + 1;
+ String relative = localizedDir.toString().substring(skip);
+ taskController.deleteAsUser(cacheStatus.user, relative);
+ }
+ deleteCacheInfoUpdate(cacheStatus);
+ }
+ }
+ }
+
+ /**
* Decrement the size and sub directory count of the cache from baseDirSize
* and baseDirNumberSubDir. Have to lock lcacheStatus before calling this.
* @param cacheStatus cache status of the cache is deleted
Modified: hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1136712&r1=1136711&r2=1136712&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Thu Jun 16 22:28:49 2011
@@ -25,6 +25,8 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import javax.security.auth.login.LoginException;
@@ -546,6 +548,62 @@ public class TestTrackerDistributedCache
}
}
+ public static class MyTrackerDistributedCacheManager
+ extends TrackerDistributedCacheManager {
+
+ public Throwable caught = null;
+ public CountDownLatch done = new CountDownLatch(1);
+
+
+ public MyTrackerDistributedCacheManager(Configuration conf,
+ TaskController controller) throws IOException {
+ super(conf, controller);
+ this.baseDirManager = new TrackerDistributedCacheManager.BaseDirManager() {
+
+ @Override
+ void checkAndCleanup() throws IOException {
+ throw new RuntimeException("This is a test!!!!");
+ }
+ };
+
+ this.cleanupThread = new TestCleanupThread(conf);
+ }
+
+ class TestCleanupThread extends TrackerDistributedCacheManager.CleanupThread {
+
+ public TestCleanupThread(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ protected void exitTaskTracker(Throwable t) {
+ caught = t;
+ this.stopRunning();
+ done.countDown();
+ }
+ }
+ }
+
+ public void testRuntimeExceptionInCleanup() throws Exception {
+ if(!canRun()) {
+ return;
+ }
+
+ Configuration conf2 = new Configuration(conf);
+ conf2.set("mapred.local.dir", ROOT_MAPRED_LOCAL_DIR.toString());
+ conf2.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
+ conf2.setLong("mapreduce.tasktracker.distributedcache.checkperiod", 0); // 0 ms (Don't sleep)
+
+ refreshConf(conf2);
+ MyTrackerDistributedCacheManager manager =
+ new MyTrackerDistributedCacheManager(conf2, taskController);
+ manager.startCleanupThread();
+
+ assertTrue(manager.done.await(200l, TimeUnit.MILLISECONDS));
+ assertNotNull(manager.caught);
+ assertTrue(manager.caught instanceof RuntimeException);
+ }
+
protected String getJobOwnerName() throws IOException {
return UserGroupInformation.getLoginUser().getUserName();
}