You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:36:43 UTC
svn commit: r1077061 -
/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/filecache/DistributedCache.java
Author: omalley
Date: Fri Mar 4 03:36:42 2011
New Revision: 1077061
URL: http://svn.apache.org/viewvc?rev=1077061&view=rev
Log:
commit 5fb86212d97b89e94c37e52eca24ba847b656618
Author: Arun C Murthy <ac...@apache.org>
Date: Thu Nov 26 21:06:30 2009 -0800
MAPREDUCE-1186. Fix DistributedCache to do a recursive chmod on just the per-cache directory, not all of mapred.local.dir. Contributed by Amareshwari Sriramadasu.
From: https://issues.apache.org/jira/secure/attachment/12426266/patch-1186-3-ydist.txt
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1186. Fixed DistributedCache to do a recursive chmod on just the
+ per-cache directory, not all of mapred.local.dir.
+ (Amareshwari Sriramadasu via acmurthy)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/filecache/DistributedCache.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/filecache/DistributedCache.java?rev=1077061&r1=1077060&r2=1077061&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/filecache/DistributedCache.java Fri Mar 4 03:36:42 2011
@@ -202,13 +202,15 @@ public class DistributedCache {
lcacheStatus = cachedArchives.get(key);
if (lcacheStatus == null) {
// was never localized
- String cachePath = new Path (subDir,
- new Path(String.valueOf(random.nextLong()),
- makeRelative(cache, conf))).toString();
+ Path uniqueParentDir =
+ new Path(subDir, String.valueOf(random.nextLong()));
+ String cachePath = new Path(uniqueParentDir,
+ makeRelative(cache, conf)).toString();
Path localPath = lDirAllocator.getLocalPathForWrite(cachePath,
fileStatus.getLen(), conf);
- lcacheStatus = new CacheStatus(
- new Path(localPath.toString().replace(cachePath, "")), localPath);
+ lcacheStatus =
+ new CacheStatus(new Path(localPath.toString().replace(cachePath, "")),
+ localPath, uniqueParentDir);
cachedArchives.put(key, lcacheStatus);
}
lcacheStatus.refcount++;
@@ -326,17 +328,17 @@ public class DistributedCache {
// do the deletion, after releasing the global lock
for (CacheStatus lcacheStatus : deleteSet) {
synchronized (lcacheStatus) {
- FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
- LOG.info("Deleted path " + lcacheStatus.localLoadPath);
+ FileSystem.getLocal(conf).delete(lcacheStatus.localizedLoadPath, true);
+ LOG.info("Deleted path " + lcacheStatus.localizedLoadPath);
// decrement the size of the cache from baseDirSize
synchronized (baseDirSize) {
- Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
+ Long dirSize = baseDirSize.get(lcacheStatus.localizedBaseDir);
if ( dirSize != null ) {
dirSize -= lcacheStatus.size;
- baseDirSize.put(lcacheStatus.baseDir, dirSize);
+ baseDirSize.put(lcacheStatus.localizedBaseDir, dirSize);
} else {
LOG.warn("Cannot find record of the baseDir: " +
- lcacheStatus.baseDir + " during delete!");
+ lcacheStatus.localizedBaseDir + " during delete!");
}
}
}
@@ -384,12 +386,13 @@ public class DistributedCache {
// Has to be
if (!ifExistsAndFresh(conf, fs, cache, confFileStamp,
cacheStatus, fileStatus)) {
- throw new IOException("Stale cache file: " + cacheStatus.localLoadPath +
+ throw new IOException("Stale cache file: " +
+ cacheStatus.localizedLoadPath +
" for cache-file: " + cache);
}
LOG.info(String.format("Using existing cache of %s->%s",
- cache.toString(), cacheStatus.localLoadPath));
- return cacheStatus.localLoadPath;
+ cache.toString(), cacheStatus.localizedLoadPath));
+ return cacheStatus.localizedLoadPath;
}
private static void createSymlink(Configuration conf, URI cache,
@@ -404,7 +407,7 @@ public class DistributedCache {
File flink = new File(link);
if (doSymlink){
if (!flink.exists()) {
- FileUtil.symLink(cacheStatus.localLoadPath.toString(), link);
+ FileUtil.symLink(cacheStatus.localizedLoadPath.toString(), link);
}
}
}
@@ -421,15 +424,15 @@ public class DistributedCache {
FileSystem localFs = FileSystem.getLocal(conf);
Path parchive = null;
if (isArchive) {
- parchive = new Path(cacheStatus.localLoadPath,
- new Path(cacheStatus.localLoadPath.getName()));
+ parchive = new Path(cacheStatus.localizedLoadPath,
+ new Path(cacheStatus.localizedLoadPath.getName()));
} else {
- parchive = cacheStatus.localLoadPath;
+ parchive = cacheStatus.localizedLoadPath;
}
if (!localFs.mkdirs(parchive.getParent())) {
throw new IOException("Mkdirs failed to create directory " +
- cacheStatus.localLoadPath.toString());
+ cacheStatus.localizedLoadPath.toString());
}
String cacheId = cache.getPath();
fs.copyToLocalFile(new Path(cacheId), parchive);
@@ -451,26 +454,29 @@ public class DistributedCache {
long cacheSize = FileUtil.getDU(new File(parchive.getParent().toString()));
cacheStatus.size = cacheSize;
synchronized (baseDirSize) {
- Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+ Long dirSize = baseDirSize.get(cacheStatus.localizedBaseDir);
if (dirSize == null) {
dirSize = Long.valueOf(cacheSize);
} else {
dirSize += cacheSize;
}
- baseDirSize.put(cacheStatus.baseDir, dirSize);
+ baseDirSize.put(cacheStatus.localizedBaseDir, dirSize);
}
// do chmod here
try {
//Setting recursive permission to grant everyone read and execute
- FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
+ Path localDir = new Path(cacheStatus.localizedBaseDir,
+ cacheStatus.uniqueParentDir);
+ LOG.info("Doing chmod on localdir :" + localDir);
+ FileUtil.chmod(localDir.toString(), "ugo+rx", true);
} catch(InterruptedException e) {
LOG.warn("Exception in chmod" + e.toString());
}
// update cacheStatus to reflect the newly cached file
cacheStatus.mtime = getTimestamp(conf, cache);
- return cacheStatus.localLoadPath;
+ return cacheStatus.localizedLoadPath;
}
private static boolean isTarFile(String filename) {
@@ -853,10 +859,13 @@ public class DistributedCache {
private static class CacheStatus {
// the local load path of this cache
- Path localLoadPath;
+ Path localizedLoadPath;
//the base dir where the cache lies
- Path baseDir;
+ Path localizedBaseDir;
+
+ // the unique directory in localizedBaseDir, where the cache lies
+ Path uniqueParentDir;
//the size of this cache
long size;
@@ -870,18 +879,19 @@ public class DistributedCache {
// is it initialized?
boolean inited = false;
- public CacheStatus(Path baseDir, Path localLoadPath) {
+ public CacheStatus(Path baseDir, Path localLoadPath, Path uniqueParentDir) {
super();
- this.localLoadPath = localLoadPath;
+ this.localizedLoadPath = localLoadPath;
this.refcount = 0;
this.mtime = -1;
- this.baseDir = baseDir;
+ this.localizedBaseDir = baseDir;
this.size = 0;
+ this.uniqueParentDir = uniqueParentDir;
}
// get the base dir for the cache
Path getBaseDir() {
- return baseDir;
+ return localizedBaseDir;
}
// Is it initialized?
@@ -905,7 +915,7 @@ public class DistributedCache {
FileSystem localFs = FileSystem.getLocal(conf);
for (Map.Entry<String,CacheStatus> f: cachedArchives.entrySet()) {
try {
- localFs.delete(f.getValue().localLoadPath, true);
+ localFs.delete(f.getValue().localizedLoadPath, true);
} catch (IOException ie) {
LOG.debug("Error cleaning up cache", ie);
}