Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:42:25 UTC

svn commit: r1077114 [1/2] - in /hadoop/common/branches/branch-0.20-security-patches/src: core/org/apache/hadoop/fs/ mapred/org/apache/hadoop/filecache/ mapred/org/apache/hadoop/mapred/ test/org/apache/hadoop/filecache/

Author: omalley
Date: Fri Mar  4 03:42:25 2011
New Revision: 1077114

URL: http://svn.apache.org/viewvc?rev=1077114&view=rev
Log:
commit 3b9be6aaf466f7e10f87e2ee5f254caed27ef6bf
Author: Hemanth Yamijala <yh...@yahoo-inc.com>
Date:   Wed Jan 20 15:28:40 2010 +0530

    MAPREDUCE:476 from https://issues.apache.org/jira/secure/attachment/12430866/476.20S-2.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-476. Extend DistributedCache to work locally (LocalJobRunner).
    +    (Philip Zeyliger via tomwhite)
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
Removed:
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestDistributedCache.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/FileSystem.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
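
At a high level, the patch moves the cache bookkeeping out of static state in DistributedCache and into per-instance TrackerDistributedCacheManager / TaskDistributedCacheManager objects, so that LocalJobRunner can drive the same code path as the TaskTracker. A minimal sketch of the user-visible effect (a hypothetical driver; the file path and fragment name are made up for illustration):

    import java.net.URI;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.mapred.JobConf;

    public class LocalCacheExample {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        job.set("mapred.job.tracker", "local");  // run in-process via LocalJobRunner
        // The #fragment names the symlink created in the task work dir.
        DistributedCache.addCacheFile(new URI("file:///tmp/lookup.txt#lookup"), job);
        DistributedCache.createSymlink(job);
        // ... configure mapper/reducer/input/output, then JobClient.runJob(job);
      }
    }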

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/FileSystem.java?rev=1077114&r1=1077113&r2=1077114&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/FileSystem.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/FileSystem.java Fri Mar  4 03:42:25 2011
@@ -64,7 +64,7 @@ import org.apache.hadoop.security.UserGr
  * implementation is DistributedFileSystem.
  *****************************************************************/
 public abstract class FileSystem extends Configured implements Closeable {
-  private static final String FS_DEFAULT_NAME_KEY = "fs.default.name";
+  public static final String FS_DEFAULT_NAME_KEY = "fs.default.name";
 
   public static final Log LOG = LogFactory.getLog(FileSystem.class);
 

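The FileSystem change simply widens FS_DEFAULT_NAME_KEY from private to public, so that code elsewhere in the tree can reference the key rather than repeating the "fs.default.name" literal. A sketch of a hypothetical caller:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class DefaultFsExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Point the default filesystem at the local FS, e.g. for a test.
        conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
        FileSystem fs = FileSystem.get(conf);
        System.out.println(fs.getUri());  // file:///
      }
    }
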
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java?rev=1077114&r1=1077113&r2=1077114&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java Fri Mar  4 03:42:25 2011
@@ -18,12 +18,12 @@
 
 package org.apache.hadoop.filecache;
 
-import org.apache.commons.logging.*;
 import java.io.*;
 import java.util.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
 
 import java.net.URI;
 
@@ -108,22 +108,23 @@ import java.net.URI;
  *     }
  *     
  * </pre></blockquote></p>
- * 
+ * It is also very common to use the DistributedCache through
+ * {@link org.apache.hadoop.util.GenericOptionsParser}.
+ *
+ * This class includes methods that should be used by users
+ * (specifically those mentioned in the example above, as well
+ * as {@link DistributedCache#addArchiveToClassPath(Path, Configuration)}),
+ * as well as methods intended for use by the MapReduce framework
+ * (e.g., {@link org.apache.hadoop.mapred.JobClient}).  For implementation
+ * details, see {@link TrackerDistributedCacheManager} and
+ * {@link TaskDistributedCacheManager}.
+ *
+ * @see TrackerDistributedCacheManager
+ * @see TaskDistributedCacheManager
  * @see org.apache.hadoop.mapred.JobConf
  * @see org.apache.hadoop.mapred.JobClient
  */
 public class DistributedCache {
-  // cacheID to cacheStatus mapping
-  private static TreeMap<String, CacheStatus> cachedArchives = new TreeMap<String, CacheStatus>();
-  
-  private static TreeMap<Path, Long> baseDirSize = new TreeMap<Path, Long>();
-  
-  // default total cache size
-  private static final long DEFAULT_CACHE_SIZE = 10737418240L;
-
-  private static final Log LOG =
-    LogFactory.getLog(DistributedCache.class);
-  
   /**
    * Get the locally cached file or archive; it could either be 
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
@@ -148,15 +149,18 @@ public class DistributedCache {
    * @return the path to directory where the archives are unjarred in case of archives,
    * the path to the file where the file is copied locally 
    * @throws IOException
+   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
+   * instead.
    */
   public static Path getLocalCache(URI cache, Configuration conf, 
                                    Path baseDir, FileStatus fileStatus,
                                    boolean isArchive, long confFileStamp,
                                    Path currentWorkDir) 
-  throws IOException {
+      throws IOException {
     return getLocalCache(cache, conf, baseDir, fileStatus, isArchive, 
         confFileStamp, currentWorkDir, true);
   }
+
   /**
    * Get the locally cached file or archive; it could either be 
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
@@ -184,48 +188,19 @@ public class DistributedCache {
    * @return the path to directory where the archives are unjarred in case of archives,
    * the path to the file where the file is copied locally 
    * @throws IOException
+   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
+   * instead.
    */
   public static Path getLocalCache(URI cache, Configuration conf, 
       Path baseDir, FileStatus fileStatus,
       boolean isArchive, long confFileStamp,
-      Path currentWorkDir, boolean honorSymLinkConf) 
-  throws IOException {
-    String cacheId = makeRelative(cache, conf);
-    CacheStatus lcacheStatus;
-    Path localizedPath;
-    synchronized (cachedArchives) {
-      lcacheStatus = cachedArchives.get(cacheId);
-      if (lcacheStatus == null) {
-        // was never localized
-        lcacheStatus = new CacheStatus(baseDir, new Path(baseDir, new Path(cacheId)));
-        cachedArchives.put(cacheId, lcacheStatus);
-      }
+      Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
 
-      synchronized (lcacheStatus) {
-        localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus, 
-            fileStatus, isArchive, currentWorkDir, honorSymLinkConf);
-        lcacheStatus.refcount++;
-      }
-    }
-
-    // try deleting stuff if you can
-    long size = 0;
-    synchronized (baseDirSize) {
-      Long get = baseDirSize.get(baseDir);
-      if ( get != null ) {
-    	size = get.longValue();
-      }
-    }
-    // setting the cache size to a default of 10GB
-    long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
-    if (allowedSize < size) {
-      // try some cache deletions
-      deleteCache(conf);
-    }
-    return localizedPath;
+    return new TrackerDistributedCacheManager(conf).getLocalCache(cache, conf,
+        baseDir, fileStatus, isArchive, confFileStamp, currentWorkDir,
+        honorSymLinkConf);
   }
 
-  
   /**
    * Get the locally cached file or archive; it could either be 
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
@@ -249,17 +224,18 @@ public class DistributedCache {
    * @return the path to directory where the archives are unjarred in case of archives,
    * the path to the file where the file is copied locally 
    * @throws IOException
-
+   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
+   * instead.
    */
   public static Path getLocalCache(URI cache, Configuration conf, 
                                    Path baseDir, boolean isArchive,
                                    long confFileStamp, Path currentWorkDir) 
-  throws IOException {
+      throws IOException {
     return getLocalCache(cache, conf, 
                          baseDir, null, isArchive,
                          confFileStamp, currentWorkDir);
   }
-  
+
   /**
    * This is the opposite of getlocalcache. When you are done with
    * using the cache, you need to release the cache
@@ -267,232 +243,28 @@ public class DistributedCache {
    * @param conf configuration which contains the filesystem the cache 
    * is contained in.
    * @throws IOException
+   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
+   * instead.
    */
   public static void releaseCache(URI cache, Configuration conf)
-    throws IOException {
-    String cacheId = makeRelative(cache, conf);
-    synchronized (cachedArchives) {
-      CacheStatus lcacheStatus = cachedArchives.get(cacheId);
-      if (lcacheStatus == null)
-        return;
-      synchronized (lcacheStatus) {
-        lcacheStatus.refcount--;
-      }
-    }
+      throws IOException {
+    new TrackerDistributedCacheManager(conf).releaseCache(cache, conf);
   }
   
-  // To delete the caches which have a refcount of zero
-  
-  private static void deleteCache(Configuration conf) throws IOException {
-    // try deleting cache Status with refcount of zero
-    synchronized (cachedArchives) {
-      for (Iterator it = cachedArchives.keySet().iterator(); it.hasNext();) {
-        String cacheId = (String) it.next();
-        CacheStatus lcacheStatus = cachedArchives.get(cacheId);
-        synchronized (lcacheStatus) {
-          if (lcacheStatus.refcount == 0) {
-            // delete this cache entry
-            FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
-            synchronized (baseDirSize) {
-              Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
-              if ( dirSize != null ) {
-            	dirSize -= lcacheStatus.size;
-            	baseDirSize.put(lcacheStatus.baseDir, dirSize);
-              }
-            }
-            it.remove();
-          }
-        }
-      }
-    }
-  }
-
-  /*
+  /**
    * Returns the relative path of the dir this cache will be localized in
    * relative path that this cache will be localized in. For
    * hdfs://hostname:port/absolute_path -- the relative path is
    * hostname/absolute path -- if it is just /absolute_path -- then the
    * relative path is hostname of DFS this mapred cluster is running
    * on/absolute_path
+   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
+   * instead.
    */
   public static String makeRelative(URI cache, Configuration conf)
-    throws IOException {
-    String host = cache.getHost();
-    if (host == null) {
-      host = cache.getScheme();
-    }
-    if (host == null) {
-      URI defaultUri = FileSystem.get(conf).getUri();
-      host = defaultUri.getHost();
-      if (host == null) {
-        host = defaultUri.getScheme();
-      }
-    }
-    String path = host + cache.getPath();
-    path = path.replace(":/","/");                // remove windows device colon
-    return path;
-  }
-
-  private static Path cacheFilePath(Path p) {
-    return new Path(p, p.getName());
-  }
-
-  // the method which actually copies the caches locally and unjars/unzips them
-  // and does chmod for the files
-  private static Path localizeCache(Configuration conf, 
-                                    URI cache, long confFileStamp,
-                                    CacheStatus cacheStatus,
-                                    FileStatus fileStatus,
-                                    boolean isArchive, 
-                                    Path currentWorkDir,boolean honorSymLinkConf) 
-  throws IOException {
-    boolean doSymlink = honorSymLinkConf && getSymlink(conf);
-    if(cache.getFragment() == null) {
-    	doSymlink = false;
-    }
-    FileSystem fs = getFileSystem(cache, conf);
-    String link = currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
-    File flink = new File(link);
-    if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
-                           cacheStatus, fileStatus)) {
-      if (isArchive) {
-        if (doSymlink){
-          if (!flink.exists())
-            FileUtil.symLink(cacheStatus.localLoadPath.toString(), 
-                             link);
-        }
-        return cacheStatus.localLoadPath;
-      }
-      else {
-        if (doSymlink){
-          if (!flink.exists())
-            FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), 
-                             link);
-        }
-        return cacheFilePath(cacheStatus.localLoadPath);
-      }
-    } else {
-      // remove the old archive
-      // if the old archive cannot be removed since it is being used by another
-      // job
-      // return null
-      if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
-        throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
-                              + " is in use and cannot be refreshed");
-      
-      FileSystem localFs = FileSystem.getLocal(conf);
-      localFs.delete(cacheStatus.localLoadPath, true);
-      synchronized (baseDirSize) {
-    	Long dirSize = baseDirSize.get(cacheStatus.baseDir);
-    	if ( dirSize != null ) {
-    	  dirSize -= cacheStatus.size;
-    	  baseDirSize.put(cacheStatus.baseDir, dirSize);
-    	}
-      }
-      Path parchive = new Path(cacheStatus.localLoadPath,
-                               new Path(cacheStatus.localLoadPath.getName()));
-      
-      if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
-        throw new IOException("Mkdirs failed to create directory " + 
-                              cacheStatus.localLoadPath.toString());
-      }
-
-      String cacheId = cache.getPath();
-      fs.copyToLocalFile(new Path(cacheId), parchive);
-      if (isArchive) {
-        String tmpArchive = parchive.toString().toLowerCase();
-        File srcFile = new File(parchive.toString());
-        File destDir = new File(parchive.getParent().toString());
-        if (tmpArchive.endsWith(".jar")) {
-          RunJar.unJar(srcFile, destDir);
-        } else if (tmpArchive.endsWith(".zip")) {
-          FileUtil.unZip(srcFile, destDir);
-        } else if (isTarFile(tmpArchive)) {
-          FileUtil.unTar(srcFile, destDir);
-        }
-        // else will not do anyhting
-        // and copy the file into the dir as it is
-      }
-      
-      long cacheSize = FileUtil.getDU(new File(parchive.getParent().toString()));
-      cacheStatus.size = cacheSize;
-      synchronized (baseDirSize) {
-      	Long dirSize = baseDirSize.get(cacheStatus.baseDir);
-      	if( dirSize == null ) {
-      	  dirSize = Long.valueOf(cacheSize);
-      	} else {
-      	  dirSize += cacheSize;
-      	}
-      	baseDirSize.put(cacheStatus.baseDir, dirSize);
-      }
-      
-      // do chmod here 
-      try {
-        //Setting recursive permission to grant everyone read and execute
-        FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
-      } catch(InterruptedException e) {
-    	LOG.warn("Exception in chmod" + e.toString());
-      }
-
-      // update cacheStatus to reflect the newly cached file
-      cacheStatus.currentStatus = true;
-      cacheStatus.mtime = getTimestamp(conf, cache);
-    }
-    
-    if (isArchive){
-      if (doSymlink){
-        if (!flink.exists())
-          FileUtil.symLink(cacheStatus.localLoadPath.toString(), 
-                           link);
-      }
-      return cacheStatus.localLoadPath;
-    }
-    else {
-      if (doSymlink){
-        if (!flink.exists())
-          FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), 
-                           link);
-      }
-      return cacheFilePath(cacheStatus.localLoadPath);
-    }
-  }
+      throws IOException {
+    return new TrackerDistributedCacheManager(conf).makeRelative(cache, conf);
 
-  private static boolean isTarFile(String filename) {
-    return (filename.endsWith(".tgz") || filename.endsWith(".tar.gz") ||
-           filename.endsWith(".tar"));
-  }
-  
-  // Checks if the cache has already been localized and is fresh
-  private static boolean ifExistsAndFresh(Configuration conf, FileSystem fs, 
-                                          URI cache, long confFileStamp, 
-                                          CacheStatus lcacheStatus,
-                                          FileStatus fileStatus) 
-  throws IOException {
-    // check for existence of the cache
-    if (lcacheStatus.currentStatus == false) {
-      return false;
-    } else {
-      long dfsFileStamp;
-      if (fileStatus != null) {
-        dfsFileStamp = fileStatus.getModificationTime();
-      } else {
-        dfsFileStamp = getTimestamp(conf, cache);
-      }
-
-      // ensure that the file on hdfs hasn't been modified since the job started 
-      if (dfsFileStamp != confFileStamp) {
-        LOG.fatal("File: " + cache + " has changed on HDFS since job started");
-        throw new IOException("File: " + cache + 
-                              " has changed on HDFS since job started");
-      }
-      
-      if (dfsFileStamp != lcacheStatus.mtime) {
-        // needs refreshing
-        return false;
-      }
-    }
-    
-    return true;
   }
 
   /**
@@ -516,21 +288,12 @@ public class DistributedCache {
    * @param jobCacheDir the target directory for creating symlinks
    * @param workDir the directory in which the symlinks are created
    * @throws IOException
+   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
+   * instead.
    */
   public static void createAllSymlink(Configuration conf, File jobCacheDir, File workDir)
     throws IOException{
-    if ((jobCacheDir == null || !jobCacheDir.isDirectory()) ||
-           workDir == null || (!workDir.isDirectory())) {
-      return;
-    }
-    boolean createSymlink = getSymlink(conf);
-    if (createSymlink){
-      File[] list = jobCacheDir.listFiles();
-      for (int i=0; i < list.length; i++){
-        FileUtil.symLink(list[i].getAbsolutePath(),
-                         new File(workDir, list[i].getName()).toString());
-      }
-    }  
+    TrackerDistributedCacheManager.createAllSymlink(conf, jobCacheDir, workDir);
   }
   
   private static String getFileSysName(URI url) {
@@ -554,7 +317,8 @@ public class DistributedCache {
   }
 
   /**
-   * Set the configuration with the given set of archives
+   * Set the configuration with the given set of archives. Intended
+   * to be used by user code.
    * @param archives The list of archives that need to be localized
    * @param conf Configuration which will be changed
    */
@@ -564,7 +328,8 @@ public class DistributedCache {
   }
 
   /**
-   * Set the configuration with the given set of files
+   * Set the configuration with the given set of files.  Intended to be
+   * used by user code.
    * @param files The list of files that need to be localized
    * @param conf Configuration which will be changed
    */
@@ -574,7 +339,8 @@ public class DistributedCache {
   }
 
   /**
-   * Get cache archives set in the Configuration
+   * Get cache archives set in the Configuration.  Used by
+   * internal DistributedCache and MapReduce code.
    * @param conf The configuration which contains the archives
    * @return A URI array of the caches set in the Configuration
    * @throws IOException
@@ -584,18 +350,19 @@ public class DistributedCache {
   }
 
   /**
-   * Get cache files set in the Configuration
+   * Get cache files set in the Configuration.  Used by internal
+   * DistributedCache and MapReduce code.
    * @param conf The configuration which contains the files
    * @return A URI array of the files set in the Configuration
    * @throws IOException
    */
-
   public static URI[] getCacheFiles(Configuration conf) throws IOException {
     return StringUtils.stringToURI(conf.getStrings("mapred.cache.files"));
   }
 
   /**
-   * Return the path array of the localized caches
+   * Return the path array of the localized caches.  Intended to be used
+   * by user code.
    * @param conf Configuration that contains the localized archives
    * @return A path array of localized caches
    * @throws IOException
@@ -607,7 +374,8 @@ public class DistributedCache {
   }
 
   /**
-   * Return the path array of the localized files
+   * Return the path array of the localized files.  Intended to be used
+   * by user code.
    * @param conf Configuration that contains the localized files
    * @return A path array of localized files
    * @throws IOException
@@ -618,7 +386,8 @@ public class DistributedCache {
   }
 
   /**
-   * Get the timestamps of the archives
+   * Get the timestamps of the archives.  Used by internal
+   * DistributedCache and MapReduce code.
    * @param conf The configuration which stored the timestamps
    * @return a string array of timestamps 
    * @throws IOException
@@ -629,7 +398,8 @@ public class DistributedCache {
 
 
   /**
-   * Get the timestamps of the files
+   * Get the timestamps of the files.  Used by internal
+   * DistributedCache and MapReduce code.
    * @param conf The configuration which stored the timestamps
    * @return a string array of timestamps 
    * @throws IOException
@@ -639,7 +409,8 @@ public class DistributedCache {
   }
 
   /**
-   * This is to check the timestamp of the archives to be localized
+   * This is to check the timestamp of the archives to be localized.
+   * Used by internal MapReduce code.
    * @param conf Configuration which stores the timestamp's
    * @param timestamps comma separated list of timestamps of archives.
    * The order should be the same as the order in which the archives are added.
@@ -649,7 +420,8 @@ public class DistributedCache {
   }
 
   /**
-   * This is to check the timestamp of the files to be localized
+   * This is to check the timestamp of the files to be localized.
+   * Used by internal MapReduce code.
    * @param conf Configuration which stores the timestamp's
    * @param timestamps comma separated list of timestamps of files.
    * The order should be the same as the order in which the files are added.
@@ -659,7 +431,8 @@ public class DistributedCache {
   }
   
   /**
-   * Set the conf to contain the location for localized archives 
+   * Set the conf to contain the location for localized archives.  Used
+   * by internal DistributedCache code.
    * @param conf The conf to modify to contain the localized caches
    * @param str a comma separated list of local archives
    */
@@ -668,7 +441,8 @@ public class DistributedCache {
   }
 
   /**
-   * Set the conf to contain the location for localized files 
+   * Set the conf to contain the location for localized files.  Used
+   * by internal DistributedCache code.
    * @param conf The conf to modify to contain the localized caches
    * @param str a comma separated list of local files
    */
@@ -677,7 +451,8 @@ public class DistributedCache {
   }
 
   /**
-   * Add a archives to be localized to the conf
+   * Add an archive to be localized to the conf.  Intended to
+   * be used by user code.
    * @param uri The uri of the cache to be localized
    * @param conf Configuration to add the cache to
    */
@@ -688,7 +463,8 @@ public class DistributedCache {
   }
   
   /**
-   * Add a file to be localized to the conf
+   * Add a file to be localized to the conf.  Intended
+   * to be used by user code.
    * @param uri The uri of the cache to be localized
    * @param conf Configuration to add the cache to
    */
@@ -700,7 +476,7 @@ public class DistributedCache {
 
   /**
    * Add an file path to the current set of classpath entries It adds the file
-   * to cache as well.
+   * to cache as well.  Intended to be used by user code.
    * 
    * @param file Path of the file to be added
    * @param conf Configuration that contains the classpath setting
@@ -717,7 +493,8 @@ public class DistributedCache {
   }
 
   /**
-   * Get the file entries in classpath as an array of Path
+   * Get the file entries in classpath as an array of Path.
+   * Used by internal DistributedCache code.
    * 
    * @param conf Configuration that contains the classpath setting
    */
@@ -736,7 +513,7 @@ public class DistributedCache {
 
   /**
    * Add an archive path to the current set of classpath entries. It adds the
-   * archive to cache as well.
+   * archive to cache as well.  Intended to be used by user code.
    * 
    * @param archive Path of the archive to be added
    * @param conf Configuration that contains the classpath setting
@@ -754,7 +531,8 @@ public class DistributedCache {
   }
 
   /**
-   * Get the archive entries in classpath as an array of Path
+   * Get the archive entries in classpath as an array of Path.
+   * Used by internal DistributedCache code.
    * 
    * @param conf Configuration that contains the classpath setting
    */
@@ -773,7 +551,8 @@ public class DistributedCache {
 
   /**
    * This method allows you to create symlinks in the current working directory
-   * of the task to all the cache files/archives
+   * of the task to all the cache files/archives.
+   * Intended to be used by user code.
    * @param conf the jobconf 
    */
   public static void createSymlink(Configuration conf){
@@ -783,6 +562,7 @@ public class DistributedCache {
   /**
    * This method checks to see if symlinks are to be create for the 
    * localized cache files in the current working directory 
+   * Used by internal DistributedCache code.
    * @param conf the jobconf
    * @return true if symlinks are to be created- else return false
    */
@@ -798,7 +578,7 @@ public class DistributedCache {
    * This method checks if there is a conflict in the fragment names 
    * of the uris. Also makes sure that each uri has a fragment. It 
    * is only to be called if you want to create symlinks for 
-   * the various archives and files.
+   * the various archives and files.  May be used by user code.
    * @param uriFiles The uri array of urifiles
    * @param uriArchives the uri array of uri archives
    */
@@ -840,52 +620,14 @@ public class DistributedCache {
     return true;
   }
 
-  private static class CacheStatus {
-    // false, not loaded yet, true is loaded
-    boolean currentStatus;
-
-    // the local load path of this cache
-    Path localLoadPath;
-    
-    //the base dir where the cache lies
-    Path baseDir;
-    
-    //the size of this cache
-    long size;
-
-    // number of instances using this cache
-    int refcount;
-
-    // the cache-file modification time
-    long mtime;
-
-    public CacheStatus(Path baseDir, Path localLoadPath) {
-      super();
-      this.currentStatus = false;
-      this.localLoadPath = localLoadPath;
-      this.refcount = 0;
-      this.mtime = -1;
-      this.baseDir = baseDir;
-      this.size = 0;
-    }
-  }
-
   /**
    * Clear the entire contents of the cache and delete the backing files. This
    * should only be used when the server is reinitializing, because the users
    * are going to lose their files.
+   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
+   * instead.
    */
   public static void purgeCache(Configuration conf) throws IOException {
-    synchronized (cachedArchives) {
-      FileSystem localFs = FileSystem.getLocal(conf);
-      for (Map.Entry<String,CacheStatus> f: cachedArchives.entrySet()) {
-        try {
-          localFs.delete(f.getValue().localLoadPath, true);
-        } catch (IOException ie) {
-          LOG.debug("Error cleaning up cache", ie);
-        }
-      }
-      cachedArchives.clear();
-    }
+    new TrackerDistributedCacheManager(conf).purgeCache();
   }
 }

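With the bookkeeping delegated to TrackerDistributedCacheManager, DistributedCache is now mostly a user-facing facade, and the updated javadoc labels each method as intended for user code or for the framework. A sketch of the user-facing side (all paths hypothetical):

    import java.net.URI;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    public class CacheSetupExample {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        // User-facing calls, per the javadoc above:
        DistributedCache.addCacheFile(new URI("/user/me/lookup.dat#lookup"), job);
        DistributedCache.addCacheArchive(new URI("/user/me/dict.zip"), job);
        DistributedCache.addFileToClassPath(new Path("/user/me/lib.jar"), job);
        DistributedCache.createSymlink(job);
      }
    }

The -files/-libjars/-archives options of GenericOptionsParser feed the same machinery at submission time, via the tmpfiles/tmpjars/tmparchives keys that the JobClient change below reads.
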
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java?rev=1077114&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java Fri Mar  4 03:42:25 2011
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.filecache;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Helper class of {@link TrackerDistributedCacheManager} that represents
+ * the cached files of a single task.  This class is used
+ * by TaskRunner/LocalJobRunner to parse out the job configuration
+ * and set up the local caches.
+ * 
+ * <b>This class is internal to Hadoop, and should not be treated as a public
+ * interface.</b>
+ */
+public class TaskDistributedCacheManager {
+  private final TrackerDistributedCacheManager distributedCacheManager;
+  private final Configuration taskConf;
+  private final List<CacheFile> cacheFiles = new ArrayList<CacheFile>();
+  private final List<String> classPaths = new ArrayList<String>();
+ 
+  private boolean setupCalled = false;
+
+  /**
+   * Struct representing a single cached file.
+   * There are four permutations (archive, file) and
+   * (don't put in classpath, do put in classpath).
+   */
+  static class CacheFile {
+    /** URI as in the configuration */
+    final URI uri;
+    enum FileType {
+      REGULAR,
+      ARCHIVE
+    }
+    /** Whether to decompress */
+    final FileType type;
+    final long timestamp;
+    /** Whether this is to be added to the classpath */
+    final boolean shouldBeAddedToClassPath;
+
+    private CacheFile(URI uri, FileType type, long timestamp, 
+        boolean classPath) {
+      this.uri = uri;
+      this.type = type;
+      this.timestamp = timestamp;
+      this.shouldBeAddedToClassPath = classPath;
+    }
+
+    /**
+     * Converts the scheme used by DistributedCache to serialize what files to
+     * cache in the configuration into CacheFile objects that represent those 
+     * files.
+     */
+    private static List<CacheFile> makeCacheFiles(URI[] uris, 
+        String[] timestamps, Path[] paths, FileType type) {
+      List<CacheFile> ret = new ArrayList<CacheFile>();
+      if (uris != null) {
+        if (uris.length != timestamps.length) {
+          throw new IllegalArgumentException("Mismatched uris and timestamps.");
+        }
+        Map<String, Path> classPaths = new HashMap<String, Path>();
+        if (paths != null) {
+          for (Path p : paths) {
+            classPaths.put(p.toString(), p);
+          }
+        }
+        for (int i = 0; i < uris.length; ++i) {
+          URI u = uris[i];
+          boolean isClassPath = (null != classPaths.get(u.getPath()));
+          long t = Long.parseLong(timestamps[i]);
+          ret.add(new CacheFile(u, type, t, isClassPath));
+        }
+      }
+      return ret;
+    }
+  }
+
+  TaskDistributedCacheManager(
+      TrackerDistributedCacheManager distributedCacheManager,
+      Configuration taskConf) throws IOException {
+    this.distributedCacheManager = distributedCacheManager;
+    this.taskConf = taskConf;
+    
+    this.cacheFiles.addAll(
+        CacheFile.makeCacheFiles(DistributedCache.getCacheFiles(taskConf),
+            DistributedCache.getFileTimestamps(taskConf),
+            DistributedCache.getFileClassPaths(taskConf),
+            CacheFile.FileType.REGULAR));
+    this.cacheFiles.addAll(
+        CacheFile.makeCacheFiles(DistributedCache.getCacheArchives(taskConf),
+          DistributedCache.getArchiveTimestamps(taskConf),
+          DistributedCache.getArchiveClassPaths(taskConf), 
+          CacheFile.FileType.ARCHIVE));
+  }
+
+  /**
+   * Retrieves files into the local cache and updates the task configuration 
+   * (which has been passed in via the constructor).
+   * 
+   * It is the caller's responsibility to re-write the task configuration XML
+   * file, if necessary.
+   */
+  public void setup(LocalDirAllocator lDirAlloc, File workDir, 
+      String cacheSubdir) throws IOException {
+    setupCalled = true;
+    
+    if (cacheFiles.isEmpty()) {
+      return;
+    }
+
+    ArrayList<Path> localArchives = new ArrayList<Path>();
+    ArrayList<Path> localFiles = new ArrayList<Path>();
+    Path workdirPath = new Path(workDir.getAbsolutePath());
+
+    for (CacheFile cacheFile : cacheFiles) {
+      URI uri = cacheFile.uri;
+      FileSystem fileSystem = FileSystem.get(uri, taskConf);
+      FileStatus fileStatus = fileSystem.getFileStatus(new Path(uri.getPath()));
+      String cacheId = this.distributedCacheManager.makeRelative(uri, taskConf);
+      String cachePath = cacheSubdir + Path.SEPARATOR + cacheId;
+      Path localPath = lDirAlloc.getLocalPathForWrite(cachePath,
+                                fileStatus.getLen(), taskConf);
+      String baseDir = localPath.toString().replace(cacheId, "");
+      Path p = distributedCacheManager.getLocalCache(uri, taskConf,
+          new Path(baseDir), fileStatus, 
+          cacheFile.type == CacheFile.FileType.ARCHIVE,
+          cacheFile.timestamp, workdirPath, false);
+
+      if (cacheFile.type == CacheFile.FileType.ARCHIVE) {
+        localArchives.add(p);
+      } else {
+        localFiles.add(p);
+      }
+      if (cacheFile.shouldBeAddedToClassPath) {
+        classPaths.add(p.toString());
+      }
+    }
+
+    // Update the configuration object with localized data.
+    if (!localArchives.isEmpty()) {
+      DistributedCache.setLocalArchives(taskConf, 
+        stringifyPathList(localArchives));
+    }
+    if (!localFiles.isEmpty()) {
+      DistributedCache.setLocalFiles(taskConf, stringifyPathList(localFiles));
+    }
+
+  }
+
+  private static String stringifyPathList(List<Path> p){
+    if (p == null || p.isEmpty()) {
+      return null;
+    }
+    StringBuilder str = new StringBuilder(p.get(0).toString());
+    for (int i = 1; i < p.size(); i++){
+      str.append(",");
+      str.append(p.get(i).toString());
+    }
+    return str.toString();
+  }
+
+  /** 
+   * Retrieves class paths (as local references) to add. 
+   * Should be called after setup().
+   * 
+   */
+  public List<String> getClassPaths() throws IOException {
+    if (!setupCalled) {
+      throw new IllegalStateException(
+          "getClassPaths() should be called after setup()");
+    }
+    return classPaths;
+  }
+
+  /**
+   * Releases the cached files/archives, so that space
+   * can be reclaimed by the {@link TrackerDistributedCacheManager}.
+   */
+  public void release() throws IOException {
+    for (CacheFile c : cacheFiles) {
+      distributedCacheManager.releaseCache(c.uri, taskConf);
+    }
+  }
+
+  /**
+   * Creates a class loader that includes the designated
+   * files and archives.
+   */
+  public ClassLoader makeClassLoader(final ClassLoader parent) 
+      throws MalformedURLException {
+    final URL[] urls = new URL[classPaths.size()];
+    for (int i = 0; i < classPaths.size(); ++i) {
+      urls[i] = new File(classPaths.get(i)).toURI().toURL();
+    }
+    return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
+      @Override
+      public ClassLoader run() {
+        return new URLClassLoader(urls, parent);
+      }     
+    });
+  }
+}

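A sketch of how a runner might drive the new class end to end (the directory and cache-subdirectory names are assumptions; the real call sites are TaskRunner and LocalJobRunner):

    import java.io.File;
    import org.apache.hadoop.filecache.TaskDistributedCacheManager;
    import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
    import org.apache.hadoop.fs.LocalDirAllocator;
    import org.apache.hadoop.mapred.JobConf;

    public class TaskCacheLifecycle {
      public static void main(String[] args) throws Exception {
        JobConf taskConf = new JobConf();
        TrackerDistributedCacheManager tracker =
            new TrackerDistributedCacheManager(taskConf);
        TaskDistributedCacheManager taskCache =
            tracker.newTaskDistributedCacheManager(taskConf);
        // Localize the configured files/archives; the local paths are
        // written back into taskConf.
        taskCache.setup(new LocalDirAllocator("mapred.local.dir"),
            new File("work").getAbsoluteFile(), "archive");
        // Class loader over the localized classpath entries.
        ClassLoader loader =
            taskCache.makeClassLoader(TaskCacheLifecycle.class.getClassLoader());
        // ... run the task with 'loader', then release the references:
        taskCache.release();
      }
    }
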
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1077114&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Fri Mar  4 03:42:25 2011
@@ -0,0 +1,510 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.filecache;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.RunJar;
+
+/**
+ * Manages a single machine's instance of a cross-job
+ * cache.  This class would typically be instantiated
+ * by a TaskTracker (or something that emulates it,
+ * like LocalJobRunner).
+ * 
+ * <b>This class is internal to Hadoop, and should not be treated as a public
+ * interface.</b>
+ */
+public class TrackerDistributedCacheManager {
+  // cacheID to cacheStatus mapping
+  private TreeMap<String, CacheStatus> cachedArchives = 
+    new TreeMap<String, CacheStatus>();
+
+  private TreeMap<Path, Long> baseDirSize = new TreeMap<Path, Long>();
+
+  // default total cache size (10GB)
+  private static final long DEFAULT_CACHE_SIZE = 10737418240L;
+
+  private static final Log LOG =
+    LogFactory.getLog(TrackerDistributedCacheManager.class);
+
+  private final LocalFileSystem localFs;
+
+  public TrackerDistributedCacheManager(Configuration conf) throws IOException {
+    this.localFs = FileSystem.getLocal(conf);
+  }
+
+  /**
+   * Get the locally cached file or archive; it could either be
+   * previously cached (and valid) or copy it from the {@link FileSystem} now.
+   *
+   * @param cache the cache to be localized, this should be specified as
+   * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
+   * @param conf The Configuration file which contains the filesystem
+   * @param baseDir The base cache dir where you want to localize the 
+   *  files/archives
+   * @param fileStatus The file status on the dfs.
+   * @param isArchive if the cache is an archive or a file. In case it is an
+   *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
+   *  be unzipped/unjarred/untarred automatically
+   *  and the directory where the archive is unzipped/unjarred/untarred is
+   *  returned as the Path.
+   *  In case of a file, the path to the file is returned
+   * @param confFileStamp this is the hdfs file modification timestamp to verify
+   * that the file to be cached hasn't changed since the job started
+   * @param currentWorkDir this is the directory where you would want to create
+   * symlinks for the locally cached files/archives
+   * @param honorSymLinkConf if this is false, then the symlinks are not
+   * created even if conf says so (this is required for an optimization in task
+   * launches).
+   * NOTE: This is effectively always on since r696957, since there is no code
+   * path that does not use this.
+   * @return the path to directory where the archives are unjarred in case of
+   * archives, the path to the file where the file is copied locally
+   * @throws IOException
+   */
+  Path getLocalCache(URI cache, Configuration conf,
+      Path baseDir, FileStatus fileStatus,
+      boolean isArchive, long confFileStamp,
+      Path currentWorkDir, boolean honorSymLinkConf)
+      throws IOException {
+    String cacheId = makeRelative(cache, conf);
+    CacheStatus lcacheStatus;
+    Path localizedPath;
+    synchronized (cachedArchives) {
+      lcacheStatus = cachedArchives.get(cacheId);
+      if (lcacheStatus == null) {
+        // was never localized
+        lcacheStatus = new CacheStatus(baseDir, 
+          new Path(baseDir, new Path(cacheId)));
+        cachedArchives.put(cacheId, lcacheStatus);
+      }
+
+      synchronized (lcacheStatus) {
+        localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus,
+            fileStatus, isArchive, currentWorkDir, honorSymLinkConf);
+        lcacheStatus.refcount++;
+      }
+    }
+
+    // try deleting stuff if you can
+    long size = 0;
+    synchronized (baseDirSize) {
+      Long get = baseDirSize.get(baseDir);
+      if ( get != null ) {
+        size = get.longValue();
+      }
+    }
+    // setting the cache size to a default of 10GB
+    long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
+    if (allowedSize < size) {
+      // try some cache deletions
+      deleteCache(conf);
+    }
+    return localizedPath;
+  }
+
+  /**
+   * This is the opposite of getLocalCache. When you are done
+   * using the cache, you need to release it.
+   * @param cache The cache URI to be released
+   * @param conf configuration which contains the filesystem the cache
+   * is contained in.
+   * @throws IOException
+   */
+  void releaseCache(URI cache, Configuration conf)
+    throws IOException {
+    String cacheId = makeRelative(cache, conf);
+    synchronized (cachedArchives) {
+      CacheStatus lcacheStatus = cachedArchives.get(cacheId);
+      if (lcacheStatus == null)
+        return;
+      synchronized (lcacheStatus) {
+        lcacheStatus.refcount--;
+      }
+    }
+  }
+
+  // To delete the caches which have a refcount of zero
+
+  private void deleteCache(Configuration conf) throws IOException {
+    // try deleting cache Status with refcount of zero
+    synchronized (cachedArchives) {
+      for (Iterator<String> it = cachedArchives.keySet().iterator(); 
+          it.hasNext();) {
+        String cacheId = it.next();
+        CacheStatus lcacheStatus = cachedArchives.get(cacheId);
+        synchronized (lcacheStatus) {
+          if (lcacheStatus.refcount == 0) {
+            // delete this cache entry
+            FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
+            synchronized (baseDirSize) {
+              Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
+              if ( dirSize != null ) {
+                dirSize -= lcacheStatus.size;
+                baseDirSize.put(lcacheStatus.baseDir, dirSize);
+              }
+            }
+            it.remove();
+          }
+        }
+      }
+    }
+  }
+
+  /*
+   * Returns the relative path of the dir this cache will be localized in
+   * relative path that this cache will be localized in. For
+   * hdfs://hostname:port/absolute_path -- the relative path is
+   * hostname/absolute path -- if it is just /absolute_path -- then the
+   * relative path is hostname of DFS this mapred cluster is running
+   * on/absolute_path
+   */
+  String makeRelative(URI cache, Configuration conf)
+    throws IOException {
+    String host = cache.getHost();
+    if (host == null) {
+      host = cache.getScheme();
+    }
+    if (host == null) {
+      URI defaultUri = FileSystem.get(conf).getUri();
+      host = defaultUri.getHost();
+      if (host == null) {
+        host = defaultUri.getScheme();
+      }
+    }
+    String path = host + cache.getPath();
+    path = path.replace(":/","/");                // remove windows device colon
+    return path;
+  }
+
+  private Path cacheFilePath(Path p) {
+    return new Path(p, p.getName());
+  }
+
+  // the method which actually copies the caches locally and unjars/unzips them
+  // and does chmod for the files
+  private Path localizeCache(Configuration conf,
+                                    URI cache, long confFileStamp,
+                                    CacheStatus cacheStatus,
+                                    FileStatus fileStatus,
+                                    boolean isArchive,
+                                    Path currentWorkDir, 
+                                    boolean honorSymLinkConf)
+  throws IOException {
+    boolean doSymlink = honorSymLinkConf && DistributedCache.getSymlink(conf);
+    if(cache.getFragment() == null) {
+      doSymlink = false;
+    }
+    FileSystem fs = FileSystem.get(cache, conf);
+    String link = 
+      currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
+    File flink = new File(link);
+    if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
+                           cacheStatus, fileStatus)) {
+      LOG.info(String.format("Using existing cache of %s->%s",
+          cache.toString(), cacheStatus.localLoadPath));
+      if (isArchive) {
+        if (doSymlink){
+          if (!flink.exists())
+            FileUtil.symLink(cacheStatus.localLoadPath.toString(),
+                             link);
+        }
+
+        return cacheStatus.localLoadPath;
+      }
+      else {
+        if (doSymlink){
+          if (!flink.exists())
+            FileUtil.symLink(
+              cacheFilePath(cacheStatus.localLoadPath).toString(), link);
+        }
+        return cacheFilePath(cacheStatus.localLoadPath);
+      }
+    } else {
+
+      // remove the old archive
+      // if the old archive cannot be removed since it is being used by another
+      // job
+      // return null
+      if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
+        throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
+                              + " is in use and cannot be refreshed");
+
+      FileSystem localFs = FileSystem.getLocal(conf);
+      localFs.delete(cacheStatus.localLoadPath, true);
+      synchronized (baseDirSize) {
+        Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+        if ( dirSize != null ) {
+          dirSize -= cacheStatus.size;
+          baseDirSize.put(cacheStatus.baseDir, dirSize);
+        }
+      }
+      Path parchive = new Path(cacheStatus.localLoadPath,
+                               new Path(cacheStatus.localLoadPath.getName()));
+
+      if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
+        throw new IOException("Mkdirs failed to create directory " +
+                              cacheStatus.localLoadPath.toString());
+      }
+
+      String cacheId = cache.getPath();
+      fs.copyToLocalFile(new Path(cacheId), parchive);
+      if (isArchive) {
+        String tmpArchive = parchive.toString().toLowerCase();
+        File srcFile = new File(parchive.toString());
+        File destDir = new File(parchive.getParent().toString());
+        LOG.info(String.format("Extracting %s to %s",
+            srcFile.toString(), destDir.toString()));
+        if (tmpArchive.endsWith(".jar")) {
+          RunJar.unJar(srcFile, destDir);
+        } else if (tmpArchive.endsWith(".zip")) {
+          FileUtil.unZip(srcFile, destDir);
+        } else if (isTarFile(tmpArchive)) {
+          FileUtil.unTar(srcFile, destDir);
+        } else {
+          LOG.warn(String.format(
+            "Cache file %s specified as archive, but not valid extension.", 
+            srcFile.toString()));
+          // else will not do anything
+          // and copy the file into the dir as it is
+        }
+      }
+
+      long cacheSize = 
+        FileUtil.getDU(new File(parchive.getParent().toString()));
+      cacheStatus.size = cacheSize;
+      synchronized (baseDirSize) {
+        Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+        if( dirSize == null ) {
+          dirSize = Long.valueOf(cacheSize);
+        } else {
+          dirSize += cacheSize;
+        }
+        baseDirSize.put(cacheStatus.baseDir, dirSize);
+      }
+
+      // do chmod here
+      try {
+        //Setting recursive permission to grant everyone read and execute
+        FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
+      } catch(InterruptedException e) {
+        LOG.warn("Exception in chmod " + e.toString());
+      }
+
+      // update cacheStatus to reflect the newly cached file
+      cacheStatus.currentStatus = true;
+      cacheStatus.mtime = DistributedCache.getTimestamp(conf, cache);
+
+      LOG.info(String.format("Cached %s as %s",
+          cache.toString(), cacheStatus.localLoadPath));
+    }
+
+    if (isArchive){
+      if (doSymlink){
+        if (!flink.exists())
+          FileUtil.symLink(cacheStatus.localLoadPath.toString(),
+                           link);
+      }
+      return cacheStatus.localLoadPath;
+    }
+    else {
+      if (doSymlink){
+        if (!flink.exists())
+          FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(),
+                           link);
+      }
+      return cacheFilePath(cacheStatus.localLoadPath);
+    }
+  }
+
+  private static boolean isTarFile(String filename) {
+    return (filename.endsWith(".tgz") || filename.endsWith(".tar.gz") ||
+           filename.endsWith(".tar"));
+  }
+
+  // Checks if the cache has already been localized and is fresh
+  private boolean ifExistsAndFresh(Configuration conf, FileSystem fs,
+                                          URI cache, long confFileStamp,
+                                          CacheStatus lcacheStatus,
+                                          FileStatus fileStatus)
+  throws IOException {
+    // check for existence of the cache
+    if (lcacheStatus.currentStatus == false) {
+      return false;
+    } else {
+      long dfsFileStamp;
+      if (fileStatus != null) {
+        dfsFileStamp = fileStatus.getModificationTime();
+      } else {
+        dfsFileStamp = DistributedCache.getTimestamp(conf, cache);
+      }
+
+      // ensure that the file on hdfs hasn't been modified since the job started
+      if (dfsFileStamp != confFileStamp) {
+        LOG.fatal("File: " + cache + " has changed on HDFS since job started");
+        throw new IOException("File: " + cache +
+                              " has changed on HDFS since job started");
+      }
+
+      if (dfsFileStamp != lcacheStatus.mtime) {
+        // needs refreshing
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * This method creates symlinks for all files in a given dir in another 
+   * directory.
+   * 
+   * Should not be used outside of DistributedCache code.
+   * 
+   * @param conf the configuration
+   * @param jobCacheDir the target directory for creating symlinks
+   * @param workDir the directory in which the symlinks are created
+   * @throws IOException
+   */
+  public static void createAllSymlink(Configuration conf, File jobCacheDir, 
+      File workDir)
+    throws IOException{
+    if ((jobCacheDir == null || !jobCacheDir.isDirectory()) ||
+           workDir == null || (!workDir.isDirectory())) {
+      return;
+    }
+    boolean createSymlink = DistributedCache.getSymlink(conf);
+    if (createSymlink){
+      File[] list = jobCacheDir.listFiles();
+      for (int i=0; i < list.length; i++){
+        String target = list[i].getAbsolutePath();
+        String link = new File(workDir, list[i].getName()).toString();
+        LOG.info(String.format("Creating symlink: %s <- %s", target, link));
+        int ret = FileUtil.symLink(target, link);
+        if (ret != 0) {
+          LOG.warn(String.format("Failed to create symlink: %s <- %s", target,
+              link));
+        }
+      }
+    }
+  }
+
+  private static class CacheStatus {
+    // false, not loaded yet, true is loaded
+    boolean currentStatus;
+
+    // the local load path of this cache
+    Path localLoadPath;
+
+    //the base dir where the cache lies
+    Path baseDir;
+
+    //the size of this cache
+    long size;
+
+    // number of instances using this cache
+    int refcount;
+
+    // the cache-file modification time
+    long mtime;
+
+    public CacheStatus(Path baseDir, Path localLoadPath) {
+      super();
+      this.currentStatus = false;
+      this.localLoadPath = localLoadPath;
+      this.refcount = 0;
+      this.mtime = -1;
+      this.baseDir = baseDir;
+      this.size = 0;
+    }
+  }
+
+  /**
+   * Clear the entire contents of the cache and delete the backing files. This
+   * should only be used when the server is reinitializing, because the users
+   * are going to lose their files.
+   */
+  public void purgeCache() {
+    synchronized (cachedArchives) {
+      for (Map.Entry<String,CacheStatus> f: cachedArchives.entrySet()) {
+        try {
+          localFs.delete(f.getValue().localLoadPath, true);
+        } catch (IOException ie) {
+          LOG.debug("Error cleaning up cache", ie);
+        }
+      }
+      cachedArchives.clear();
+    }
+  }
+
+  public TaskDistributedCacheManager newTaskDistributedCacheManager(
+      Configuration taskConf) throws IOException {
+    return new TaskDistributedCacheManager(this, taskConf);
+  }
+
+  /**
+   * Determines timestamps of files to be cached, and stores those
+   * in the configuration.  This is intended to be used internally by JobClient
+   * after all cache files have been added.
+   * 
+   * This is an internal method!
+   * 
+   * @param job Configuration of a job.
+   * @throws IOException
+   */
+  public static void determineTimestamps(Configuration job) throws IOException {
+    URI[] tarchives = DistributedCache.getCacheArchives(job);
+    if (tarchives != null) {
+      StringBuffer archiveTimestamps = 
+        new StringBuffer(String.valueOf(
+            DistributedCache.getTimestamp(job, tarchives[0])));
+      for (int i = 1; i < tarchives.length; i++) {
+        archiveTimestamps.append(",");
+        archiveTimestamps.append(String.valueOf(
+            DistributedCache.getTimestamp(job, tarchives[i])));
+      }
+      DistributedCache.setArchiveTimestamps(job, archiveTimestamps.toString());
+    }
+  
+    URI[] tfiles = DistributedCache.getCacheFiles(job);
+    if (tfiles != null) {
+      StringBuffer fileTimestamps = new StringBuffer(String.valueOf(
+          DistributedCache.getTimestamp(job, tfiles[0])));
+      for (int i = 1; i < tfiles.length; i++) {
+        fileTimestamps.append(",");
+        fileTimestamps.append(String.valueOf(
+            DistributedCache.getTimestamp(job, tfiles[i])));
+      }
+      DistributedCache.setFileTimestamps(job, fileTimestamps.toString());
+    }
+  }
+}

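For orientation, the lifecycle of the two new manager classes, as wired up by the LocalJobRunner and TaskRunner hunks below, amounts to the following. This is a minimal sketch based only on the calls visible in this patch; the path handed to setup() is hypothetical and error handling is omitted.

    import java.io.File;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.TaskDistributedCacheManager;
    import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
    import org.apache.hadoop.fs.LocalDirAllocator;

    public class CacheManagerLifecycleSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // One per daemon (TaskTracker or LocalJobRunner); owns the shared cache.
        TrackerDistributedCacheManager trackerManager =
            new TrackerDistributedCacheManager(conf);

        // One per task; localizes whatever this task's conf asks for.
        TaskDistributedCacheManager taskManager =
            trackerManager.newTaskDistributedCacheManager(conf);
        taskManager.setup(new LocalDirAllocator("mapred.local.dir"),
            new File("/tmp/system-job-dir"), "archive");

        // Localized jars added to the classpath come back through here.
        for (String entry : taskManager.getClassPaths()) {
          System.out.println(entry);
        }

        taskManager.release();        // drop this task's reference counts
        trackerManager.purgeCache();  // only on daemon reinitialization
      }
    }
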
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java?rev=1077114&r1=1077113&r2=1077114&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java Fri Mar  4 03:42:25 2011
@@ -161,7 +161,7 @@ class Child {
         //setupWorkDir actually sets up the symlinks for the distributed
         //cache. After a task exits we wipe the workdir clean, and hence
         //the symlinks have to be rebuilt.
-        TaskRunner.setupWorkDir(job);
+        TaskRunner.setupWorkDir(job, new File(".").getAbsoluteFile());
 
         numTasksToExecute = job.getNumTasksToExecutePerJvm();
         assert(numTasksToExecute != 0);

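The one-line change above is the hook for the rest of the patch: setupWorkDir no longer assumes the process's cwd is the work directory, so LocalJobRunner can reuse it with a per-job local directory (see its hunk below). A sketch under that assumption; the package declaration is needed because TaskRunner is package-private, and the path is hypothetical:

    package org.apache.hadoop.mapred;

    import java.io.File;

    public class SetupWorkDirSketch {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        // Fully deletes and re-creates the directory, then sets up cache
        // symlinks and the child tmp dir inside it.
        TaskRunner.setupWorkDir(job, new File("/tmp/task-work").getAbsoluteFile());
      }
    }
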
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1077114&r1=1077113&r2=1077114&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java Fri Mar  4 03:42:25 2011
@@ -48,6 +48,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -560,15 +561,12 @@ public class JobClient extends Configure
                "Applications should implement Tool for the same.");
     }
 
-    // get all the command line arguments into the 
-    // jobconf passed in by the user conf
-    String files = null;
-    String libjars = null;
-    String archives = null;
-
-    files = job.get("tmpfiles");
-    libjars = job.get("tmpjars");
-    archives = job.get("tmparchives");
+    // Retrieve command line arguments placed into the JobConf
+    // by GenericOptionsParser.
+    String files = job.get("tmpfiles");
+    String libjars = job.get("tmpjars");
+    String archives = job.get("tmparchives");
+
     /*
      * set this user's id in job configuration, so later job files can be
      * accessed using this user's id
@@ -648,27 +646,7 @@ public class JobClient extends Configure
     }
     
     //  set the timestamps of the archives and files
-    URI[] tarchives = DistributedCache.getCacheArchives(job);
-    if (tarchives != null) {
-      StringBuffer archiveTimestamps = 
-        new StringBuffer(String.valueOf(DistributedCache.getTimestamp(job, tarchives[0])));
-      for (int i = 1; i < tarchives.length; i++) {
-        archiveTimestamps.append(",");
-        archiveTimestamps.append(String.valueOf(DistributedCache.getTimestamp(job, tarchives[i])));
-      }
-      DistributedCache.setArchiveTimestamps(job, archiveTimestamps.toString());
-    }
-
-    URI[] tfiles = DistributedCache.getCacheFiles(job);
-    if (tfiles != null) {
-      StringBuffer fileTimestamps = 
-        new StringBuffer(String.valueOf(DistributedCache.getTimestamp(job, tfiles[0])));
-      for (int i = 1; i < tfiles.length; i++) {
-        fileTimestamps.append(",");
-        fileTimestamps.append(String.valueOf(DistributedCache.getTimestamp(job, tfiles[i])));
-      }
-      DistributedCache.setFileTimestamps(job, fileTimestamps.toString());
-    }
+    TrackerDistributedCacheManager.determineTimestamps(job);
        
     String originalJarPath = job.getJar();
 
@@ -695,6 +673,7 @@ public class JobClient extends Configure
     }
   }
 
+
   private UnixUserGroupInformation getUGI(Configuration job) throws IOException {
     UnixUserGroupInformation ugi = null;
     try {

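With the inlined loops above removed, JobClient now delegates to the shared helper. A minimal sketch of the call; the file:// paths are hypothetical and must exist for the timestamp lookups to succeed:

    import java.net.URI;

    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
    import org.apache.hadoop.mapred.JobConf;

    public class DetermineTimestampsSketch {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        DistributedCache.addCacheFile(new URI("file:///tmp/lookup.txt"), job);
        DistributedCache.addCacheArchive(new URI("file:///tmp/dict.zip"), job);

        // Stores a comma-separated modification-time list for the files and
        // archives back into the job configuration, exactly as the removed
        // JobClient code did.
        TrackerDistributedCacheManager.determineTimestamps(job);
      }
    }
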
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1077114&r1=1077113&r2=1077114&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Fri Mar  4 03:42:25 2011
@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.File;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -27,13 +29,15 @@ import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.filecache.TaskDistributedCacheManager;
+import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.mapred.JobTrackerMetricsInst;
-import org.apache.hadoop.mapred.JvmTask;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -60,19 +64,28 @@ class LocalJobRunner implements JobSubmi
   
   private class Job extends Thread
     implements TaskUmbilicalProtocol {
-    private Path file;
+    // The job directory on the system: JobClient places job configurations here.
+    // This is analogous to JobTracker's system directory.
+    private Path systemJobDir;
+    private Path systemJobFile;
+
+    // The job directory for the task.  Analogous to a task's job directory.
+    private Path localJobDir;
+    private Path localJobFile;
+
     private JobID id;
     private JobConf job;
-    private Path systemJobDir;
 
     private JobStatus status;
     private ArrayList<TaskAttemptID> mapIds = new ArrayList<TaskAttemptID>();
 
     private JobProfile profile;
-    private Path localFile;
     private FileSystem localFs;
     boolean killed = false;
     
+    private TrackerDistributedCacheManager trackerDistributedCacheManager;
+    private TaskDistributedCacheManager taskDistributedCacheManager;
+    
     // Counters summed over all the map/reduce tasks which
     // have successfully completed
     private Counters completedTaskCounters = new Counters();
@@ -86,15 +99,56 @@ class LocalJobRunner implements JobSubmi
     
     public Job(JobID jobid, String jobSubmitDir) throws IOException {
       this.systemJobDir = new Path(jobSubmitDir);
-      this.file = new Path(systemJobDir, "job.xml");
+      this.systemJobFile = new Path(systemJobDir, "job.xml");
       this.id = jobid;
 
-      this.localFile = new JobConf(conf).getLocalPath(jobDir+id+".xml");
       this.localFs = FileSystem.getLocal(conf);
 
-      fs.copyToLocalFile(file, localFile);
-      this.job = new JobConf(localFile);
-      profile = new JobProfile(job.getUser(), id, file.toString(), 
+      this.localJobDir = localFs.makeQualified(conf.getLocalPath(jobDir));
+      this.localJobFile = new Path(this.localJobDir, id + ".xml");
+
+      // Manage the distributed cache.  If there are files to be copied,
+      // setup() below may update the configuration, so localJobFile is
+      // re-written afterwards.
+      this.trackerDistributedCacheManager =
+        new TrackerDistributedCacheManager(conf);
+      this.taskDistributedCacheManager =
+        trackerDistributedCacheManager.newTaskDistributedCacheManager(conf);
+      taskDistributedCacheManager.setup(
+          new LocalDirAllocator("mapred.local.dir"),
+          new File(systemJobDir.toString()),
+          "archive");
+
+      if (DistributedCache.getSymlink(conf)) {
+        // Symlinks are not supported, largely because the cwd in
+        // LocalJobRunner is not a fresh slate (as it is for a Child
+        // subprocess), but rather the user's working directory.  This is
+        // further complicated because the logic in setupWorkDir only
+        // creates symlinks if there's a jarfile in the configuration.
+        LOG.warn("LocalJobRunner does not support " +
+            "symlinking into current working dir.");
+      }
+      // Setup the symlinks for the distributed cache.
+      TaskRunner.setupWorkDir(conf, new File(localJobDir.toUri()).getAbsoluteFile());
+
+      // Write out configuration file.  Instead of copying it from
+      // systemJobFile, we re-write it, since setup(), above, may have
+      // updated it.
+      OutputStream out = localFs.create(localJobFile);
+      try {
+        conf.writeXml(out);
+      } finally {
+        out.close();
+      }
+      this.job = new JobConf(localJobFile);
+
+      // Job (the current object) is a Thread, so we wrap its class loader.
+      if (!taskDistributedCacheManager.getClassPaths().isEmpty()) {
+        setContextClassLoader(taskDistributedCacheManager.makeClassLoader(
+            getContextClassLoader()));
+      }
+
+      profile = new JobProfile(job.getUser(), id, systemJobFile.toString(), 
                                "http://localhost:8080/", job.getJobName());
       status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING);
 
@@ -130,7 +184,7 @@ class LocalJobRunner implements JobSubmi
           if (!this.isInterrupted()) {
             TaskAttemptID mapId = new TaskAttemptID(new TaskID(jobId, true, i),0);  
             mapIds.add(mapId);
-            MapTask map = new MapTask(file.toString(),  
+            MapTask map = new MapTask(systemJobFile.toString(),  
                                       mapId, i,
                                       taskSplitMetaInfos[i].getSplitIndex(), 1);
             JobConf localConf = new JobConf(job);
@@ -140,7 +194,7 @@ class LocalJobRunner implements JobSubmi
             mapOutput.setConf(localConf);
             mapOutputFiles.put(mapId, mapOutput);
 
-            map.setJobFile(localFile.toString());
+            map.setJobFile(localJobFile.toString());
             map.localizeConfiguration(localConf);
             map.setConf(localConf);
             map_tasks += 1;
@@ -158,7 +212,7 @@ class LocalJobRunner implements JobSubmi
         try {
           if (numReduceTasks > 0) {
             ReduceTask reduce =
-                new ReduceTask(file.toString(), reduceId, 0, mapIds.size(),
+                new ReduceTask(systemJobFile.toString(), reduceId, 0, mapIds.size(),
                     1);
             JobConf localConf = new JobConf(job);
             TaskRunner.setupChildMapredLocalDirs(reduce, localConf);
@@ -183,7 +237,7 @@ class LocalJobRunner implements JobSubmi
               }
             }
             if (!this.isInterrupted()) {
-              reduce.setJobFile(localFile.toString());
+              reduce.setJobFile(localJobFile.toString());
               reduce.localizeConfiguration(localConf);
               reduce.setConf(localConf);
               reduce_tasks += 1;
@@ -231,8 +285,11 @@ class LocalJobRunner implements JobSubmi
 
       } finally {
         try {
-          fs.delete(file.getParent(), true);  // delete submit dir
-          localFs.delete(localFile, true);              // delete local copy
+          fs.delete(systemJobFile.getParent(), true);  // delete submit dir
+          localFs.delete(localJobFile, true);              // delete local copy
+          // Cleanup distributed cache
+          taskDistributedCacheManager.release();
+          trackerDistributedCacheManager.purgeCache();
         } catch (IOException e) {
           LOG.warn("Error cleaning up "+id+": "+e);
         }
@@ -469,5 +526,5 @@ class LocalJobRunner implements JobSubmi
   @Override
   public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException{
     return null;
-}
+  }
 }

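Taken together, the changes above are what MAPREDUCE-476 enables at the user level: a job submitted through LocalJobRunner now localizes its cache files and puts cached jars on the task classpath. A minimal sketch, with hypothetical paths; note that symlinking into the working directory remains unsupported in local mode, per the LOG.warn above:

    import java.net.URI;

    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    public class LocalCacheJobSketch {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        job.set("mapred.job.tracker", "local");
        job.set("fs.default.name", "file:///");

        // Both are now honored in local mode: the file is localized by
        // TaskDistributedCacheManager, and the jar reaches tasks via the
        // wrapped context class loader.
        DistributedCache.addCacheFile(new URI("file:///tmp/lookup.txt"), job);
        DistributedCache.addFileToClassPath(new Path("/tmp/lib.jar"), job);

        // ... set mapper/reducer and input/output paths, then submit with
        // JobClient.runJob(job);
      }
    }
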
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077114&r1=1077113&r2=1077114&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar  4 03:42:25 2011
@@ -33,6 +33,8 @@ import java.util.Vector;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.filecache.TaskDistributedCacheManager;
+import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -64,6 +66,7 @@ abstract class TaskRunner extends Thread
 
   
   private TaskTracker tracker;
+  private TaskDistributedCacheManager taskDistributedCacheManager;
 
   protected JobConf conf;
   JvmManager jvmManager;
@@ -100,18 +103,6 @@ abstract class TaskRunner extends Thread
    * not execute user code, only system code.
    */
   public void close() throws IOException {}
-
-  private static String stringifyPathArray(Path[] p){
-    if (p == null){
-      return null;
-    }
-    StringBuffer str = new StringBuffer(p[0].toString());
-    for (int i = 1; i < p.length; i++){
-      str.append(",");
-      str.append(p[i].toString());
-    }
-    return str.toString();
-  }
   
   /**
    * Get the java command line options for the child map/reduce tasks.
@@ -165,11 +156,12 @@ abstract class TaskRunner extends Thread
       LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
       File workDir = formWorkDir(lDirAlloc, taskid, t.isTaskCleanupTask(), conf);
       
-      URI[] archives = DistributedCache.getCacheArchives(conf);
-      URI[] files = DistributedCache.getCacheFiles(conf);
       // We don't create any symlinks yet, so presence/absence of workDir
       // actually on the file system doesn't matter.
-      setupDistributedCache(lDirAlloc, workDir, archives, files);
+      taskDistributedCacheManager = tracker.getTrackerDistributedCacheManager()
+                                    .newTaskDistributedCacheManager(conf);
+      taskDistributedCacheManager.setup(lDirAlloc, workDir,
+                                        TaskTracker.getDistributedCacheDir());
       
       // Set up the child task's configuration. After this call, no localization
       // of files should happen in the TaskTracker's process space. Any changes to
@@ -180,9 +172,10 @@ abstract class TaskRunner extends Thread
         return;
       }
       
-      // Build classpath
-      List<String> classPaths = getClassPaths(conf, workDir, archives, files);
-      
+      // Accumulates class paths for child.
+      List<String> classPaths = getClassPaths(conf, workDir,
+                                              taskDistributedCacheManager);
+
       long logSize = TaskLog.getTaskLogLength(conf);
       
       //  Build exec child JVM args.
@@ -240,18 +233,8 @@ abstract class TaskRunner extends Thread
       }
     } finally {
       try{
-        URI[] archives = DistributedCache.getCacheArchives(conf);
-        URI[] files = DistributedCache.getCacheFiles(conf);
-        if (archives != null){
-          for (int i = 0; i < archives.length; i++){
-            DistributedCache.releaseCache(archives[i], conf);
-          }
-        }
-        if (files != null){
-          for(int i = 0; i < files.length; i++){
-            DistributedCache.releaseCache(files[i], conf);
-          }
-        }
+        taskDistributedCacheManager.release();
+
       }catch(IOException ie){
         LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
       }
@@ -466,7 +449,7 @@ abstract class TaskRunner extends Thread
   /**
    */
   private static List<String> getClassPaths(JobConf conf, File workDir,
-      URI[] archives, URI[] files)
+      TaskDistributedCacheManager taskDistributedCacheManager)
       throws IOException {
     // Accumulates class paths for child.
     List<String> classPaths = new ArrayList<String>();
@@ -477,7 +460,7 @@ abstract class TaskRunner extends Thread
     appendJobJarClasspaths(conf.getJar(), classPaths);
     
     // Distributed cache paths
-    appendDistributedCacheClasspaths(conf, archives, files, classPaths);
+    classPaths.addAll(taskDistributedCacheManager.getClassPaths());
     
     // Include the working dir too
     classPaths.add(workDir.toString());
@@ -612,105 +595,6 @@ abstract class TaskRunner extends Thread
     return new File(workDir.toString());
   }
 
-  private void setupDistributedCache(LocalDirAllocator lDirAlloc, File workDir,
-      URI[] archives, URI[] files) throws IOException {
-    FileStatus fileStatus;
-    FileSystem fileSystem;
-    Path localPath;
-    String baseDir;
-    if ((archives != null) || (files != null)) {
-      if (archives != null) {
-        String[] archivesTimestamps = 
-                             DistributedCache.getArchiveTimestamps(conf);
-        Path[] p = new Path[archives.length];
-        for (int i = 0; i < archives.length;i++){
-          fileSystem = FileSystem.get(archives[i], conf);
-          fileStatus = fileSystem.getFileStatus(
-                                    new Path(archives[i].getPath()));
-          String cacheId = DistributedCache.makeRelative(archives[i],conf);
-          String cachePath = TaskTracker.getDistributedCacheDir() + 
-                               Path.SEPARATOR + cacheId;
-          
-          localPath = lDirAlloc.getLocalPathForWrite(cachePath,
-                                    fileStatus.getLen(), conf);
-          baseDir = localPath.toString().replace(cacheId, "");
-          p[i] = DistributedCache.getLocalCache(archives[i], conf, 
-                                                new Path(baseDir),
-                                                fileStatus,
-                                                true, Long.parseLong(
-                                                      archivesTimestamps[i]),
-                                                new Path(workDir.
-                                                      getAbsolutePath()), 
-                                                false);
-          
-        }
-        DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
-      }
-      if ((files != null)) {
-        String[] fileTimestamps = DistributedCache.getFileTimestamps(conf);
-        Path[] p = new Path[files.length];
-        for (int i = 0; i < files.length;i++){
-          fileSystem = FileSystem.get(files[i], conf);
-          fileStatus = fileSystem.getFileStatus(
-                                    new Path(files[i].getPath()));
-          String cacheId = DistributedCache.makeRelative(files[i], conf);
-          String cachePath = TaskTracker.getDistributedCacheDir() +
-                               Path.SEPARATOR + cacheId;
-          
-          localPath = lDirAlloc.getLocalPathForWrite(cachePath,
-                                    fileStatus.getLen(), conf);
-          baseDir = localPath.toString().replace(cacheId, "");
-          p[i] = DistributedCache.getLocalCache(files[i], conf, 
-                                                new Path(baseDir),
-                                                fileStatus,
-                                                false, Long.parseLong(
-                                                         fileTimestamps[i]),
-                                                new Path(workDir.
-                                                      getAbsolutePath()), 
-                                                false);
-        }
-        DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
-      }
-    }
-  }
-
-  private static void appendDistributedCacheClasspaths(JobConf conf,
-      URI[] archives, URI[] files, List<String> classPaths)
-      throws IOException {
-    // Archive paths
-    Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
-    if (archiveClasspaths != null && archives != null) {
-      Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
-      if (localArchives != null){
-        for (int i=0;i<archives.length;i++){
-          for(int j=0;j<archiveClasspaths.length;j++){
-            if (archives[i].getPath().equals(
-                                             archiveClasspaths[j].toString())){
-              classPaths.add(localArchives[i].toString());
-            }
-          }
-        }
-      }
-    }
-    
-    //file paths
-    Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf);
-    if (fileClasspaths!=null && files != null) {
-      Path[] localFiles = DistributedCache
-        .getLocalCacheFiles(conf);
-      if (localFiles != null) {
-        for (int i = 0; i < files.length; i++) {
-          for (int j = 0; j < fileClasspaths.length; j++) {
-            if (files[i].getPath().equals(
-                                          fileClasspaths[j].toString())) {
-              classPaths.add(localFiles[i].toString());
-            }
-          }
-        }
-      }
-    }
-  }
-
   private static void appendSystemClasspaths(List<String> classPaths) {
     for (String c : System.getProperty("java.class.path").split(
         SYSTEM_PATH_SEPARATOR)) {
@@ -743,13 +627,23 @@ abstract class TaskRunner extends Thread
     classPaths.add(new File(jobCacheDir, "classes").toString());
     classPaths.add(jobCacheDir.toString());
   }
-  
-  //Mostly for setting up the symlinks. Note that when we setup the distributed
-  //cache, we didn't create the symlinks. This is done on a per task basis
-  //by the currently executing task.
-  public static void setupWorkDir(JobConf conf) throws IOException {
-    File workDir = new File(".").getAbsoluteFile();
+   
+  /**
+   * Creates the distributed cache symlinks and the tmp directory, as
+   * appropriate. Note that the symlinks are not created when the
+   * distributed cache is first set up; that is done here, on a per-task
+   * basis, by the currently executing task.
+   * 
+   * @param conf The job configuration.
+   * @param workDir Working directory, which is fully deleted and re-created.
+   */
+  public static void setupWorkDir(JobConf conf, File workDir) throws IOException {
+    LOG.debug("Fully deleting and re-creating " + workDir);
     FileUtil.fullyDelete(workDir);
+    if (!workDir.mkdir()) {
+      LOG.debug("Did not recreate " + workDir);
+    }
+    
     if (DistributedCache.getSymlink(conf)) {
       URI[] archives = DistributedCache.getCacheArchives(conf);
       URI[] files = DistributedCache.getCacheFiles(conf);
@@ -758,48 +652,57 @@ abstract class TaskRunner extends Thread
       if (archives != null) {
         for (int i = 0; i < archives.length; i++) {
           String link = archives[i].getFragment();
-          if (link != null) {
-            link = workDir.toString() + Path.SEPARATOR + link;
-            File flink = new File(link);
-            if (!flink.exists()) {
-              FileUtil.symLink(localArchives[i].toString(), link);
-            }
-          }
+          String target = localArchives[i].toString();
+          symlink(workDir, target, link);
         }
       }
       if (files != null) {
         for (int i = 0; i < files.length; i++) {
           String link = files[i].getFragment();
-          if (link != null) {
-            link = workDir.toString() + Path.SEPARATOR + link;
-            File flink = new File(link);
-            if (!flink.exists()) {
-              FileUtil.symLink(localFiles[i].toString(), link);
-            }
-          }
+          String target = localFiles[i].toString();
+          symlink(workDir, target, link);
         }
       }
     }
-    File jobCacheDir = null;
+
     if (conf.getJar() != null) {
-      jobCacheDir = new File(
+      File jobCacheDir = new File(
           new Path(conf.getJar()).getParent().toString());
-    }
 
-    // create symlinks for all the files in job cache dir in current
-    // workingdir for streaming
-    try{
-      DistributedCache.createAllSymlink(conf, jobCacheDir,
-          workDir);
-    } catch(IOException ie){
-      // Do not exit even if symlinks have not been created.
-      LOG.warn(StringUtils.stringifyException(ie));
+      // create symlinks for all the files in job cache dir in current
+      // workingdir for streaming
+      try{
+        TrackerDistributedCacheManager.createAllSymlink(conf, jobCacheDir,
+            workDir);
+      } catch(IOException ie){
+        // Do not exit even if symlinks have not been created.
+        LOG.warn(StringUtils.stringifyException(ie));
+      }
     }
 
     createChildTmpDir(workDir, conf);
   }
 
   /**
+   * Utility method for creating a symlink and warning on errors.
+   *
+   * If link is null, does nothing.
+   */
+  private static void symlink(File workDir, String target, String link)
+      throws IOException {
+    if (link != null) {
+      link = workDir.toString() + Path.SEPARATOR + link;
+      File flink = new File(link);
+      if (!flink.exists()) {
+        LOG.info(String.format("Creating symlink: %s <- %s", target, link));
+        if (0 != FileUtil.symLink(target, link)) {
+          LOG.warn(String.format("Failed to create symlink: %s <- %s", target, link));
+        }
+      }
+    }
+  }
+
+  /**
    * Kill the child process
    */
   public void kill() {

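The hunk above also factors the duplicated link-creation logic into a private symlink() helper. Its semantics, mirrored here as a standalone sketch for illustration (the helper itself is private to TaskRunner): link names come from the cache URI's fragment, a null fragment means no link was requested, and existing links are left alone.

    import java.io.File;

    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;

    public class SymlinkSketch {
      static void symlink(File workDir, String target, String link)
          throws Exception {
        if (link == null) {
          return;                              // no fragment on the URI
        }
        link = workDir.toString() + Path.SEPARATOR + link;
        File flink = new File(link);
        // Idempotent: a link left by an earlier task in the same JVM survives.
        if (!flink.exists() && FileUtil.symLink(target, link) != 0) {
          System.err.println("Failed to create symlink: " + target + " <- " + link);
        }
      }

      public static void main(String[] args) throws Exception {
        // Hypothetical: a cache URI ...#data localized to /tmp/cache/data.txt.
        symlink(new File("/tmp/work"), "/tmp/cache/data.txt", "data");
      }
    }
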
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077114&r1=1077113&r2=1077114&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar  4 03:42:25 2011
@@ -53,7 +53,7 @@ import javax.servlet.http.HttpServletRes
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -161,6 +161,8 @@ public class TaskTracker 
 
   Server taskReportServer = null;
   InterTrackerProtocol jobClient;
+  
+  private TrackerDistributedCacheManager distributedCacheManager;
     
   // last heartbeat response recieved
   short heartbeatResponseId = -1;
@@ -633,8 +635,11 @@ public class TaskTracker 
     this.taskTrackerName = "tracker_" + localHostname + ":" + taskReportAddress;
     LOG.info("Starting tracker " + taskTrackerName);
 
-    // Clear out temporary files that might be lying around
-    DistributedCache.purgeCache(this.fConf);
+    // Initialize DistributedCache and
+    // clear out temporary files that might be lying around
+    this.distributedCacheManager = 
+        new TrackerDistributedCacheManager(this.fConf);
+    this.distributedCacheManager.purgeCache();
     cleanupStorage();
 
     this.jobClient = (InterTrackerProtocol) 
@@ -3717,6 +3722,10 @@ public class TaskTracker 
     healthChecker.start();
   }
   
+  TrackerDistributedCacheManager getTrackerDistributedCacheManager() {
+    return distributedCacheManager;
+  }
+
     /**
      * Download the job-token file from the FS and save on local fs.
      * @param user