You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:43:31 UTC
svn commit: r1077125 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
mapred/org/apache/hadoop/filecache/ mapred/org/apache/hadoop/mapred/
mapred/org/apache/hadoop/mapreduce/
mapred/org/apache/hadoop/mapreduce/server/tasktracker/ test/org...
Author: omalley
Date: Fri Mar 4 03:43:31 2011
New Revision: 1077125
URL: http://svn.apache.org/viewvc?rev=1077125&view=rev
Log:
commit c870693c4535d2aa90de203ac0514c6cd213f2d9
Author: Hemanth Yamijala <yhemanth@friendchild-lm.(none)>
Date: Mon Jan 25 20:36:44 2010 +0530
MAPREDUCE:744 from https://issues.apache.org/jira/secure/attachment/12431313/744-6-y20.patch
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-744. Introduces the notion of a public distributed cache.
+ (ddas)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java?rev=1077125&r1=1077124&r2=1077125&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java Fri Mar 4 03:43:31 2011
@@ -67,6 +67,7 @@ public class TaskDistributedCacheManager
REGULAR,
ARCHIVE
}
+ boolean isPublic = true;
/** Whether to decompress */
final FileType type;
final long timestamp;
@@ -74,10 +75,11 @@ public class TaskDistributedCacheManager
final boolean shouldBeAddedToClassPath;
boolean localized = false;
- private CacheFile(URI uri, FileType type, long timestamp,
+ private CacheFile(URI uri, FileType type, boolean isPublic, long timestamp,
boolean classPath) {
this.uri = uri;
this.type = type;
+ this.isPublic = isPublic;
this.timestamp = timestamp;
this.shouldBeAddedToClassPath = classPath;
}
@@ -88,7 +90,7 @@ public class TaskDistributedCacheManager
* files.
*/
private static List<CacheFile> makeCacheFiles(URI[] uris,
- String[] timestamps, Path[] paths, FileType type) {
+ String[] timestamps, String cacheVisibilities[], Path[] paths, FileType type) {
List<CacheFile> ret = new ArrayList<CacheFile>();
if (uris != null) {
if (uris.length != timestamps.length) {
@@ -104,7 +106,8 @@ public class TaskDistributedCacheManager
URI u = uris[i];
boolean isClassPath = (null != classPaths.get(u.getPath()));
long t = Long.parseLong(timestamps[i]);
- ret.add(new CacheFile(u, type, t, isClassPath));
+ ret.add(new CacheFile(u, type, Boolean.valueOf(cacheVisibilities[i]),
+ t, isClassPath));
}
}
return ret;
@@ -128,11 +131,13 @@ public class TaskDistributedCacheManager
this.cacheFiles.addAll(
CacheFile.makeCacheFiles(DistributedCache.getCacheFiles(taskConf),
DistributedCache.getFileTimestamps(taskConf),
+ TrackerDistributedCacheManager.getFileVisibilities(taskConf),
DistributedCache.getFileClassPaths(taskConf),
CacheFile.FileType.REGULAR));
this.cacheFiles.addAll(
CacheFile.makeCacheFiles(DistributedCache.getCacheArchives(taskConf),
DistributedCache.getArchiveTimestamps(taskConf),
+ TrackerDistributedCacheManager.getArchiveVisibilities(taskConf),
DistributedCache.getArchiveClassPaths(taskConf),
CacheFile.FileType.ARCHIVE));
}
@@ -145,7 +150,7 @@ public class TaskDistributedCacheManager
* file, if necessary.
*/
public void setup(LocalDirAllocator lDirAlloc, File workDir,
- String cacheSubdir) throws IOException {
+ String privateCacheSubdir, String publicCacheSubDir) throws IOException {
setupCalled = true;
if (cacheFiles.isEmpty()) {
@@ -160,7 +165,10 @@ public class TaskDistributedCacheManager
URI uri = cacheFile.uri;
FileSystem fileSystem = FileSystem.get(uri, taskConf);
FileStatus fileStatus = fileSystem.getFileStatus(new Path(uri.getPath()));
-
+ String cacheSubdir = publicCacheSubDir;
+ if (!cacheFile.isPublic) {
+ cacheSubdir = privateCacheSubdir;
+ }
Path p = distributedCacheManager.getLocalCache(uri, taskConf,
cacheSubdir, fileStatus,
cacheFile.type == CacheFile.FileType.ARCHIVE,
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1077125&r1=1077124&r2=1077125&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Fri Mar 4 03:43:31 2011
@@ -36,6 +36,9 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.util.RunJar;
/**
@@ -299,6 +302,51 @@ public class TrackerDistributedCacheMana
cache.toString(), cacheStatus.localLoadPath));
return cacheStatus.localLoadPath;
}
+
+ /**
+ * Returns a boolean to denote whether a cache file is visible to all(public)
+ * or not
+ * @param conf
+ * @param uri
+ * @return true if the path in the uri is visible to all, false otherwise
+ * @throws IOException
+ */
+ static boolean isPublic(Configuration conf, URI uri) throws IOException {
+ FileSystem fs = FileSystem.get(uri, conf);
+ Path current = new Path(uri.getPath());
+ //the leaf level file should be readable by others
+ if (!checkPermissionOfOther(fs, current, FsAction.READ)) {
+ return false;
+ }
+ current = current.getParent();
+ while (current != null) {
+ //the subdirs in the path should have execute permissions for others
+ if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE)) {
+ return false;
+ }
+ current = current.getParent();
+ }
+ return true;
+ }
+ /**
+ * Checks for a given path whether the Other permissions on it
+ * imply the permission in the passed FsAction
+ * @param fs
+ * @param path
+ * @param action
+ * @return true if the path in the uri is visible to all, false otherwise
+ * @throws IOException
+ */
+ private static boolean checkPermissionOfOther(FileSystem fs, Path path,
+ FsAction action) throws IOException {
+ FileStatus status = fs.getFileStatus(path);
+ FsPermission perms = status.getPermission();
+ FsAction otherAction = perms.getOtherAction();
+ if (otherAction.implies(action)) {
+ return true;
+ }
+ return false;
+ }
private void createSymlink(Configuration conf, URI cache,
CacheStatus cacheStatus, boolean isArchive,
@@ -524,6 +572,7 @@ public class TrackerDistributedCacheMana
return new TaskDistributedCacheManager(this, taskConf);
}
+
/**
* Determines timestamps of files to be cached, and stores those
* in the configuration. This is intended to be used internally by JobClient
@@ -560,4 +609,79 @@ public class TrackerDistributedCacheMana
DistributedCache.setFileTimestamps(job, fileTimestamps.toString());
}
}
+ /**
+ * Determines the visibilities of the distributed cache files and
+ * archives. The visibility of a cache path is "public" if the leaf component
+ * has READ permissions for others, and the parent subdirs have
+ * EXECUTE permissions for others
+ * @param job
+ * @throws IOException
+ */
+ public static void determineCacheVisibilities(Configuration job)
+ throws IOException {
+ URI[] tarchives = DistributedCache.getCacheArchives(job);
+ if (tarchives != null) {
+ StringBuffer archiveVisibilities =
+ new StringBuffer(String.valueOf(isPublic(job, tarchives[0])));
+ for (int i = 1; i < tarchives.length; i++) {
+ archiveVisibilities.append(",");
+ archiveVisibilities.append(String.valueOf(isPublic(job, tarchives[i])));
+ }
+ setArchiveVisibilities(job, archiveVisibilities.toString());
+ }
+ URI[] tfiles = DistributedCache.getCacheFiles(job);
+ if (tfiles != null) {
+ StringBuffer fileVisibilities =
+ new StringBuffer(String.valueOf(isPublic(job, tfiles[0])));
+ for (int i = 1; i < tfiles.length; i++) {
+ fileVisibilities.append(",");
+ fileVisibilities.append(String.valueOf(isPublic(job, tfiles[i])));
+ }
+ setFileVisibilities(job, fileVisibilities.toString());
+ }
+ }
+
+ /**
+ * Get the booleans on whether the files are public or not. Used by
+ * internal DistributedCache and MapReduce code.
+ * @param conf The configuration which stored the timestamps
+ * @return a string array of booleans
+ * @throws IOException
+ */
+ static String[] getFileVisibilities(Configuration conf) {
+ return conf.getStrings(JobContext.CACHE_FILE_VISIBILITIES);
+ }
+
+ /**
+ * Get the booleans on whether the archives are public or not. Used by
+ * internal DistributedCache and MapReduce code.
+ * @param conf The configuration which stored the timestamps
+ * @return a string array of booleans
+ */
+ static String[] getArchiveVisibilities(Configuration conf) {
+ return conf.getStrings(JobContext.CACHE_ARCHIVES_VISIBILITIES);
+ }
+
+ /**
+ * This is to check the public/private visibility of the archives to be
+ * localized.
+ *
+ * @param conf Configuration which stores the timestamp's
+ * @param booleans comma separated list of booleans (true - public)
+ * The order should be the same as the order in which the archives are added.
+ */
+ static void setArchiveVisibilities(Configuration conf, String booleans) {
+ conf.set(JobContext.CACHE_ARCHIVES_VISIBILITIES, booleans);
+ }
+
+ /**
+ * This is to check the public/private visibility of the files to be localized
+ *
+ * @param conf Configuration which stores the timestamp's
+ * @param booleans comma separated list of booleans (true - public)
+ * The order should be the same as the order in which the files are added.
+ */
+ static void setFileVisibilities(Configuration conf, String booleans) {
+ conf.set(JobContext.CACHE_FILE_VISIBILITIES, booleans);
+ }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1077125&r1=1077124&r2=1077125&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java Fri Mar 4 03:43:31 2011
@@ -641,7 +641,9 @@ public class JobClient extends Configure
// set the timestamps of the archives and files
TrackerDistributedCacheManager.determineTimestamps(job);
-
+ // set the public/private visibility of the archives and files
+ TrackerDistributedCacheManager.determineCacheVisibilities(job);
+
String originalJarPath = job.getJar();
if (originalJarPath != null) { // copy jar to JobTracker's fs
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1077125&r1=1077124&r2=1077125&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Fri Mar 4 03:43:31 2011
@@ -118,8 +118,8 @@ class LocalJobRunner implements JobSubmi
taskDistributedCacheManager.setup(
new LocalDirAllocator("mapred.local.dir"),
new File(systemJobDir.toString()),
- "archive");
-
+ "archive", "archive");
+
if (DistributedCache.getSymlink(conf)) {
// This is not supported largely because,
// for a Child subprocess, the cwd in LocalJobRunner
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077125&r1=1077124&r2=1077125&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar 4 03:43:31 2011
@@ -160,9 +160,10 @@ abstract class TaskRunner extends Thread
// We don't create any symlinks yet, so presence/absence of workDir
// actually on the file system doesn't matter.
taskDistributedCacheManager = tracker.getTrackerDistributedCacheManager()
- .newTaskDistributedCacheManager(conf);
- taskDistributedCacheManager.setup(lDirAlloc, workDir,
- TaskTracker.getDistributedCacheDir(conf.getUser()));
+ .newTaskDistributedCacheManager(conf);
+ taskDistributedCacheManager.setup(lDirAlloc, workDir, TaskTracker
+ .getPrivateDistributedCacheDir(conf.getUser()),
+ TaskTracker.getPublicDistributedCacheDir());
// Set up the child task's configuration. After this call, no localization
// of files should happen in the TaskTracker's process space. Any changes to
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077125&r1=1077124&r2=1077125&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar 4 03:43:31 2011
@@ -454,9 +454,13 @@ public class TaskTracker
localizer = l;
}
- public static String getDistributedCacheDir(String user) {
+ public static String getPrivateDistributedCacheDir(String user) {
return getUserDir(user) + Path.SEPARATOR + TaskTracker.DISTCACHEDIR;
}
+
+ public static String getPublicDistributedCacheDir() {
+ return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.DISTCACHEDIR;
+ }
public static String getJobCacheSubdir(String user) {
return getUserDir(user) + Path.SEPARATOR + TaskTracker.JOBCACHE;
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java?rev=1077125&r1=1077124&r2=1077125&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java Fri Mar 4 03:43:31 2011
@@ -49,6 +49,11 @@ public class JobContext {
private final JobID jobId;
public static final String JOB_TOKEN_FILE = "mapreduce.job.jobTokenFile";
+
+ public static final String CACHE_FILE_VISIBILITIES =
+ "mapreduce.job.cache.files.visibilities";
+ public static final String CACHE_ARCHIVES_VISIBILITIES =
+ "mapreduce.job.cache.archives.visibilities";
public JobContext(Configuration conf, JobID jobId) {
this.conf = new org.apache.hadoop.mapred.JobConf(conf);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java?rev=1077125&r1=1077124&r2=1077125&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java Fri Mar 4 03:43:31 2011
@@ -233,7 +233,7 @@ public class Localizer {
// Set up the cache directory used for distributed cache files
File distributedCacheDir =
- new File(localDir, TaskTracker.getDistributedCacheDir(user));
+ new File(localDir, TaskTracker.getPrivateDistributedCacheDir(user));
if (distributedCacheDir.exists() || distributedCacheDir.mkdirs()) {
// Set permissions on the distcache-directory
PermissionsHandler.setPermissions(distributedCacheDir,
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1077125&r1=1077124&r2=1077125&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Fri Mar 4 03:43:31 2011
@@ -75,7 +75,7 @@ public class TestTrackerDistributedCache
new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
@Override
- protected void setUp() throws IOException {
+ protected void setUp() throws IOException,InterruptedException {
// Prepare the tests' root dir
File TEST_ROOT = new File(TEST_ROOT_DIR);
@@ -97,8 +97,8 @@ public class TestTrackerDistributedCache
// Create the temporary cache files to be used in the tests.
firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile");
secondCacheFile = new Path(TEST_ROOT_DIR, "secondcachefile");
- createTempFile(firstCacheFile);
- createTempFile(secondCacheFile);
+ createPrivateTempFile(firstCacheFile);
+ createPrivateTempFile(secondCacheFile);
}
/**
@@ -127,6 +127,7 @@ public class TestTrackerDistributedCache
DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
DistributedCache.addFileToClassPath(secondCacheFile, subConf);
TrackerDistributedCacheManager.determineTimestamps(subConf);
+ TrackerDistributedCacheManager.determineCacheVisibilities(subConf);
// ****** End of imitating JobClient code
Path jobFile = new Path(TEST_ROOT_DIR, "job.xml");
@@ -144,7 +145,8 @@ public class TestTrackerDistributedCache
assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
handle.setup(localDirAllocator, workDir, TaskTracker
- .getDistributedCacheDir(userName));
+ .getPrivateDistributedCacheDir(userName),
+ TaskTracker.getPublicDistributedCacheDir());
InitializationContext context = new InitializationContext();
context.user = userName;
@@ -195,7 +197,7 @@ public class TestTrackerDistributedCache
}
public void testReferenceCount() throws IOException, LoginException,
- URISyntaxException {
+ URISyntaxException, InterruptedException {
if (!canRun()) {
return;
}
@@ -213,19 +215,21 @@ public class TestTrackerDistributedCache
DistributedCache.addCacheFile(secondCacheFile.toUri(), conf1);
TrackerDistributedCacheManager.determineTimestamps(conf1);
+ TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
// Task localizing for first job
TaskDistributedCacheManager handle = manager
.newTaskDistributedCacheManager(conf1);
handle.setup(localDirAllocator, workDir, TaskTracker
- .getDistributedCacheDir(userName));
+ .getPrivateDistributedCacheDir(userName),
+ TaskTracker.getPublicDistributedCacheDir());
handle.release();
for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
assertEquals(0, manager.getReferenceCount(c.uri, conf1, c.timestamp));
}
Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
- createTempFile(thirdCacheFile);
+ createPrivateTempFile(thirdCacheFile);
// Configures another job with three regular files.
Job job2 = new Job(conf);
@@ -238,6 +242,7 @@ public class TestTrackerDistributedCache
DistributedCache.addCacheFile(thirdCacheFile.toUri(), conf2);
TrackerDistributedCacheManager.determineTimestamps(conf2);
+ TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
// Task localizing for second job
// localization for the "firstCacheFile" will fail.
@@ -245,7 +250,8 @@ public class TestTrackerDistributedCache
Throwable th = null;
try {
handle.setup(localDirAllocator, workDir, TaskTracker
- .getDistributedCacheDir(userName));
+ .getPrivateDistributedCacheDir(userName),
+ TaskTracker.getPublicDistributedCacheDir());
} catch (IOException e) {
th = e;
LOG.info("Exception during setup", e);
@@ -266,7 +272,64 @@ public class TestTrackerDistributedCache
assertTrue(th.getMessage().contains(thirdCacheFile.getName()));
fs.delete(thirdCacheFile, false);
}
+
+ /**
+ * Tests that localization of distributed cache file happens in the desired
+ * directory
+ * @throws IOException
+ * @throws LoginException
+ */
+ public void testPublicPrivateCache()
+ throws IOException, LoginException, InterruptedException {
+ if (!canRun()) {
+ return;
+ }
+ checkLocalizedPath("true");
+ checkLocalizedPath("false");
+ }
+
+ private void checkLocalizedPath(String visibility)
+ throws IOException, LoginException, InterruptedException {
+ TrackerDistributedCacheManager manager =
+ new TrackerDistributedCacheManager(conf);
+ String userName = getJobOwnerName();
+ File workDir = new File(TEST_ROOT_DIR, "workdir");
+ Path cacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
+ if ("true".equals(visibility)) {
+ createPublicTempFile(cacheFile);
+ } else {
+ createPrivateTempFile(cacheFile);
+ }
+
+ Configuration conf1 = new Configuration(conf);
+ DistributedCache.addCacheFile(cacheFile.toUri(), conf1);
+ TrackerDistributedCacheManager.determineTimestamps(conf1);
+ TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
+ // Task localizing for job
+ TaskDistributedCacheManager handle = manager
+ .newTaskDistributedCacheManager(conf1);
+ handle.setup(localDirAllocator, workDir, TaskTracker
+ .getPrivateDistributedCacheDir(userName),
+ TaskTracker.getPublicDistributedCacheDir());
+ TaskDistributedCacheManager.CacheFile c = handle.getCacheFiles().get(0);
+ String distCacheDir;
+ if ("true".equals(visibility)) {
+ distCacheDir = TaskTracker.getPublicDistributedCacheDir();
+ } else {
+ distCacheDir = TaskTracker.getPrivateDistributedCacheDir(userName);
+ }
+ Path localizedPath =
+ manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir,
+ fs.getFileStatus(cacheFile), false,
+ c.timestamp, new Path(TEST_ROOT_DIR), false);
+ assertTrue("Cache file didn't get localized in the expected directory. " +
+ "Expected localization to happen within " +
+ ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir +
+ ", but was localized at " +
+ localizedPath, localizedPath.toString().contains(distCacheDir));
+ }
+
/**
* Check proper permissions on the cache files
*
@@ -347,6 +410,18 @@ public class TestTrackerDistributedCache
os.close();
FileSystem.LOG.info("created: " + p + ", size=" + TEST_FILE_SIZE);
}
+
+ static void createPublicTempFile(Path p)
+ throws IOException, InterruptedException {
+ createTempFile(p);
+ FileUtil.chmod(p.toString(), "0777",true);
+ }
+
+ static void createPrivateTempFile(Path p)
+ throws IOException, InterruptedException {
+ createTempFile(p);
+ FileUtil.chmod(p.toString(), "0770",true);
+ }
@Override
protected void tearDown() throws IOException {
@@ -396,6 +471,7 @@ public class TestTrackerDistributedCache
Configuration subConf = new Configuration(myConf);
DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
TrackerDistributedCacheManager.determineTimestamps(subConf);
+ TrackerDistributedCacheManager.determineCacheVisibilities(subConf);
// ****** End of imitating JobClient code
String userName = getJobOwnerName();
@@ -406,7 +482,8 @@ public class TestTrackerDistributedCache
assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
handle.setup(localDirAllocator, workDir, TaskTracker
- .getDistributedCacheDir(userName));
+ .getPrivateDistributedCacheDir(userName),
+ TaskTracker.getPublicDistributedCacheDir());
// ****** End of imitating TaskRunner code
Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
@@ -427,7 +504,7 @@ public class TestTrackerDistributedCache
Throwable th = null;
try {
handle.setup(localDirAllocator, workDir, TaskTracker
- .getDistributedCacheDir(userName));
+ .getPrivateDistributedCacheDir(userName), TaskTracker.getPublicDistributedCacheDir());
} catch (IOException ie) {
th = ie;
}
@@ -441,11 +518,12 @@ public class TestTrackerDistributedCache
Configuration subConf2 = new Configuration(myConf);
DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf2);
TrackerDistributedCacheManager.determineTimestamps(subConf2);
+ TrackerDistributedCacheManager.determineCacheVisibilities(subConf2);
handle =
manager.newTaskDistributedCacheManager(subConf2);
handle.setup(localDirAllocator, workDir, TaskTracker
- .getDistributedCacheDir(userName));
+ .getPrivateDistributedCacheDir(userName), TaskTracker.getPublicDistributedCacheDir());
Path[] localCacheFiles2 = DistributedCache.getLocalCacheFiles(subConf2);
assertNotNull(null, localCacheFiles2);
assertEquals(1, localCacheFiles2.length);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java?rev=1077125&r1=1077124&r2=1077125&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java Fri Mar 4 03:43:31 2011
@@ -142,7 +142,7 @@ public class TestLocalizationWithLinuxTa
// Verify the distributed cache dir.
File distributedCacheDir =
new File(localDir, TaskTracker
- .getDistributedCacheDir(task.getUser()));
+ .getPrivateDistributedCacheDir(task.getUser()));
assertTrue("distributed cache dir " + distributedCacheDir
+ " doesn't exists!", distributedCacheDir.exists());
checkFilePermissions(distributedCacheDir.getAbsolutePath(),
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=1077125&r1=1077124&r2=1077125&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Mar 4 03:43:31 2011
@@ -312,7 +312,7 @@ public class TestTaskTrackerLocalization
// Verify the distributed cache dir.
File distributedCacheDir =
new File(localDir, TaskTracker
- .getDistributedCacheDir(task.getUser()));
+ .getPrivateDistributedCacheDir(task.getUser()));
assertTrue("distributed cache dir " + distributedCacheDir
+ " doesn't exists!", distributedCacheDir.exists());
checkFilePermissions(distributedCacheDir.getAbsolutePath(),
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java?rev=1077125&r1=1077124&r2=1077125&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java Fri Mar 4 03:43:31 2011
@@ -45,7 +45,7 @@ public class TestTrackerDistributedCache
@Override
protected void setUp()
- throws IOException {
+ throws IOException, InterruptedException {
if (!ClusterWithLinuxTaskController.shouldRun()) {
return;
@@ -134,7 +134,7 @@ public class TestTrackerDistributedCache
String trailingStringForFirstFile =
cachedFilePath.replaceFirst(ROOT_MAPRED_LOCAL_DIR.getAbsolutePath()
+ Path.SEPARATOR + "0_[0-" + (numLocalDirs - 1) + "]"
- + Path.SEPARATOR + TaskTracker.getDistributedCacheDir(userName),
+ + Path.SEPARATOR + TaskTracker.getPrivateDistributedCacheDir(userName),
"");
LOG.info("Leading path for cacheFirstFile is : "
+ trailingStringForFirstFile);