You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:43:57 UTC
svn commit: r1077129 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
c++/task-controller/ mapred/org/apache/hadoop/filecache/
mapred/org/apache/hadoop/mapred/ test/org/apache/hadoop/filecache/
test/org/apache/hadoop/mapred/
Author: omalley
Date: Fri Mar 4 03:43:57 2011
New Revision: 1077129
URL: http://svn.apache.org/viewvc?rev=1077129&view=rev
Log:
commit 395429ee28f7bb21626c792df83811c4310f9b6c
Author: Hemanth Yamijala <yhemanth@friendchild-lm.(none)>
Date: Wed Jan 27 21:58:11 2010 +0530
MAPREDUCE-1186 from https://issues.apache.org/jira/secure/attachment/12431573/1186.20S-6.patch
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1186. Modified code in distributed cache to set
+ permissions only on required set of localized paths.
+ (Amareshwari Sriramadasu via yhemanth)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c
hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/main.c?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c Fri Mar 4 03:43:57 2011
@@ -51,6 +51,7 @@ int main(int argc, char **argv) {
const char * task_id = NULL;
const char * tt_root = NULL;
const char *log_dir = NULL;
+ const char * unique_string = NULL;
int exit_code = 0;
const char * task_pid = NULL;
const char* const short_options = "l:";
@@ -113,8 +114,11 @@ int main(int argc, char **argv) {
job_id = argv[optind++];
exit_code = initialize_job(job_id, user_detail->pw_name);
break;
- case INITIALIZE_DISTRIBUTEDCACHE:
- exit_code = initialize_distributed_cache(user_detail->pw_name);
+ case INITIALIZE_DISTRIBUTEDCACHE_FILE:
+ tt_root = argv[optind++];
+ unique_string = argv[optind++];
+ exit_code = initialize_distributed_cache_file(tt_root, unique_string,
+ user_detail->pw_name);
break;
case LAUNCH_TASK_JVM:
tt_root = argv[optind++];
Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/task-controller.c?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c Fri Mar 4 03:43:57 2011
@@ -172,9 +172,10 @@ char *get_user_directory(const char *tt_
/**
* Get the distributed cache directory for a particular user
*/
-char *get_distributed_cache_directory(const char *tt_root, const char *user) {
- return concatenate(USER_DISTRIBUTED_CACHE_DIR_PATTERN, "dist_cache_path", 2,
- tt_root, user);
+char *get_distributed_cache_directory(const char *tt_root, const char *user,
+ const char* unique_string) {
+ return concatenate(USER_DISTRIBUTED_CACHE_DIR_PATTERN,
+ "dist_cache_unique_path", 3, tt_root, user, unique_string);
}
char *get_job_work_directory(const char *job_dir) {
@@ -795,22 +796,25 @@ int initialize_job(const char *jobid, co
}
/**
- * Function to initialize the distributed cache files of a user.
+ * Function to initialize the distributed cache file for a user.
* It does the following:
- * * sudo chown user:mapred -R taskTracker/$user/distcache
- * * sudo chmod 2570 -R taskTracker/$user/distcache
- * This is done once per every JVM launch. Tasks reusing JVMs just create
+ * * sudo chown user:mapred -R taskTracker/$user/distcache/<randomdir>
+ * * sudo chmod 2570 -R taskTracker/$user/distcache/<randomdir>
+ * This is done once per localization. Tasks reusing JVMs just create
* symbolic links themselves and so there isn't anything specific to do in
* that case.
- * Sometimes, it happens that a task uses the whole or part of a directory
- * structure in taskTracker/$user/distcache. In this case, some paths are
- * already set proper private permissions by this same function called during
- * a previous JVM launch. In the current invocation, we only do the
- * chown/chmod operation of files/directories that are newly created by the
- * TaskTracker (i.e. those that still are not owned by user:mapred)
*/
-int initialize_distributed_cache(const char *user) {
-
+int initialize_distributed_cache_file(const char *tt_root,
+ const char *unique_string, const char *user) {
+ if (tt_root == NULL) {
+ fprintf(LOGFILE, "tt_root passed is null.\n");
+ return INVALID_ARGUMENT_NUMBER;
+ }
+ if (unique_string == NULL) {
+ fprintf(LOGFILE, "unique_string passed is null.\n");
+ return INVALID_ARGUMENT_NUMBER;
+ }
+
if (user == NULL) {
fprintf(LOGFILE, "user passed is null.\n");
return INVALID_ARGUMENT_NUMBER;
@@ -820,69 +824,41 @@ int initialize_distributed_cache(const c
fprintf(LOGFILE, "Couldn't get the user details of %s", user);
return INVALID_USER_NAME;
}
-
- gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
-
- char **local_dir = (char **) get_values(TT_SYS_DIR_KEY);
- if (local_dir == NULL) {
- fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY);
+ //Check tt_root
+ if (check_tt_root(tt_root) < 0) {
+ fprintf(LOGFILE, "invalid tt root passed %s\n", tt_root);
cleanup();
return INVALID_TT_ROOT;
}
- char *full_local_dir_str = (char *) get_value(TT_SYS_DIR_KEY);
-#ifdef DEBUG
- fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY,
- full_local_dir_str);
-#endif
+ // set permission on the unique directory
+ char *localized_unique_dir = get_distributed_cache_directory(tt_root, user,
+ unique_string);
+ if (localized_unique_dir == NULL) {
+ fprintf(LOGFILE, "Couldn't get unique distcache directory for %s.\n", user);
+ cleanup();
+ return INITIALIZE_DISTCACHEFILE_FAILED;
+ }
- char *distcache_dir;
- char **local_dir_ptr = local_dir;
+ gid_t binary_gid = getegid(); // the group permissions of the binary.
int failed = 0;
- while (*local_dir_ptr != NULL) {
- distcache_dir = get_distributed_cache_directory(*local_dir_ptr, user);
- if (distcache_dir == NULL) {
- fprintf(LOGFILE, "Couldn't get distcache directory for %s.\n", user);
- failed = 1;
- break;
- }
-
- struct stat filestat;
- if (stat(distcache_dir, &filestat) != 0) {
- if (errno == ENOENT) {
-#ifdef DEBUG
- fprintf(LOGFILE, "distcache_dir %s doesn't exist. Not doing anything.\n",
- distcache_dir);
-#endif
- } else {
- // stat failed because of something else!
- fprintf(LOGFILE, "Failed to stat the distcache_dir %s\n",
- distcache_dir);
- failed = 1;
- free(distcache_dir);
- break;
- }
- } else if (secure_path(distcache_dir, user_detail->pw_uid,
- tasktracker_gid, S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR
+ struct stat filestat;
+ if (stat(localized_unique_dir, &filestat) != 0) {
+ // stat on the localized unique directory failed for any reason; treat it as an error
+ fprintf(LOGFILE, "Failed to stat the localized_unique_dir %s\n",
+ localized_unique_dir);
+ failed = INITIALIZE_DISTCACHEFILE_FAILED;
+ } else if (secure_path(localized_unique_dir, user_detail->pw_uid,
+ binary_gid, S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR
| S_IXUSR | S_IRWXG, 1) != 0) {
- // No setgid on files and setgid on dirs, 570
- fprintf(LOGFILE, "Failed to secure the distcache_dir %s\n",
- distcache_dir);
- failed = 1;
- free(distcache_dir);
- break;
- }
-
- local_dir_ptr++;
- free(distcache_dir);
+ // No setgid on files and setgid on dirs, 570
+ fprintf(LOGFILE, "Failed to secure the localized_unique_dir %s\n",
+ localized_unique_dir);
+ failed = INITIALIZE_DISTCACHEFILE_FAILED;
}
- free(local_dir);
- free(full_local_dir_str);
+ free(localized_unique_dir);
cleanup();
- if (failed) {
- return INITIALIZE_DISTCACHE_FAILED;
- }
- return 0;
+ return failed;
}
/**
Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/task-controller.h?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h Fri Mar 4 03:43:57 2011
@@ -39,7 +39,7 @@
enum command {
INITIALIZE_USER,
INITIALIZE_JOB,
- INITIALIZE_DISTRIBUTEDCACHE,
+ INITIALIZE_DISTRIBUTEDCACHE_FILE,
LAUNCH_TASK_JVM,
INITIALIZE_TASK,
TERMINATE_TASK_JVM,
@@ -66,7 +66,7 @@ enum errorcodes {
PREPARE_TASK_LOGS_FAILED, //16
INVALID_TT_LOG_DIR, //17
OUT_OF_MEMORY, //18
- INITIALIZE_DISTCACHE_FAILED, //19
+ INITIALIZE_DISTCACHEFILE_FAILED, //19
INITIALIZE_USER_FAILED, //20
UNABLE_TO_BUILD_PATH //21
};
@@ -75,7 +75,7 @@ enum errorcodes {
#define TT_JOB_DIR_PATTERN USER_DIR_PATTERN"/jobcache/%s"
-#define USER_DISTRIBUTED_CACHE_DIR_PATTERN USER_DIR_PATTERN"/distcache"
+#define USER_DISTRIBUTED_CACHE_DIR_PATTERN USER_DIR_PATTERN"/distcache/%s"
#define JOB_DIR_TO_JOB_WORK_PATTERN "%s/work"
@@ -109,7 +109,8 @@ int initialize_task(const char *jobid, c
int initialize_job(const char *jobid, const char *user);
-int initialize_distributed_cache(const char *user);
+int initialize_distributed_cache_file(const char *tt_root,
+ const char* unique_string, const char *user);
int kill_user_task(const char *user, const char *task_pid, int sig);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java Fri Mar 4 03:43:57 2011
@@ -24,6 +24,9 @@ import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.DefaultTaskController;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
import java.net.URI;
@@ -196,9 +199,9 @@ public class DistributedCache {
boolean isArchive, long confFileStamp,
Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
- return new TrackerDistributedCacheManager(conf).getLocalCache(cache, conf,
- baseDir.toString(), fileStatus, isArchive, confFileStamp, currentWorkDir,
- honorSymLinkConf);
+ return new TrackerDistributedCacheManager(conf, new DefaultTaskController())
+ .getLocalCache(cache, conf, baseDir.toString(), fileStatus, isArchive,
+ confFileStamp, currentWorkDir, honorSymLinkConf, false);
}
/**
@@ -275,8 +278,8 @@ public class DistributedCache {
if (timestamp == null) {
throw new IOException("TimeStamp of the uri couldnot be found");
}
- new TrackerDistributedCacheManager(conf).releaseCache(cache, conf,
- Long.parseLong(timestamp));
+ new TrackerDistributedCacheManager(conf, new DefaultTaskController())
+ .releaseCache(cache, conf, Long.parseLong(timestamp));
}
/**
@@ -289,10 +292,11 @@ public class DistributedCache {
* @deprecated Internal to MapReduce framework. Use DistributedCacheManager
* instead.
*/
+ @Deprecated
public static String makeRelative(URI cache, Configuration conf)
throws IOException {
- return new TrackerDistributedCacheManager(conf).makeRelative(cache, conf);
-
+ return new TrackerDistributedCacheManager(conf, new DefaultTaskController())
+ .makeRelative(cache, conf);
}
/**
@@ -656,6 +660,7 @@ public class DistributedCache {
* instead.
*/
public static void purgeCache(Configuration conf) throws IOException {
- new TrackerDistributedCacheManager(conf).purgeCache();
+ new TrackerDistributedCacheManager(conf, new DefaultTaskController())
+ .purgeCache();
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java Fri Mar 4 03:43:57 2011
@@ -172,7 +172,7 @@ public class TaskDistributedCacheManager
Path p = distributedCacheManager.getLocalCache(uri, taskConf,
cacheSubdir, fileStatus,
cacheFile.type == CacheFile.FileType.ARCHIVE,
- cacheFile.timestamp, workdirPath, false);
+ cacheFile.timestamp, workdirPath, false, cacheFile.isPublic);
cacheFile.setLocalized(true);
if (cacheFile.type == CacheFile.FileType.ARCHIVE) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Fri Mar 4 03:43:57 2011
@@ -30,6 +30,8 @@ import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskController;
+import org.apache.hadoop.mapred.TaskController.DistributedCacheFileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
@@ -67,14 +69,18 @@ public class TrackerDistributedCacheMana
private LocalDirAllocator lDirAllocator;
+ private TaskController taskController;
+
private Configuration trackerConf;
private Random random = new Random();
- public TrackerDistributedCacheManager(Configuration conf) throws IOException {
+ public TrackerDistributedCacheManager(Configuration conf,
+ TaskController taskController) throws IOException {
this.localFs = FileSystem.getLocal(conf);
this.trackerConf = conf;
this.lDirAllocator = new LocalDirAllocator("mapred.local.dir");
+ this.taskController = taskController;
}
/**
@@ -102,6 +108,7 @@ public class TrackerDistributedCacheMana
* launches
* NOTE: This is effectively always on since r696957, since there is no code
* path that does not use this.
+ * @param isPublic true if the cache file is public, false if it is private
* @return the path to directory where the archives are unjarred in case of
* archives, the path to the file where the file is copied locally
* @throws IOException
@@ -109,7 +116,7 @@ public class TrackerDistributedCacheMana
Path getLocalCache(URI cache, Configuration conf,
String subDir, FileStatus fileStatus,
boolean isArchive, long confFileStamp,
- Path currentWorkDir, boolean honorSymLinkConf)
+ Path currentWorkDir, boolean honorSymLinkConf, boolean isPublic)
throws IOException {
String key = getKey(cache, conf, confFileStamp);
CacheStatus lcacheStatus;
@@ -118,13 +125,13 @@ public class TrackerDistributedCacheMana
lcacheStatus = cachedArchives.get(key);
if (lcacheStatus == null) {
// was never localized
+ String uniqueString = String.valueOf(random.nextLong());
String cachePath = new Path (subDir,
- new Path(String.valueOf(random.nextLong()),
- makeRelative(cache, conf))).toString();
+ new Path(uniqueString, makeRelative(cache, conf))).toString();
Path localPath = lDirAllocator.getLocalPathForWrite(cachePath,
fileStatus.getLen(), trackerConf);
- lcacheStatus = new CacheStatus(
- new Path(localPath.toString().replace(cachePath, "")), localPath);
+ lcacheStatus = new CacheStatus(new Path(localPath.toString().replace(
+ cachePath, "")), localPath, new Path(subDir), uniqueString);
cachedArchives.put(key, lcacheStatus);
}
@@ -138,7 +145,7 @@ public class TrackerDistributedCacheMana
synchronized (lcacheStatus) {
if (!lcacheStatus.isInited()) {
localizedPath = localizeCache(conf, cache, confFileStamp,
- lcacheStatus, fileStatus, isArchive);
+ lcacheStatus, fileStatus, isArchive, isPublic);
lcacheStatus.initComplete();
} else {
localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
@@ -242,17 +249,17 @@ public class TrackerDistributedCacheMana
// do the deletion, after releasing the global lock
for (CacheStatus lcacheStatus : deleteSet) {
synchronized (lcacheStatus) {
- FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
- LOG.info("Deleted path " + lcacheStatus.localLoadPath);
+ FileSystem.getLocal(conf).delete(lcacheStatus.localizedLoadPath, true);
+ LOG.info("Deleted path " + lcacheStatus.localizedLoadPath);
// decrement the size of the cache from baseDirSize
synchronized (baseDirSize) {
- Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
+ Long dirSize = baseDirSize.get(lcacheStatus.localizedBaseDir);
if ( dirSize != null ) {
dirSize -= lcacheStatus.size;
- baseDirSize.put(lcacheStatus.baseDir, dirSize);
+ baseDirSize.put(lcacheStatus.localizedBaseDir, dirSize);
} else {
LOG.warn("Cannot find record of the baseDir: " +
- lcacheStatus.baseDir + " during delete!");
+ lcacheStatus.localizedBaseDir + " during delete!");
}
}
}
@@ -295,12 +302,12 @@ public class TrackerDistributedCacheMana
// Has to be
if (!ifExistsAndFresh(conf, fs, cache, confFileStamp,
cacheStatus, fileStatus)) {
- throw new IOException("Stale cache file: " + cacheStatus.localLoadPath +
- " for cache-file: " + cache);
+ throw new IOException("Stale cache file: " + cacheStatus.localizedLoadPath +
+ " for cache-file: " + cache);
}
LOG.info(String.format("Using existing cache of %s->%s",
- cache.toString(), cacheStatus.localLoadPath));
- return cacheStatus.localLoadPath;
+ cache.toString(), cacheStatus.localizedLoadPath));
+ return cacheStatus.localizedLoadPath;
}
/**
@@ -361,7 +368,7 @@ public class TrackerDistributedCacheMana
File flink = new File(link);
if (doSymlink){
if (!flink.exists()) {
- FileUtil.symLink(cacheStatus.localLoadPath.toString(), link);
+ FileUtil.symLink(cacheStatus.localizedLoadPath.toString(), link);
}
}
}
@@ -372,21 +379,23 @@ public class TrackerDistributedCacheMana
URI cache, long confFileStamp,
CacheStatus cacheStatus,
FileStatus fileStatus,
- boolean isArchive)
+ boolean isArchive, boolean isPublic)
throws IOException {
FileSystem fs = FileSystem.get(cache, conf);
FileSystem localFs = FileSystem.getLocal(conf);
Path parchive = null;
if (isArchive) {
- parchive = new Path(cacheStatus.localLoadPath,
- new Path(cacheStatus.localLoadPath.getName()));
- } else {
- parchive = cacheStatus.localLoadPath;
+ parchive = new Path(cacheStatus.localizedLoadPath,
+ new Path(cacheStatus.localizedLoadPath.getName()));
+ } else {
+ parchive = cacheStatus.localizedLoadPath;
}
+
if (!localFs.mkdirs(parchive.getParent())) {
throw new IOException("Mkdirs failed to create directory " +
- cacheStatus.localLoadPath.toString());
+ cacheStatus.localizedLoadPath.toString());
}
+
String cacheId = cache.getPath();
fs.copyToLocalFile(new Path(cacheId), parchive);
if (isArchive) {
@@ -413,27 +422,45 @@ public class TrackerDistributedCacheMana
FileUtil.getDU(new File(parchive.getParent().toString()));
cacheStatus.size = cacheSize;
synchronized (baseDirSize) {
- Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+ Long dirSize = baseDirSize.get(cacheStatus.localizedBaseDir);
if( dirSize == null ) {
dirSize = Long.valueOf(cacheSize);
} else {
dirSize += cacheSize;
}
- baseDirSize.put(cacheStatus.baseDir, dirSize);
- }
- // do chmod here
- try {
- //Setting recursive permission to grant everyone read and execute
- FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
- } catch(InterruptedException e) {
- LOG.warn("Exception in chmod" + e.toString());
+ baseDirSize.put(cacheStatus.localizedBaseDir, dirSize);
}
+
+ // set proper permissions for the localized directory
+ setPermissions(conf, cacheStatus, isPublic);
+
// update cacheStatus to reflect the newly cached file
cacheStatus.mtime = DistributedCache.getTimestamp(conf, cache);
LOG.info(String.format("Cached %s as %s",
- cache.toString(), cacheStatus.localLoadPath));
- return cacheStatus.localLoadPath;
+ cache.toString(), cacheStatus.localizedLoadPath));
+ return cacheStatus.localizedLoadPath;
+ }
+
+ private void setPermissions(Configuration conf, CacheStatus cacheStatus,
+ boolean isPublic) throws IOException {
+ if (isPublic) {
+ Path localizedUniqueDir = cacheStatus.getLocalizedUniqueDir();
+ LOG.info("Doing chmod on localdir :" + localizedUniqueDir);
+ try {
+ FileUtil.chmod(localizedUniqueDir.toString(), "ugo+rx", true);
+ } catch (InterruptedException e) {
+ LOG.warn("Exception in chmod" + e.toString());
+ throw new IOException(e);
+ }
+ } else {
+ // invoke taskcontroller to set permissions
+ DistributedCacheFileContext context = new DistributedCacheFileContext(
+ conf.get("user.name"), new File(cacheStatus.localizedBaseDir
+ .toString()), cacheStatus.localizedBaseDir,
+ cacheStatus.uniqueString);
+ taskController.initializeDistributedCacheFile(context);
+ }
}
private static boolean isTarFile(String filename) {
@@ -508,10 +535,10 @@ public class TrackerDistributedCacheMana
static class CacheStatus {
// the local load path of this cache
- Path localLoadPath;
+ Path localizedLoadPath;
//the base dir where the cache lies
- Path baseDir;
+ Path localizedBaseDir;
//the size of this cache
long size;
@@ -524,18 +551,28 @@ public class TrackerDistributedCacheMana
// is it initialized ?
boolean inited = false;
+
+ // The sub directory (tasktracker/archive or tasktracker/user/archive),
+ // under which the file will be localized
+ Path subDir;
- public CacheStatus(Path baseDir, Path localLoadPath) {
+ // unique string used in the construction of local load path
+ String uniqueString;
+
+ public CacheStatus(Path baseDir, Path localLoadPath, Path subDir,
+ String uniqueString) {
super();
- this.localLoadPath = localLoadPath;
+ this.localizedLoadPath = localLoadPath;
this.refcount = 0;
this.mtime = -1;
- this.baseDir = baseDir;
+ this.localizedBaseDir = baseDir;
this.size = 0;
+ this.subDir = subDir;
+ this.uniqueString = uniqueString;
}
Path getBaseDir(){
- return this.baseDir;
+ return this.localizedBaseDir;
}
// mark it as initialized
@@ -547,6 +584,10 @@ public class TrackerDistributedCacheMana
boolean isInited() {
return inited;
}
+
+ Path getLocalizedUniqueDir() {
+ return new Path(localizedBaseDir, new Path(subDir, uniqueString));
+ }
}
/**
@@ -558,7 +599,7 @@ public class TrackerDistributedCacheMana
synchronized (cachedArchives) {
for (Map.Entry<String,CacheStatus> f: cachedArchives.entrySet()) {
try {
- localFs.delete(f.getValue().localLoadPath, true);
+ localFs.delete(f.getValue().localizedLoadPath, true);
} catch (IOException ie) {
LOG.debug("Error cleaning up cache", ie);
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Fri Mar 4 03:43:57 2011
@@ -31,6 +31,7 @@ import org.apache.hadoop.util.Shell.Shel
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
/**
* The default implementation for controlling tasks.
@@ -153,8 +154,17 @@ public class DefaultTaskController exten
}
@Override
- public void initializeDistributedCache(InitializationContext context) {
- // Do nothing.
+ public void initializeDistributedCacheFile(DistributedCacheFileContext context)
+ throws IOException {
+ Path localizedUniqueDir = context.getLocalizedUniqueDir();
+ try {
+ // Setting recursive execute permission on localized dir
+ LOG.info("Doing chmod on localdir :" + localizedUniqueDir);
+ FileUtil.chmod(localizedUniqueDir.toString(), "+x", true);
+ } catch (InterruptedException ie) {
+ LOG.warn("Exception in doing chmod on" + localizedUniqueDir, ie);
+ throw new IOException(ie);
+ }
}
@Override
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Fri Mar 4 03:43:57 2011
@@ -85,7 +85,7 @@ class LinuxTaskController extends TaskCo
enum TaskCommands {
INITIALIZE_USER,
INITIALIZE_JOB,
- INITIALIZE_DISTRIBUTEDCACHE,
+ INITIALIZE_DISTRIBUTEDCACHE_FILE,
LAUNCH_TASK_JVM,
INITIALIZE_TASK,
TERMINATE_TASK_JVM,
@@ -342,12 +342,21 @@ class LinuxTaskController extends TaskCo
}
@Override
- public void initializeDistributedCache(InitializationContext context)
+ public void initializeDistributedCacheFile(DistributedCacheFileContext context)
throws IOException {
- LOG.debug("Going to initialize distributed cache for " + context.user
- + " on the TT");
- runCommand(TaskCommands.INITIALIZE_DISTRIBUTEDCACHE, context.user,
- new ArrayList<String>(), context.workDir, null);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Going to initialize distributed cache for " + context.user
+ + " with localizedBaseDir " + context.localizedBaseDir +
+ " and uniqueString " + context.uniqueString);
+ }
+ List<String> args = new ArrayList<String>();
+ // Here, uniqueString might start with '-'. Adding -- in front of the
+ // arguments indicates that they are non-option parameters.
+ args.add("--");
+ args.add(context.localizedBaseDir.toString());
+ args.add(context.uniqueString);
+ runCommand(TaskCommands.INITIALIZE_DISTRIBUTEDCACHE_FILE, context.user,
+ args, context.workDir, null);
}
@Override
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Fri Mar 4 03:43:57 2011
@@ -112,7 +112,7 @@ class LocalJobRunner implements JobSubmi
// Manage the distributed cache. If there are files to be copied,
// this will trigger localFile to be re-written again.
this.trackerDistributerdCacheManager =
- new TrackerDistributedCacheManager(conf);
+ new TrackerDistributedCacheManager(conf, new DefaultTaskController());
this.taskDistributedCacheManager =
trackerDistributerdCacheManager.newTaskDistributedCacheManager(conf);
taskDistributedCacheManager.setup(
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java Fri Mar 4 03:43:57 2011
@@ -70,12 +70,10 @@ public abstract class TaskController imp
* disks:
* <ul>
* <li>mapred-local directories</li>
- * <li>Job cache directories</li>
- * <li>Archive directories</li>
* <li>Hadoop log directories</li>
* </ul>
*/
- void setup() {
+ public void setup() {
for (String localDir : this.mapredLocalDirs) {
// Set up the mapred-local directories.
File mapredlocalDir = new File(localDir);
@@ -109,13 +107,13 @@ public abstract class TaskController imp
/**
* Take task-controller specific actions to initialize the distributed cache
- * files. This involves setting appropriate permissions for these files so as
+ * file. This involves setting appropriate permissions for these files so as
* to secure them to be accessible only their owners.
*
* @param context
* @throws IOException
*/
- public abstract void initializeDistributedCache(InitializationContext context)
+ public abstract void initializeDistributedCacheFile(DistributedCacheFileContext context)
throws IOException;
/**
@@ -257,6 +255,37 @@ public abstract class TaskController imp
public static class InitializationContext {
public File workDir;
public String user;
+
+ public InitializationContext() {
+ }
+
+ public InitializationContext(String user, File workDir) {
+ this.user = user;
+ this.workDir = workDir;
+ }
+ }
+
+ /**
+ * This is used for initializing the private localized files in distributed
+ * cache. Initialization involves changing permissions, ownership, etc.
+ */
+ public static class DistributedCacheFileContext extends InitializationContext {
+ // base directory under which file has been localized
+ Path localizedBaseDir;
+ // the unique string used to construct the localized path
+ String uniqueString;
+
+ public DistributedCacheFileContext(String user, File workDir,
+ Path localizedBaseDir, String uniqueString) {
+ super(user, workDir);
+ this.localizedBaseDir = localizedBaseDir;
+ this.uniqueString = uniqueString;
+ }
+
+ public Path getLocalizedUniqueDir() {
+ return new Path(localizedBaseDir, new Path(TaskTracker
+ .getPrivateDistributedCacheDir(user), uniqueString));
+ }
}
static class JobInitializationContext extends InitializationContext {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar 4 03:43:57 2011
@@ -42,7 +42,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.TaskController.InitializationContext;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.mapreduce.JobContext;
@@ -169,11 +168,6 @@ abstract class TaskRunner extends Thread
// of files should happen in the TaskTracker's process space. Any changes to
// the conf object after this will NOT be reflected to the child.
setupChildTaskConfiguration(lDirAlloc);
-
- InitializationContext context = new InitializationContext();
- context.user = conf.getUser();
- context.workDir = new File(conf.get(TaskTracker.JOB_LOCAL_DIR));
- tracker.getTaskController().initializeDistributedCache(context);
if (!prepare()) {
return;
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar 4 03:43:57 2011
@@ -648,12 +648,17 @@ public class TaskTracker
this.taskTrackerName = "tracker_" + localHostname + ":" + taskReportAddress;
LOG.info("Starting tracker " + taskTrackerName);
- // Initialize DistributedCache and
- // clear out temporary files that might be lying around
- this.distributedCacheManager =
- new TrackerDistributedCacheManager(this.fConf);
- this.distributedCacheManager.purgeCache();
- cleanupStorage();
+ Class<? extends TaskController> taskControllerClass = fConf.getClass(
+ "mapred.task.tracker.task-controller", DefaultTaskController.class, TaskController.class);
+ taskController = (TaskController) ReflectionUtils.newInstance(
+ taskControllerClass, fConf);
+
+ // set up the mapred-local and log directories with appropriate permissions
+ taskController.setup();
+
+ // Initialize DistributedCache
+ this.distributedCacheManager = new TrackerDistributedCacheManager(
+ this.fConf, taskController);
this.jobClient = (InterTrackerProtocol)
RPC.waitForProxy(InterTrackerProtocol.class,
@@ -681,15 +686,6 @@ public class TaskTracker
reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots);
mapLauncher.start();
reduceLauncher.start();
- Class<? extends TaskController> taskControllerClass
- = fConf.getClass("mapred.task.tracker.task-controller",
- DefaultTaskController.class,
- TaskController.class);
- taskController = (TaskController)ReflectionUtils.newInstance(
- taskControllerClass, fConf);
-
- //setup and create jobcache directory with appropriate permissions
- taskController.setup();
// create a localizer instance
setLocalizer(new Localizer(localFs, fConf.getLocalDirs(), taskController));
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Fri Mar 4 03:43:57 2011
@@ -34,10 +34,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.DefaultTaskController;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskController;
import org.apache.hadoop.mapred.TaskTracker;
-import org.apache.hadoop.mapred.TaskController.InitializationContext;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
@@ -46,10 +44,12 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.filecache.TaskDistributedCacheManager;
import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.security.UserGroupInformation;
-
+import org.apache.hadoop.util.ReflectionUtils;
public class TestTrackerDistributedCacheManager extends TestCase {
private static final Log LOG =
@@ -61,7 +61,6 @@ public class TestTrackerDistributedCache
.getAbsolutePath();
protected File ROOT_MAPRED_LOCAL_DIR;
- private static String TEST_CACHE_BASE_DIR = "cachebasedir";
protected int numLocalDirs = 6;
private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
@@ -72,7 +71,8 @@ public class TestTrackerDistributedCache
private FileSystem fs;
protected LocalDirAllocator localDirAllocator =
- new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
+ new LocalDirAllocator("mapred.local.dir");
+ protected TaskController taskController;
@Override
protected void setUp() throws IOException,InterruptedException {
@@ -87,12 +87,25 @@ public class TestTrackerDistributedCache
ROOT_MAPRED_LOCAL_DIR = new File(TEST_ROOT_DIR, "mapred/local");
ROOT_MAPRED_LOCAL_DIR.mkdirs();
+ String []localDirs = new String[numLocalDirs];
+ for (int i = 0; i < numLocalDirs; i++) {
+ File localDir = new File(ROOT_MAPRED_LOCAL_DIR, "0_" + i);
+ localDirs[i] = localDir.getPath();
+ localDir.mkdir();
+ }
+
conf = new Configuration();
- conf.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
- conf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
- ROOT_MAPRED_LOCAL_DIR.toString());
+ conf.setStrings("mapred.local.dir", localDirs);
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
fs = FileSystem.get(conf);
+ Class<? extends TaskController> taskControllerClass = conf.getClass(
+ "mapred.task.tracker.task-controller", DefaultTaskController.class,
+ TaskController.class);
+ taskController = (TaskController) ReflectionUtils.newInstance(
+ taskControllerClass, conf);
+
+ // setup permissions for mapred local dir
+ taskController.setup();
// Create the temporary cache files to be used in the tests.
firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile");
@@ -100,6 +113,11 @@ public class TestTrackerDistributedCache
createPrivateTempFile(firstCacheFile);
createPrivateTempFile(secondCacheFile);
}
+
+ protected void refreshConf(Configuration conf) throws IOException {
+ taskController.setConf(conf);
+ taskController.setup();
+ }
/**
* Whether the test can run on the machine
@@ -124,6 +142,8 @@ public class TestTrackerDistributedCache
// ****** Imitate JobClient code
// Configures a task/job with both a regular file and a "classpath" file.
Configuration subConf = new Configuration(conf);
+ String userName = getJobOwnerName();
+ subConf.set("user.name", userName);
DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
DistributedCache.addFileToClassPath(secondCacheFile, subConf);
TrackerDistributedCacheManager.determineTimestamps(subConf);
@@ -135,11 +155,9 @@ public class TestTrackerDistributedCache
subConf.writeXml(os);
os.close();
- String userName = getJobOwnerName();
-
// ****** Imitate TaskRunner code.
TrackerDistributedCacheManager manager =
- new TrackerDistributedCacheManager(conf);
+ new TrackerDistributedCacheManager(conf, taskController);
TaskDistributedCacheManager handle =
manager.newTaskDistributedCacheManager(subConf);
assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
@@ -147,11 +165,6 @@ public class TestTrackerDistributedCache
handle.setup(localDirAllocator, workDir, TaskTracker
.getPrivateDistributedCacheDir(userName),
TaskTracker.getPublicDistributedCacheDir());
-
- InitializationContext context = new InitializationContext();
- context.user = userName;
- context.workDir = workDir;
- getTaskController().initializeDistributedCache(context);
// ****** End of imitating TaskRunner code
Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
@@ -181,18 +194,18 @@ public class TestTrackerDistributedCache
TrackerDistributedCacheManager {
public FakeTrackerDistributedCacheManager(Configuration conf)
throws IOException {
- super(conf);
+ super(conf, taskController);
}
@Override
Path localizeCache(Configuration conf, URI cache, long confFileStamp,
- CacheStatus cacheStatus, FileStatus fileStatus, boolean isArchive)
- throws IOException {
+ CacheStatus cacheStatus, FileStatus fileStatus, boolean isArchive,
+ boolean isPublic) throws IOException {
if (cache.equals(firstCacheFile.toUri())) {
throw new IOException("fake fail");
}
return super.localizeCache(conf, cache, confFileStamp, cacheStatus,
- fileStatus, isArchive);
+ fileStatus, isArchive, isPublic);
}
}
@@ -201,8 +214,6 @@ public class TestTrackerDistributedCache
if (!canRun()) {
return;
}
- Configuration conf = new Configuration();
- conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
TrackerDistributedCacheManager manager =
new FakeTrackerDistributedCacheManager(conf);
@@ -212,6 +223,7 @@ public class TestTrackerDistributedCache
// Configures a job with a regular file
Job job1 = new Job(conf);
Configuration conf1 = job1.getConfiguration();
+ conf1.set("user.name", userName);
DistributedCache.addCacheFile(secondCacheFile.toUri(), conf1);
TrackerDistributedCacheManager.determineTimestamps(conf1);
@@ -234,6 +246,7 @@ public class TestTrackerDistributedCache
// Configures another job with three regular files.
Job job2 = new Job(conf);
Configuration conf2 = job2.getConfiguration();
+ conf2.set("user.name", userName);
// add a file that would get failed to localize
DistributedCache.addCacheFile(firstCacheFile.toUri(), conf2);
// add a file that is already localized by different job
@@ -291,7 +304,7 @@ public class TestTrackerDistributedCache
private void checkLocalizedPath(String visibility)
throws IOException, LoginException, InterruptedException {
TrackerDistributedCacheManager manager =
- new TrackerDistributedCacheManager(conf);
+ new TrackerDistributedCacheManager(conf, taskController);
String userName = getJobOwnerName();
File workDir = new File(TEST_ROOT_DIR, "workdir");
Path cacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
@@ -302,6 +315,7 @@ public class TestTrackerDistributedCache
}
Configuration conf1 = new Configuration(conf);
+ conf1.set("user.name", userName);
DistributedCache.addCacheFile(cacheFile.toUri(), conf1);
TrackerDistributedCacheManager.determineTimestamps(conf1);
TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
@@ -322,12 +336,18 @@ public class TestTrackerDistributedCache
Path localizedPath =
manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir,
fs.getFileStatus(cacheFile), false,
- c.timestamp, new Path(TEST_ROOT_DIR), false);
+ c.timestamp, new Path(TEST_ROOT_DIR), false,
+ Boolean.parseBoolean(visibility));
assertTrue("Cache file didn't get localized in the expected directory. " +
"Expected localization to happen within " +
ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir +
", but was localized at " +
localizedPath, localizedPath.toString().contains(distCacheDir));
+ if ("true".equals(visibility)) {
+ checkPublicFilePermissions(new Path[]{localizedPath});
+ } else {
+ checkFilePermissions(new Path[]{localizedPath});
+ }
}
/**
@@ -338,17 +358,29 @@ public class TestTrackerDistributedCache
*/
protected void checkFilePermissions(Path[] localCacheFiles)
throws IOException {
- Path cachedFirstFile = localCacheFiles[0];
- Path cachedSecondFile = localCacheFiles[1];
- // Both the files should have executable permissions on them.
- assertTrue("First cache file is not executable!", new File(cachedFirstFile
- .toUri().getPath()).canExecute());
- assertTrue("Second cache file is not executable!", new File(
- cachedSecondFile.toUri().getPath()).canExecute());
+ // All the files should have executable permissions on them.
+ for (Path p : localCacheFiles) {
+ assertTrue("Cache file is not executable!", new File(p
+ .toUri().getPath()).canExecute());
+ }
}
- protected TaskController getTaskController() {
- return new DefaultTaskController();
+ /**
+ * Check permissions on the public cache files
+ *
+ * @param localCacheFiles
+ * @throws IOException
+ */
+ private void checkPublicFilePermissions(Path[] localCacheFiles)
+ throws IOException {
+ // All the files should have read and executable permissions for others
+ for (Path p : localCacheFiles) {
+ FsPermission perm = fs.getFileStatus(p).getPermission();
+ assertTrue("cache file is not readable by others", perm.getOtherAction()
+ .implies(FsAction.READ));
+ assertTrue("cache file is not executable by others", perm
+ .getOtherAction().implies(FsAction.EXECUTE));
+ }
}
protected String getJobOwnerName() throws LoginException {
@@ -361,27 +393,39 @@ public class TestTrackerDistributedCache
if (!canRun()) {
return;
}
+ // This test needs mapred.local.dir to be a single directory
+ // instead of the numLocalDirs directories created in setUp, because it
+ // assumes that both firstcachefile and secondcachefile will be localized
+ // in the same directory, so that the second localization triggers
+ // deleteCache. With multiple local directories, the second localization
+ // might land in a different directory and not trigger deleteCache.
+ Configuration conf2 = new Configuration(conf);
+ conf2.set("mapred.local.dir", ROOT_MAPRED_LOCAL_DIR.toString());
+ conf2.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
+ refreshConf(conf2);
TrackerDistributedCacheManager manager =
- new TrackerDistributedCacheManager(conf);
- FileSystem localfs = FileSystem.getLocal(conf);
+ new TrackerDistributedCacheManager(conf2, taskController);
+ FileSystem localfs = FileSystem.getLocal(conf2);
long now = System.currentTimeMillis();
+ String userName = getJobOwnerName();
+ conf2.set("user.name", userName);
- manager.getLocalCache(firstCacheFile.toUri(), conf,
- TEST_CACHE_BASE_DIR, fs.getFileStatus(firstCacheFile), false,
- now, new Path(TEST_ROOT_DIR), false);
- manager.releaseCache(firstCacheFile.toUri(), conf, now);
+ Path localCache = manager.getLocalCache(firstCacheFile.toUri(), conf2,
+ TaskTracker.getPrivateDistributedCacheDir(userName),
+ fs.getFileStatus(firstCacheFile), false,
+ now, new Path(TEST_ROOT_DIR), false, false);
+ manager.releaseCache(firstCacheFile.toUri(), conf2, now);
//in above code,localized a file of size 4K and then release the cache
// which will cause the cache be deleted when the limit goes out.
// The below code localize another cache which's designed to
//sweep away the first cache.
- manager.getLocalCache(secondCacheFile.toUri(), conf,
- TEST_CACHE_BASE_DIR, fs.getFileStatus(secondCacheFile), false,
- System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false);
- FileStatus[] dirStatuses = localfs.listStatus(
- new Path(ROOT_MAPRED_LOCAL_DIR.toString()));
- assertTrue("DistributedCache failed deleting old" +
+ manager.getLocalCache(secondCacheFile.toUri(), conf2,
+ TaskTracker.getPrivateDistributedCacheDir(userName),
+ fs.getFileStatus(secondCacheFile), false,
+ System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
+ assertFalse("DistributedCache failed deleting old" +
" cache when the cache store is full.",
- dirStatuses.length == 1);
+ localfs.exists(localCache));
}
public void testFileSystemOtherThanDefault() throws Exception {
@@ -389,14 +433,17 @@ public class TestTrackerDistributedCache
return;
}
TrackerDistributedCacheManager manager =
- new TrackerDistributedCacheManager(conf);
+ new TrackerDistributedCacheManager(conf, taskController);
conf.set("fs.fakefile.impl", conf.get("fs.file.impl"));
+ String userName = getJobOwnerName();
+ conf.set("user.name", userName);
Path fileToCache = new Path("fakefile:///"
+ firstCacheFile.toUri().getPath());
Path result = manager.getLocalCache(fileToCache.toUri(), conf,
- TEST_CACHE_BASE_DIR, fs.getFileStatus(firstCacheFile), false,
+ TaskTracker.getPrivateDistributedCacheDir(userName),
+ fs.getFileStatus(firstCacheFile), false,
System.currentTimeMillis(),
- new Path(TEST_ROOT_DIR), false);
+ new Path(TEST_ROOT_DIR), false, false);
assertNotNull("DistributedCache cached file on non-default filesystem.",
result);
}
@@ -464,18 +511,19 @@ public class TestTrackerDistributedCache
Configuration myConf = new Configuration(conf);
myConf.set("fs.default.name", "refresh:///");
myConf.setClass("fs.refresh.impl", FakeFileSystem.class, FileSystem.class);
+ String userName = getJobOwnerName();
+
TrackerDistributedCacheManager manager =
- new TrackerDistributedCacheManager(myConf);
+ new TrackerDistributedCacheManager(myConf, taskController);
// ****** Imitate JobClient code
// Configures a task/job with both a regular file and a "classpath" file.
Configuration subConf = new Configuration(myConf);
+ subConf.set("user.name", userName);
DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
TrackerDistributedCacheManager.determineTimestamps(subConf);
TrackerDistributedCacheManager.determineCacheVisibilities(subConf);
// ****** End of imitating JobClient code
- String userName = getJobOwnerName();
-
// ****** Imitate TaskRunner code.
TaskDistributedCacheManager handle =
manager.newTaskDistributedCacheManager(subConf);
@@ -516,6 +564,7 @@ public class TestTrackerDistributedCache
// submit another job
Configuration subConf2 = new Configuration(myConf);
+ subConf2.set("user.name", userName);
DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf2);
TrackerDistributedCacheManager.determineTimestamps(subConf2);
TrackerDistributedCacheManager.determineCacheVisibilities(subConf2);
@@ -539,4 +588,46 @@ public class TestTrackerDistributedCache
handle.release();
}
+ /**
+ * Localize a file. After localization is complete, create a file,
+ * "myfile.txt", under the directory where the file is localized, with
+ * permissions different from what is set by default. Then, localize another
+ * file. Verify that "myfile.txt" retains the permissions it was created with.
+ * @throws Exception
+ */
+ public void testCustomPermissions() throws Exception {
+ if (!canRun()) {
+ return;
+ }
+ String userName = getJobOwnerName();
+ conf.set("user.name", userName);
+ TrackerDistributedCacheManager manager =
+ new TrackerDistributedCacheManager(conf, taskController);
+ FileSystem localfs = FileSystem.getLocal(conf);
+ long now = System.currentTimeMillis();
+
+ Path[] localCache = new Path[2];
+ localCache[0] = manager.getLocalCache(firstCacheFile.toUri(), conf,
+ TaskTracker.getPrivateDistributedCacheDir(userName),
+ fs.getFileStatus(firstCacheFile), false,
+ now, new Path(TEST_ROOT_DIR), false, false);
+ FsPermission myPermission = new FsPermission((short)0600);
+ Path myFile = new Path(localCache[0].getParent(), "myfile.txt");
+ if (FileSystem.create(localfs, myFile, myPermission) == null) {
+ throw new IOException("Could not create " + myFile);
+ }
+ try {
+ localCache[1] = manager.getLocalCache(secondCacheFile.toUri(), conf,
+ TaskTracker.getPrivateDistributedCacheDir(userName),
+ fs.getFileStatus(secondCacheFile), false,
+ System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
+ FileStatus stat = localfs.getFileStatus(myFile);
+ assertTrue(stat.getPermission().equals(myPermission));
+ // validate permissions of localized files.
+ checkFilePermissions(localCache);
+ } finally {
+ localfs.delete(myFile, false);
+ }
+ }
+
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java?rev=1077129&r1=1077128&r2=1077129&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java Fri Mar 4 03:43:57 2011
@@ -24,6 +24,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
import org.apache.hadoop.filecache.TestTrackerDistributedCacheManager;
@@ -36,7 +37,6 @@ public class TestTrackerDistributedCache
TestTrackerDistributedCacheManager {
private File configFile;
- private MyLinuxTaskController taskController;
private String taskTrackerSpecialGroup;
private static final Log LOG =
@@ -65,7 +65,7 @@ public class TestTrackerDistributedCache
ClusterWithLinuxTaskController.createTaskControllerConf(path, conf
.getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
String execPath = path + "/task-controller";
- taskController.setTaskControllerExe(execPath);
+ ((MyLinuxTaskController)taskController).setTaskControllerExe(execPath);
taskController.setConf(conf);
taskController.setup();
@@ -74,6 +74,17 @@ public class TestTrackerDistributedCache
}
@Override
+ protected void refreshConf(Configuration conf) throws IOException {
+ super.refreshConf(conf);
+ String path =
+ System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
+ configFile =
+ ClusterWithLinuxTaskController.createTaskControllerConf(path, conf
+ .getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
+
+ }
+
+ @Override
protected void tearDown()
throws IOException {
if (!ClusterWithLinuxTaskController.shouldRun()) {
@@ -99,27 +110,19 @@ public class TestTrackerDistributedCache
}
@Override
- protected TaskController getTaskController() {
- return taskController;
- }
-
- @Override
protected void checkFilePermissions(Path[] localCacheFiles)
throws IOException {
- String cachedFirstFile = localCacheFiles[0].toUri().getPath();
- String cachedSecondFile = localCacheFiles[1].toUri().getPath();
String userName = getJobOwnerName();
- // First make sure that the cache files have proper permissions.
- TestTaskTrackerLocalization.checkFilePermissions(cachedFirstFile,
- "-r-xrwx---", userName, taskTrackerSpecialGroup);
- TestTaskTrackerLocalization.checkFilePermissions(cachedSecondFile,
- "-r-xrwx---", userName, taskTrackerSpecialGroup);
-
- // Now. make sure that all the path components also have proper
- // permissions.
- checkPermissionOnPathComponents(cachedFirstFile, userName);
- checkPermissionOnPathComponents(cachedSecondFile, userName);
+ for (Path p : localCacheFiles) {
+ // First make sure that the cache file has proper permissions.
+ TestTaskTrackerLocalization.checkFilePermissions(p.toUri().getPath(),
+ "-r-xrwx---", userName, taskTrackerSpecialGroup);
+ // Now, make sure that all the path components also have proper
+ // permissions.
+ checkPermissionOnPathComponents(p.toUri().getPath(), userName);
+ }
+
}
/**
@@ -136,7 +139,7 @@ public class TestTrackerDistributedCache
+ Path.SEPARATOR + "0_[0-" + (numLocalDirs - 1) + "]"
+ Path.SEPARATOR + TaskTracker.getPrivateDistributedCacheDir(userName),
"");
- LOG.info("Leading path for cacheFirstFile is : "
+ LOG.info("Trailing path for cacheFirstFile is : "
+ trailingStringForFirstFile);
// The leading mapred.local.dir/0_[0-n]/taskTracker/$user string.
String leadingStringForFirstFile =