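You are viewing a plain-text version of this content. The canonical (linked) version is available in the original mailing-list archive; the hyperlink was lost in this plain-text rendering.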
Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2010/08/02 20:18:02 UTC
svn commit: r981644 - in /hadoop/mapreduce/trunk: ./
src/java/org/apache/hadoop/mapreduce/filecache/
src/test/mapred/org/apache/hadoop/mapreduce/filecache/
Author: ddas
Date: Mon Aug 2 18:18:01 2010
New Revision: 981644
URL: http://svn.apache.org/viewvc?rev=981644&view=rev
Log:
MAPREDUCE-1288. Fixes TrackerDistributedCacheManager to take into account the owner of the localized file in the mapping from cache URIs to CacheStatus objects. Contributed by Devaraj Das.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=981644&r1=981643&r2=981644&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Aug 2 18:18:01 2010
@@ -212,6 +212,10 @@ Trunk (unreleased changes)
MAPREDUCE-1686. Fixes StreamUtil.goodClassOrNull to find classes without
package names. (Paul Burkhardt via amareshwari)
+ MAPREDUCE-1288. Fixes TrackerDistributedCacheManager to take into account
+ the owner of the localized file in the mapping from cache URIs to
+ CacheStatus objects. (ddas)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java?rev=981644&r1=981643&r2=981644&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java Mon Aug 2 18:18:01 2010
@@ -283,7 +283,8 @@ public class DistributedCache {
throw new IOException("TimeStamp of the uri couldnot be found");
}
new TrackerDistributedCacheManager(conf, new DefaultTaskController())
- .releaseCache(cache, conf, Long.parseLong(timestamp));
+ .releaseCache(cache, conf, Long.parseLong(timestamp),
+ TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
}
/**
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java?rev=981644&r1=981643&r2=981644&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java Mon Aug 2 18:18:01 2010
@@ -73,14 +73,18 @@ public class TaskDistributedCacheManager
/** Whether this is to be added to the classpath */
final boolean shouldBeAddedToClassPath;
boolean localized = false;
+ /** The owner of the localized file. Relevant only on the tasktrackers */
+ final String owner;
private CacheFile(URI uri, FileType type, boolean isPublic, long timestamp,
- boolean classPath) {
+ boolean classPath) throws IOException {
this.uri = uri;
this.type = type;
this.isPublic = isPublic;
this.timestamp = timestamp;
this.shouldBeAddedToClassPath = classPath;
+ this.owner =
+ TrackerDistributedCacheManager.getLocalizedCacheOwner(isPublic);
}
/**
@@ -89,7 +93,8 @@ public class TaskDistributedCacheManager
* files.
*/
private static List<CacheFile> makeCacheFiles(URI[] uris,
- String[] timestamps, String cacheVisibilities[], Path[] paths, FileType type) {
+ String[] timestamps, String cacheVisibilities[], Path[] paths,
+ FileType type) throws IOException {
List<CacheFile> ret = new ArrayList<CacheFile>();
if (uris != null) {
if (uris.length != timestamps.length) {
@@ -235,7 +240,8 @@ public class TaskDistributedCacheManager
public void release() throws IOException {
for (CacheFile c : cacheFiles) {
if (c.getLocalized()) {
- distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp);
+ distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp,
+ c.owner);
}
}
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=981644&r1=981643&r2=981644&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Mon Aug 2 18:18:01 2010
@@ -47,6 +47,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.RunJar;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -149,7 +150,8 @@ public class TrackerDistributedCacheMana
boolean isArchive, long confFileStamp,
Path currentWorkDir, boolean honorSymLinkConf, boolean isPublic)
throws IOException {
- String key = getKey(cache, conf, confFileStamp);
+ String key;
+ key = getKey(cache, conf, confFileStamp, getLocalizedCacheOwner(isPublic));
CacheStatus lcacheStatus;
Path localizedPath = null;
synchronized (cachedArchives) {
@@ -204,12 +206,14 @@ public class TrackerDistributedCacheMana
* using the cache, you need to release the cache
* @param cache The cache URI to be released
* @param conf configuration which contains the filesystem the cache
+ * @param timeStamp the timestamp on the file represented by the cache URI
+ * @param owner the owner of the localized file
* is contained in.
* @throws IOException
*/
- void releaseCache(URI cache, Configuration conf, long timeStamp)
- throws IOException {
- String key = getKey(cache, conf, timeStamp);
+ void releaseCache(URI cache, Configuration conf, long timeStamp,
+ String owner) throws IOException {
+ String key = getKey(cache, conf, timeStamp, owner);
synchronized (cachedArchives) {
CacheStatus lcacheStatus = cachedArchives.get(key);
if (lcacheStatus == null) {
@@ -226,9 +230,9 @@ public class TrackerDistributedCacheMana
/*
* This method is called from unit tests.
*/
- int getReferenceCount(URI cache, Configuration conf, long timeStamp)
- throws IOException {
- String key = getKey(cache, conf, timeStamp);
+ int getReferenceCount(URI cache, Configuration conf, long timeStamp,
+ String owner) throws IOException {
+ String key = getKey(cache, conf, timeStamp, owner);
synchronized (cachedArchives) {
CacheStatus lcacheStatus = cachedArchives.get(key);
if (lcacheStatus == null) {
@@ -237,6 +241,24 @@ public class TrackerDistributedCacheMana
return lcacheStatus.refcount;
}
}
+
+ /**
+ * Get the user who should "own" the localized distributed cache file.
+ * If the cache is public, the tasktracker user is the owner. If private,
+ * the user that the task is running as, is the owner.
+ * @param isPublic
+ * @return the owner as a shortname string
+ * @throws IOException
+ */
+ static String getLocalizedCacheOwner(boolean isPublic) throws IOException {
+ String user;
+ if (isPublic) {
+ user = UserGroupInformation.getLoginUser().getShortUserName();
+ } else {
+ user = UserGroupInformation.getCurrentUser().getShortUserName();
+ }
+ return user;
+ }
/**
* Delete a local path with asyncDiskService if available,
@@ -288,9 +310,9 @@ public class TrackerDistributedCacheMana
return path;
}
- String getKey(URI cache, Configuration conf, long timeStamp)
+ String getKey(URI cache, Configuration conf, long timeStamp, String user)
throws IOException {
- return makeRelative(cache, conf) + String.valueOf(timeStamp);
+ return makeRelative(cache, conf) + String.valueOf(timeStamp) + user;
}
/**
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=981644&r1=981643&r2=981644&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Mon Aug 2 18:18:01 2010
@@ -24,6 +24,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.security.PrivilegedExceptionAction;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@@ -239,7 +240,8 @@ public class TestTrackerDistributedCache
TaskTracker.getPublicDistributedCacheDir());
handle.release();
for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
- assertEquals(0, manager.getReferenceCount(c.uri, conf1, c.timestamp));
+ assertEquals(0, manager.getReferenceCount(c.uri, conf1, c.timestamp,
+ c.owner));
}
Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
@@ -276,7 +278,8 @@ public class TestTrackerDistributedCache
th = null;
for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
try {
- assertEquals(0, manager.getReferenceCount(c.uri, conf2, c.timestamp));
+ assertEquals(0, manager.getReferenceCount(c.uri, conf2, c.timestamp,
+ c.owner));
} catch (IOException ie) {
th = ie;
Log.info("Exception getting reference count for " + c.uri, ie);
@@ -298,11 +301,51 @@ public class TestTrackerDistributedCache
if (!canRun()) {
return;
}
- checkLocalizedPath("true");
- checkLocalizedPath("false");
+ checkLocalizedPath(true);
+ checkLocalizedPath(false);
}
- private void checkLocalizedPath(String visibility)
+ public void testPrivateCacheForMultipleUsers()
+ throws IOException, LoginException, InterruptedException{
+ // Try to initialize the distributed cache for the same file on the
+ // HDFS, for two different users.
+ // First initialize as the user running the test, then as some other user.
+ // Although the same cache file is used in both, the localization
+ // should happen twice.
+
+ UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+ Path p = ugi.doAs(new PrivilegedExceptionAction<Path>() {
+ public Path run()
+ throws IOException, LoginException, InterruptedException {
+ return checkLocalizedPath(false);
+ }
+ });
+ String distCacheDir = TaskTracker.getPrivateDistributedCacheDir(
+ ugi.getShortUserName());
+ assertTrue("Cache file didn't get localized in the expected directory. " +
+ "Expected localization to happen within " +
+ ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir +
+ ", but was localized at " +
+ p, p.toString().contains(distCacheDir));
+
+ ugi = UserGroupInformation.createRemoteUser("fooUserInMachine");
+ p = ugi.doAs(new PrivilegedExceptionAction<Path>() {
+ public Path run()
+ throws IOException, LoginException, InterruptedException {
+ return checkLocalizedPath(false);
+ }
+ });
+ distCacheDir = TaskTracker.getPrivateDistributedCacheDir(
+ ugi.getShortUserName());
+ assertTrue("Cache file didn't get localized in the expected directory. " +
+ "Expected localization to happen within " +
+ ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir +
+ ", but was localized at " +
+ p, p.toString().contains(distCacheDir));
+
+ }
+
+ private Path checkLocalizedPath(boolean visibility)
throws IOException, LoginException, InterruptedException {
TrackerDistributedCacheManager manager =
new TrackerDistributedCacheManager(conf, taskController);
@@ -310,7 +353,7 @@ public class TestTrackerDistributedCache
String userName = getJobOwnerName();
File workDir = new File(TEST_ROOT_DIR, "workdir");
Path cacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
- if ("true".equals(visibility)) {
+ if (visibility) {
createPublicTempFile(cacheFile);
} else {
createPrivateTempFile(cacheFile);
@@ -331,7 +374,7 @@ public class TestTrackerDistributedCache
TaskTracker.getPublicDistributedCacheDir());
TaskDistributedCacheManager.CacheFile c = handle.getCacheFiles().get(0);
String distCacheDir;
- if ("true".equals(visibility)) {
+ if (visibility) {
distCacheDir = TaskTracker.getPublicDistributedCacheDir();
} else {
distCacheDir = TaskTracker.getPrivateDistributedCacheDir(userName);
@@ -340,17 +383,18 @@ public class TestTrackerDistributedCache
manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir,
fs.getFileStatus(cacheFile), false,
c.timestamp, new Path(TEST_ROOT_DIR), false,
- Boolean.parseBoolean(visibility));
+ visibility);
assertTrue("Cache file didn't get localized in the expected directory. " +
"Expected localization to happen within " +
ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir +
", but was localized at " +
localizedPath, localizedPath.toString().contains(distCacheDir));
- if ("true".equals(visibility)) {
+ if (visibility) {
checkPublicFilePermissions(new Path[]{localizedPath});
} else {
checkFilePermissions(new Path[]{localizedPath});
}
+ return localizedPath;
}
/**
@@ -424,7 +468,7 @@ public class TestTrackerDistributedCache
}
protected String getJobOwnerName() throws IOException {
- return UserGroupInformation.getLoginUser().getUserName();
+ return UserGroupInformation.getCurrentUser().getUserName();
}
private long getFileStamp(Path file) throws IOException {
@@ -463,7 +507,8 @@ public class TestTrackerDistributedCache
fs.getFileStatus(firstCacheFile), false,
getFileStamp(firstCacheFile), new Path(TEST_ROOT_DIR), false, false);
manager.releaseCache(firstCacheFile.toUri(), conf2,
- getFileStamp(firstCacheFile));
+ getFileStamp(firstCacheFile),
+ TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
//in above code,localized a file of size 4K and then release the cache
// which will cause the cache be deleted when the limit goes out.
// The below code localize another cache which's designed to
@@ -488,7 +533,8 @@ public class TestTrackerDistributedCache
getFileStamp(thirdCacheFile), new Path(TEST_ROOT_DIR), false, false);
// Release the third cache so that it can be deleted while sweeping
manager.releaseCache(thirdCacheFile.toUri(), conf2,
- getFileStamp(thirdCacheFile));
+ getFileStamp(thirdCacheFile),
+ TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
// Getting the fourth cache will make the number of sub directories becomes
// 3 which is greater than 2. So the released cache will be deleted.
manager.getLocalCache(fourthCacheFile.toUri(), conf2,