You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2010/08/02 20:18:02 UTC

svn commit: r981644 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapreduce/filecache/ src/test/mapred/org/apache/hadoop/mapreduce/filecache/

Author: ddas
Date: Mon Aug  2 18:18:01 2010
New Revision: 981644

URL: http://svn.apache.org/viewvc?rev=981644&view=rev
Log:
MAPREDUCE-1288. Fixes TrackerDistributedCacheManager to take into account the owner of the localized file in the mapping from cache URIs to CacheStatus objects. Contributed by Devaraj Das.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=981644&r1=981643&r2=981644&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Aug  2 18:18:01 2010
@@ -212,6 +212,10 @@ Trunk (unreleased changes)
     MAPREDUCE-1686. Fixes StreamUtil.goodClassOrNull to find classes without
     package names. (Paul Burkhardt via amareshwari)
 
+    MAPREDUCE-1288. Fixes TrackerDistributedCacheManager to take into account
+    the owner of the localized file in the mapping from cache URIs to
+    CacheStatus objects. (ddas)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java?rev=981644&r1=981643&r2=981644&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java Mon Aug  2 18:18:01 2010
@@ -283,7 +283,8 @@ public class DistributedCache {
       throw new IOException("TimeStamp of the uri couldnot be found");
     }
     new TrackerDistributedCacheManager(conf, new DefaultTaskController())
-        .releaseCache(cache, conf, Long.parseLong(timestamp));
+           .releaseCache(cache, conf, Long.parseLong(timestamp),
+            TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
   }
   
   /**

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java?rev=981644&r1=981643&r2=981644&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java Mon Aug  2 18:18:01 2010
@@ -73,14 +73,18 @@ public class TaskDistributedCacheManager
     /** Whether this is to be added to the classpath */
     final boolean shouldBeAddedToClassPath;
     boolean localized = false;
+    /** The owner of the localized file. Relevant only on the tasktrackers */
+    final String owner;
 
     private CacheFile(URI uri, FileType type, boolean isPublic, long timestamp, 
-        boolean classPath) {
+        boolean classPath) throws IOException {
       this.uri = uri;
       this.type = type;
       this.isPublic = isPublic;
       this.timestamp = timestamp;
       this.shouldBeAddedToClassPath = classPath;
+      this.owner =
+          TrackerDistributedCacheManager.getLocalizedCacheOwner(isPublic);
     }
 
     /**
@@ -89,7 +93,8 @@ public class TaskDistributedCacheManager
      * files.
      */
     private static List<CacheFile> makeCacheFiles(URI[] uris, 
-        String[] timestamps, String cacheVisibilities[], Path[] paths, FileType type) {
+        String[] timestamps, String cacheVisibilities[], Path[] paths,
+        FileType type) throws IOException {
       List<CacheFile> ret = new ArrayList<CacheFile>();
       if (uris != null) {
         if (uris.length != timestamps.length) {
@@ -235,7 +240,8 @@ public class TaskDistributedCacheManager
   public void release() throws IOException {
     for (CacheFile c : cacheFiles) {
       if (c.getLocalized()) {
-        distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp);
+        distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp,
+            c.owner);
       }
     }
   }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=981644&r1=981643&r2=981644&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Mon Aug  2 18:18:01 2010
@@ -47,6 +47,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.classification.InterfaceAudience;
 
@@ -149,7 +150,8 @@ public class TrackerDistributedCacheMana
       boolean isArchive, long confFileStamp,
       Path currentWorkDir, boolean honorSymLinkConf, boolean isPublic)
       throws IOException {
-    String key = getKey(cache, conf, confFileStamp);
+    String key;
+    key = getKey(cache, conf, confFileStamp, getLocalizedCacheOwner(isPublic));
     CacheStatus lcacheStatus;
     Path localizedPath = null;
     synchronized (cachedArchives) {
@@ -204,12 +206,14 @@ public class TrackerDistributedCacheMana
    * using the cache, you need to release the cache
    * @param cache The cache URI to be released
    * @param conf configuration which contains the filesystem the cache
    * is contained in.
+   * @param timeStamp the timestamp on the file represented by the cache URI
+   * @param owner the owner of the localized file
    * @throws IOException
    */
-  void releaseCache(URI cache, Configuration conf, long timeStamp)
-    throws IOException {
-    String key = getKey(cache, conf, timeStamp);
+  void releaseCache(URI cache, Configuration conf, long timeStamp,
+      String owner) throws IOException {
+    String key = getKey(cache, conf, timeStamp, owner);
     synchronized (cachedArchives) {
       CacheStatus lcacheStatus = cachedArchives.get(key);
       if (lcacheStatus == null) {
@@ -226,9 +230,9 @@ public class TrackerDistributedCacheMana
   /*
    * This method is called from unit tests. 
    */
-  int getReferenceCount(URI cache, Configuration conf, long timeStamp) 
-    throws IOException {
-    String key = getKey(cache, conf, timeStamp);
+  int getReferenceCount(URI cache, Configuration conf, long timeStamp,
+      String owner) throws IOException {
+    String key = getKey(cache, conf, timeStamp, owner);
     synchronized (cachedArchives) {
       CacheStatus lcacheStatus = cachedArchives.get(key);
       if (lcacheStatus == null) {
@@ -237,6 +241,24 @@ public class TrackerDistributedCacheMana
       return lcacheStatus.refcount;
     }
   }
+  
+  /**
+   * Get the user who should "own" the localized distributed cache file.
+   * If the cache is public, the tasktracker user is the owner. If private,
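+   * the user that the task is running as is the owner.
+   * @param isPublic true if the cache is public (owner is the login user),
+   *                 false if it is private (owner is the current user)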
+   * @return the owner as a shortname string
+   * @throws IOException
+   */
+  static String getLocalizedCacheOwner(boolean isPublic) throws IOException {
+    String user;
+    if (isPublic) {
+      user = UserGroupInformation.getLoginUser().getShortUserName();
+    } else {
+      user = UserGroupInformation.getCurrentUser().getShortUserName();
+    }
+    return user;
+  }
 
   /**
    * Delete a local path with asyncDiskService if available,
@@ -288,9 +310,9 @@ public class TrackerDistributedCacheMana
     return path;
   }
 
-  String getKey(URI cache, Configuration conf, long timeStamp) 
+  String getKey(URI cache, Configuration conf, long timeStamp, String user)
       throws IOException {
-    return makeRelative(cache, conf) + String.valueOf(timeStamp);
+    return makeRelative(cache, conf) + String.valueOf(timeStamp) + user;
   }
   
   /**

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=981644&r1=981643&r2=981644&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Mon Aug  2 18:18:01 2010
@@ -24,6 +24,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
@@ -239,7 +240,8 @@ public class TestTrackerDistributedCache
           TaskTracker.getPublicDistributedCacheDir());
     handle.release();
     for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
-      assertEquals(0, manager.getReferenceCount(c.uri, conf1, c.timestamp));
+      assertEquals(0, manager.getReferenceCount(c.uri, conf1, c.timestamp,
+          c.owner));
     }
     
     Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
@@ -276,7 +278,8 @@ public class TestTrackerDistributedCache
     th = null;
     for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
       try {
-        assertEquals(0, manager.getReferenceCount(c.uri, conf2, c.timestamp));
+        assertEquals(0, manager.getReferenceCount(c.uri, conf2, c.timestamp,
+            c.owner));
       } catch (IOException ie) {
         th = ie;
         Log.info("Exception getting reference count for " + c.uri, ie);
@@ -298,11 +301,51 @@ public class TestTrackerDistributedCache
     if (!canRun()) {
       return;
     }
-    checkLocalizedPath("true");
-    checkLocalizedPath("false");
+    checkLocalizedPath(true);
+    checkLocalizedPath(false);
   }
   
-  private void checkLocalizedPath(String visibility) 
+  public void testPrivateCacheForMultipleUsers() 
+  throws IOException, LoginException, InterruptedException{
+    // Try to initialize the distributed cache for the same file on the
+    // HDFS, for two different users.
+    // First initialize as the user running the test, then as some other user.
+    // Although the same cache file is used in both, the localization
+    // should happen twice.
+    
+    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+    Path p = ugi.doAs(new PrivilegedExceptionAction<Path>() {
+      public Path run() 
+      throws IOException, LoginException, InterruptedException {
+        return checkLocalizedPath(false);
+      }
+    });
+    String distCacheDir = TaskTracker.getPrivateDistributedCacheDir(
+        ugi.getShortUserName());
+    assertTrue("Cache file didn't get localized in the expected directory. " +
+        "Expected localization to happen within " + 
+        ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir +
+        ", but was localized at " + 
+        p, p.toString().contains(distCacheDir));
+    
+    ugi = UserGroupInformation.createRemoteUser("fooUserInMachine");
+    p = ugi.doAs(new PrivilegedExceptionAction<Path>() {
+      public Path run() 
+      throws IOException, LoginException, InterruptedException {
+        return checkLocalizedPath(false);
+      }
+    });
+    distCacheDir = TaskTracker.getPrivateDistributedCacheDir(
+        ugi.getShortUserName());
+    assertTrue("Cache file didn't get localized in the expected directory. " +
+        "Expected localization to happen within " + 
+        ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir +
+        ", but was localized at " + 
+        p, p.toString().contains(distCacheDir));
+    
+  }
+  
+  private Path checkLocalizedPath(boolean visibility) 
   throws IOException, LoginException, InterruptedException {
     TrackerDistributedCacheManager manager = 
       new TrackerDistributedCacheManager(conf, taskController);
@@ -310,7 +353,7 @@ public class TestTrackerDistributedCache
     String userName = getJobOwnerName();
     File workDir = new File(TEST_ROOT_DIR, "workdir");
     Path cacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
-    if ("true".equals(visibility)) {
+    if (visibility) {
       createPublicTempFile(cacheFile);
     } else {
       createPrivateTempFile(cacheFile);
@@ -331,7 +374,7 @@ public class TestTrackerDistributedCache
           TaskTracker.getPublicDistributedCacheDir());
     TaskDistributedCacheManager.CacheFile c = handle.getCacheFiles().get(0);
     String distCacheDir;
-    if ("true".equals(visibility)) {
+    if (visibility) {
       distCacheDir = TaskTracker.getPublicDistributedCacheDir(); 
     } else {
       distCacheDir = TaskTracker.getPrivateDistributedCacheDir(userName);
@@ -340,17 +383,18 @@ public class TestTrackerDistributedCache
       manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir,
           fs.getFileStatus(cacheFile), false,
           c.timestamp, new Path(TEST_ROOT_DIR), false,
-          Boolean.parseBoolean(visibility));
+          visibility);
     assertTrue("Cache file didn't get localized in the expected directory. " +
         "Expected localization to happen within " + 
         ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir +
         ", but was localized at " + 
         localizedPath, localizedPath.toString().contains(distCacheDir));
-    if ("true".equals(visibility)) {
+    if (visibility) {
       checkPublicFilePermissions(new Path[]{localizedPath});
     } else {
       checkFilePermissions(new Path[]{localizedPath});
     }
+    return localizedPath;
   }
   
   /**
@@ -424,7 +468,7 @@ public class TestTrackerDistributedCache
   }
   
   protected String getJobOwnerName() throws IOException {
-    return UserGroupInformation.getLoginUser().getUserName();
+    return UserGroupInformation.getCurrentUser().getUserName();
   }
   
   private long getFileStamp(Path file) throws IOException {
@@ -463,7 +507,8 @@ public class TestTrackerDistributedCache
         fs.getFileStatus(firstCacheFile), false,
         getFileStamp(firstCacheFile), new Path(TEST_ROOT_DIR), false, false);
     manager.releaseCache(firstCacheFile.toUri(), conf2,
-        getFileStamp(firstCacheFile));
+        getFileStamp(firstCacheFile), 
+        TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
     //in above code,localized a file of size 4K and then release the cache 
     // which will cause the cache be deleted when the limit goes out. 
     // The below code localize another cache which's designed to
@@ -488,7 +533,8 @@ public class TestTrackerDistributedCache
         getFileStamp(thirdCacheFile), new Path(TEST_ROOT_DIR), false, false);
     // Release the third cache so that it can be deleted while sweeping
     manager.releaseCache(thirdCacheFile.toUri(), conf2,
-        getFileStamp(thirdCacheFile));
+        getFileStamp(thirdCacheFile), 
+        TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
     // Getting the fourth cache will make the number of sub directories becomes
     // 3 which is greater than 2. So the released cache will be deleted.
     manager.getLocalCache(fourthCacheFile.toUri(), conf2,