Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2009/08/25 12:27:54 UTC

svn commit: r807543 [1/2] - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/filecache/ src/test/ src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapreduce/filecache/

Author: tomwhite
Date: Tue Aug 25 10:27:53 2009
New Revision: 807543

URL: http://svn.apache.org/viewvc?rev=807543&view=rev
Log:
MAPREDUCE-476. Extend DistributedCache to work locally (LocalJobRunner). Contributed by Philip Zeyliger.
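
With this change, a job that registers cache files should behave the same
whether it runs against a cluster or in-process with mapred.job.tracker set
to "local". A minimal sketch of such a job (the cached file path is
hypothetical):

    import java.net.URI;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.filecache.DistributedCache;

    public class LocalCacheExample {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        conf.set("mapred.job.tracker", "local");  // selects LocalJobRunner
        // Register a file for localization; tasks can find the local copy
        // via DistributedCache.getLocalCacheFiles(conf).
        DistributedCache.addCacheFile(new URI("/user/me/lookup.txt"), conf);
        // ... set mapper, reducer, input and output paths, then:
        JobClient.runJob(conf);
      }
    }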

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
Removed:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestDistributedCache.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
    hadoop/mapreduce/trunk/src/test/commit-tests

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=807543&r1=807542&r2=807543&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Aug 25 10:27:53 2009
@@ -253,6 +253,9 @@
     in Configuration for dumping in JSON format from Hudson trunk build #68.
     (yhemanth)
 
+    MAPREDUCE-476. Extend DistributedCache to work locally (LocalJobRunner).
+    (Philip Zeyliger via tomwhite)
+
   BUG FIXES
 
     MAPREDUCE-878. Rename fair scheduler design doc to 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java?rev=807543&r1=807542&r2=807543&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java Tue Aug 25 10:27:53 2009
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapred;
 
 import java.io.ByteArrayOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
@@ -27,7 +28,6 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapred.JvmTask;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -146,7 +146,7 @@
         //setupWorkDir actually sets up the symlinks for the distributed
         //cache. After a task exits we wipe the workdir clean, and hence
         //the symlinks have to be rebuilt.
-        TaskRunner.setupWorkDir(job);
+        TaskRunner.setupWorkDir(job, new File(".").getAbsoluteFile());
 
         numTasksToExecute = job.getNumTasksToExecutePerJvm();
         assert(numTasksToExecute != 0);
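
setupWorkDir previously hard-coded the child JVM's current working
directory; it now takes the directory explicitly, so that LocalJobRunner,
which runs tasks in-process, can point it at a directory of its own. The two
call sites from this patch, for comparison:

    // Child.java -- child JVM: the cwd is the task's work directory.
    TaskRunner.setupWorkDir(job, new File(".").getAbsoluteFile());

    // LocalJobRunner.java -- in-process: use the local job directory.
    TaskRunner.setupWorkDir(conf, new File(localJobDir.toUri()).getAbsoluteFile());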

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=807543&r1=807542&r2=807543&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Tue Aug 25 10:27:53 2009
@@ -48,6 +48,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -567,15 +568,12 @@
                "Applications should implement Tool for the same.");
     }
 
-    // get all the command line arguments into the 
-    // jobconf passed in by the user conf
-    String files = null;
-    String libjars = null;
-    String archives = null;
-
-    files = job.get("tmpfiles");
-    libjars = job.get("tmpjars");
-    archives = job.get("tmparchives");
+    // Retrieve command line arguments placed into the JobConf
+    // by GenericOptionsParser.
+    String files = job.get("tmpfiles");
+    String libjars = job.get("tmpjars");
+    String archives = job.get("tmparchives");
+
     /*
      * set this user's id in job configuration, so later job files can be
      * accessed using this user's id
@@ -651,27 +649,7 @@
     }
     
     //  set the timestamps of the archives and files
-    URI[] tarchives = DistributedCache.getCacheArchives(job);
-    if (tarchives != null) {
-      StringBuffer archiveTimestamps = 
-        new StringBuffer(String.valueOf(DistributedCache.getTimestamp(job, tarchives[0])));
-      for (int i = 1; i < tarchives.length; i++) {
-        archiveTimestamps.append(",");
-        archiveTimestamps.append(String.valueOf(DistributedCache.getTimestamp(job, tarchives[i])));
-      }
-      DistributedCache.setArchiveTimestamps(job, archiveTimestamps.toString());
-    }
-
-    URI[] tfiles = DistributedCache.getCacheFiles(job);
-    if (tfiles != null) {
-      StringBuffer fileTimestamps = 
-        new StringBuffer(String.valueOf(DistributedCache.getTimestamp(job, tfiles[0])));
-      for (int i = 1; i < tfiles.length; i++) {
-        fileTimestamps.append(",");
-        fileTimestamps.append(String.valueOf(DistributedCache.getTimestamp(job, tfiles[i])));
-      }
-      DistributedCache.setFileTimestamps(job, fileTimestamps.toString());
-    }
+    TrackerDistributedCacheManager.determineTimestamps(job);
        
     String originalJarPath = job.getJar();
 
@@ -700,6 +678,7 @@
 
   }
 
+
   private UnixUserGroupInformation getUGI(Configuration job) throws IOException {
     UnixUserGroupInformation ugi = null;
     try {
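
The StringBuffer loops deleted above moved into the new
TrackerDistributedCacheManager. Based on that deleted code, a plausible
sketch of determineTimestamps (not necessarily the committed implementation):

    public static void determineTimestamps(Configuration job)
        throws IOException {
      URI[] tarchives = DistributedCache.getCacheArchives(job);
      if (tarchives != null) {
        StringBuilder stamps = new StringBuilder(String.valueOf(
            DistributedCache.getTimestamp(job, tarchives[0])));
        for (int i = 1; i < tarchives.length; i++) {
          stamps.append(",");
          stamps.append(DistributedCache.getTimestamp(job, tarchives[i]));
        }
        DistributedCache.setArchiveTimestamps(job, stamps.toString());
      }
      URI[] tfiles = DistributedCache.getCacheFiles(job);
      if (tfiles != null) {
        StringBuilder stamps = new StringBuilder(String.valueOf(
            DistributedCache.getTimestamp(job, tfiles[0])));
        for (int i = 1; i < tfiles.length; i++) {
          stamps.append(",");
          stamps.append(DistributedCache.getTimestamp(job, tfiles[i]));
        }
        DistributedCache.setFileTimestamps(job, stamps.toString());
      }
    }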

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=807543&r1=807542&r2=807543&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Aug 25 10:27:53 2009
@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.File;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -26,15 +28,17 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.mapred.JobTrackerMetricsInst;
-import org.apache.hadoop.mapred.JvmTask;
 import org.apache.hadoop.mapred.JobClient.RawSplit;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /** Implements MapReduce locally, in-process, for debugging. */ 
@@ -82,9 +86,16 @@
     return rawSplits;
   }
 
-  private class Job extends Thread
-    implements TaskUmbilicalProtocol {
-    private Path file;
+  private class Job extends Thread implements TaskUmbilicalProtocol {
+    // The job directory on the system: JobClient places job configurations here.
+    // This is analogous to JobTracker's system directory.
+    private Path systemJobDir;
+    private Path systemJobFile;
+    
+    // The job directory for the task, analogous to a task's job
+    // directory on a TaskTracker.
+    private Path localJobDir;
+    private Path localJobFile;
+
     private JobID id;
     private JobConf job;
 
@@ -92,10 +103,12 @@
     private ArrayList<TaskAttemptID> mapIds = new ArrayList<TaskAttemptID>();
 
     private JobProfile profile;
-    private Path localFile;
     private FileSystem localFs;
     boolean killed = false;
     
+    private TrackerDistributedCacheManager trackerDistributedCacheManager;
+    private TaskDistributedCacheManager taskDistributedCacheManager;
+    
     // Counters summed over all the map/reduce tasks which
     // have successfully completed
     private Counters completedTaskCounters = new Counters();
@@ -108,15 +121,55 @@
     }
     
     public Job(JobID jobid, JobConf conf) throws IOException {
-      this.file = new Path(getSystemDir(), jobid + "/job.xml");
+      this.systemJobDir = new Path(getSystemDir(), jobid.toString());
+      this.systemJobFile = new Path(systemJobDir, "job.xml");
       this.id = jobid;
-
-      this.localFile = new JobConf(conf).getLocalPath(jobDir+id+".xml");
       this.localFs = FileSystem.getLocal(conf);
+      this.localJobDir = localFs.makeQualified(conf.getLocalPath(jobDir));
+      this.localJobFile = new Path(this.localJobDir, id + ".xml");
 
-      fs.copyToLocalFile(file, localFile);
-      this.job = new JobConf(localFile);
-      profile = new JobProfile(job.getUser(), id, file.toString(), 
+      // Manage the distributed cache.  If there are files to be copied,
+      // localJobFile will be re-written below to reflect them.
+      this.trackerDistributedCacheManager =
+          new TrackerDistributedCacheManager(conf);
+      this.taskDistributedCacheManager =
+          trackerDistributedCacheManager.newTaskDistributedCacheManager(conf);
+      taskDistributedCacheManager.setup(
+          new LocalDirAllocator("mapred.local.dir"), 
+          new File(systemJobDir.toString()),
+          "archive");
+      
+      if (DistributedCache.getSymlink(conf)) {
+        // This is not supported largely because, unlike for a Child
+        // subprocess, the cwd in LocalJobRunner is not a fresh slate,
+        // but rather the user's working directory.
+        // This is further complicated because the logic in
+        // setupWorkDir only creates symlinks if there's a jarfile
+        // in the configuration.
+        LOG.warn("LocalJobRunner does not support " +
+        		"symlinking into current working dir.");
+      }
+      // Setup the symlinks for the distributed cache.
+      TaskRunner.setupWorkDir(conf, new File(localJobDir.toUri()).getAbsoluteFile());
+      
+      // Write out configuration file.  Instead of copying it from
+      // systemJobFile, we re-write it, since setup(), above, may have
+      // updated it.
+      OutputStream out = localFs.create(localJobFile);
+      try {
+        conf.writeXml(out);
+      } finally {
+        out.close();
+      }
+      this.job = new JobConf(localJobFile);
+      
+      // Job (the current object) is a Thread, so we wrap its class loader.
+      if (!taskDistributedCacheManager.getClassPaths().isEmpty()) {
+        setContextClassLoader(taskDistributedCacheManager.makeClassLoader(
+                getContextClassLoader()));
+      }
+      
+      profile = new JobProfile(job.getUser(), id, systemJobFile.toString(), 
                                "http://localhost:8080/", job.getJobName());
       status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING, 
           profile.getUser(), profile.getJobName(), profile.getJobFile(), 
@@ -174,7 +227,7 @@
             TaskAttemptID mapId = new TaskAttemptID(
                 new TaskID(jobId, TaskType.MAP, i),0);  
             mapIds.add(mapId);
-            MapTask map = new MapTask(file.toString(),  
+            MapTask map = new MapTask(systemJobFile.toString(),  
                                       mapId, i,
                                       rawSplits[i].getClassName(),
                                       rawSplits[i].getBytes(), 1);
@@ -185,7 +238,7 @@
             mapOutput.setConf(localConf);
             mapOutputFiles.put(mapId, mapOutput);
 
-            map.setJobFile(localFile.toString());
+            map.setJobFile(localJobFile.toString());
             map.localizeConfiguration(localConf);
             map.setConf(localConf);
             map_tasks += 1;
@@ -202,7 +255,7 @@
           new TaskAttemptID(new TaskID(jobId, TaskType.REDUCE, 0), 0);
         try {
           if (numReduceTasks > 0) {
-            ReduceTask reduce = new ReduceTask(file.toString(), 
+            ReduceTask reduce = new ReduceTask(systemJobFile.toString(), 
                 reduceId, 0, mapIds.size(), 1);
             JobConf localConf = new JobConf(job);
             TaskRunner.setupChildMapredLocalDirs(reduce, localConf);
@@ -227,7 +280,7 @@
               }
             }
             if (!this.isInterrupted()) {
-              reduce.setJobFile(localFile.toString());
+              reduce.setJobFile(localJobFile.toString());
               reduce.localizeConfiguration(localConf);
               reduce.setConf(localConf);
               reduce_tasks += 1;
@@ -275,8 +328,11 @@
 
       } finally {
         try {
-          fs.delete(file.getParent(), true);  // delete submit dir
-          localFs.delete(localFile, true);              // delete local copy
+          fs.delete(systemJobFile.getParent(), true);  // delete submit dir
+          localFs.delete(localJobFile, true);              // delete local copy
+          // Cleanup distributed cache
+          taskDistributedCacheManager.release();
+          trackerDistributedCacheManager.purgeCache();
         } catch (IOException e) {
           LOG.warn("Error cleaning up "+id+": "+e);
         }
@@ -489,5 +545,5 @@
   @Override
   public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException{
     return null;
-}
+  }
 }
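
Because Job extends Thread, the constructor above can swap in a context
classloader built over the localized classpath entries. makeClassLoader
itself is not shown in this diff; a plausible shape for it, consistent with
the imports in the new TaskDistributedCacheManager file below
(URLClassLoader, AccessController, PrivilegedAction), would be:

    public ClassLoader makeClassLoader(final ClassLoader parent)
        throws MalformedURLException {
      // Expose each localized classpath entry to the wrapping loader.
      final URL[] urls = new URL[classPaths.size()];
      for (int i = 0; i < classPaths.size(); i++) {
        urls[i] = new File(classPaths.get(i)).toURI().toURL();
      }
      return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
        public ClassLoader run() {
          return new URLClassLoader(urls, parent);
        }
      });
    }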

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=807543&r1=807542&r2=807543&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Aug 25 10:27:53 2009
@@ -33,8 +33,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.fs.FSError;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -64,6 +65,7 @@
 
   
   private TaskTracker tracker;
+  private TaskDistributedCacheManager taskDistributedCacheManager;
 
   protected JobConf conf;
   JvmManager jvmManager;
@@ -101,18 +103,6 @@
    */
   public void close() throws IOException {}
 
-  private static String stringifyPathArray(Path[] p){
-    if (p == null){
-      return null;
-    }
-    StringBuffer str = new StringBuffer(p[0].toString());
-    for (int i = 1; i < p.length; i++){
-      str.append(",");
-      str.append(p[i].toString());
-    }
-    return str.toString();
-  }
-  
 
   /**
    * Get the java command line options for the child map/reduce tasks.
@@ -173,11 +163,12 @@
       LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
       File workDir = formWorkDir(lDirAlloc, taskid, t.isTaskCleanupTask(), conf);
 
-      URI[] archives = DistributedCache.getCacheArchives(conf);
-      URI[] files = DistributedCache.getCacheFiles(conf);
       // We don't create any symlinks yet, so presence/absence of workDir
       // actually on the file system doesn't matter.
-      setupDistributedCache(lDirAlloc, workDir, archives, files);
+      taskDistributedCacheManager = tracker.getTrackerDistributedCacheManager()
+          .newTaskDistributedCacheManager(conf);
+      taskDistributedCacheManager.setup(
+          lDirAlloc, workDir, TaskTracker.getDistributedCacheDir());
 
       // Set up the child task's configuration. After this call, no localization
       // of files should happen in the TaskTracker's process space. Any changes to
@@ -189,7 +180,8 @@
       }
 
       // Build classpath
-      List<String> classPaths = getClassPaths(conf, workDir, archives, files);
+      List<String> classPaths =
+          getClassPaths(conf, workDir, taskDistributedCacheManager);
 
       long logSize = TaskLog.getTaskLogLength(conf);
 
@@ -249,18 +241,8 @@
       }
     } finally {
       try{
-        URI[] archives = DistributedCache.getCacheArchives(conf);
-        URI[] files = DistributedCache.getCacheFiles(conf);
-        if (archives != null){
-          for (int i = 0; i < archives.length; i++){
-            DistributedCache.releaseCache(archives[i], conf);
-          }
-        }
-        if (files != null){
-          for(int i = 0; i < files.length; i++){
-            DistributedCache.releaseCache(files[i], conf);
-          }
-        }
+        taskDistributedCacheManager.release();
+
       }catch(IOException ie){
         LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
       }
@@ -470,7 +452,7 @@
   /**
    */
   private static List<String> getClassPaths(JobConf conf, File workDir,
-      URI[] archives, URI[] files)
+      TaskDistributedCacheManager taskDistributedCacheManager)
       throws IOException {
     // Accumulates class paths for child.
     List<String> classPaths = new ArrayList<String>();
@@ -481,7 +463,7 @@
     appendJobJarClasspaths(conf.getJar(), classPaths);
     
     // Distributed cache paths
-    appendDistributedCacheClasspaths(conf, archives, files, classPaths);
+    classPaths.addAll(taskDistributedCacheManager.getClassPaths());
     
     // Include the working dir too
     classPaths.add(workDir.toString());
@@ -600,105 +582,6 @@
     return new File(workDir.toString());
   }
 
-  private void setupDistributedCache(LocalDirAllocator lDirAlloc, File workDir,
-      URI[] archives, URI[] files) throws IOException {
-    FileStatus fileStatus;
-    FileSystem fileSystem;
-    Path localPath;
-    String baseDir;
-    if ((archives != null) || (files != null)) {
-      if (archives != null) {
-        String[] archivesTimestamps = 
-                             DistributedCache.getArchiveTimestamps(conf);
-        Path[] p = new Path[archives.length];
-        for (int i = 0; i < archives.length;i++){
-          fileSystem = FileSystem.get(archives[i], conf);
-          fileStatus = fileSystem.getFileStatus(
-                                    new Path(archives[i].getPath()));
-          String cacheId = DistributedCache.makeRelative(archives[i],conf);
-          String cachePath = TaskTracker.getDistributedCacheDir() + 
-                               Path.SEPARATOR + cacheId;
-          
-          localPath = lDirAlloc.getLocalPathForWrite(cachePath,
-                                    fileStatus.getLen(), conf);
-          baseDir = localPath.toString().replace(cacheId, "");
-          p[i] = DistributedCache.getLocalCache(archives[i], conf, 
-                                                new Path(baseDir),
-                                                fileStatus,
-                                                true, Long.parseLong(
-                                                      archivesTimestamps[i]),
-                                                new Path(workDir.
-                                                      getAbsolutePath()), 
-                                                false);
-          
-        }
-        DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
-      }
-      if ((files != null)) {
-        String[] fileTimestamps = DistributedCache.getFileTimestamps(conf);
-        Path[] p = new Path[files.length];
-        for (int i = 0; i < files.length;i++){
-          fileSystem = FileSystem.get(files[i], conf);
-          fileStatus = fileSystem.getFileStatus(
-                                    new Path(files[i].getPath()));
-          String cacheId = DistributedCache.makeRelative(files[i], conf);
-          String cachePath = TaskTracker.getDistributedCacheDir() +
-                               Path.SEPARATOR + cacheId;
-          
-          localPath = lDirAlloc.getLocalPathForWrite(cachePath,
-                                    fileStatus.getLen(), conf);
-          baseDir = localPath.toString().replace(cacheId, "");
-          p[i] = DistributedCache.getLocalCache(files[i], conf, 
-                                                new Path(baseDir),
-                                                fileStatus,
-                                                false, Long.parseLong(
-                                                         fileTimestamps[i]),
-                                                new Path(workDir.
-                                                      getAbsolutePath()), 
-                                                false);
-        }
-        DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
-      }
-    }
-  }
-
-  private static void appendDistributedCacheClasspaths(JobConf conf,
-      URI[] archives, URI[] files, List<String> classPaths)
-      throws IOException {
-    // Archive paths
-    Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
-    if (archiveClasspaths != null && archives != null) {
-      Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
-      if (localArchives != null){
-        for (int i=0;i<archives.length;i++){
-          for(int j=0;j<archiveClasspaths.length;j++){
-            if (archives[i].getPath().equals(
-                                             archiveClasspaths[j].toString())){
-              classPaths.add(localArchives[i].toString());
-            }
-          }
-        }
-      }
-    }
-    
-    //file paths
-    Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf);
-    if (fileClasspaths!=null && files != null) {
-      Path[] localFiles = DistributedCache
-        .getLocalCacheFiles(conf);
-      if (localFiles != null) {
-        for (int i = 0; i < files.length; i++) {
-          for (int j = 0; j < fileClasspaths.length; j++) {
-            if (files[i].getPath().equals(
-                                          fileClasspaths[j].toString())) {
-              classPaths.add(localFiles[i].toString());
-            }
-          }
-        }
-      }
-    }
-  }
-
   private static void appendSystemClasspaths(List<String> classPaths) {
     for (String c : System.getProperty("java.class.path").split(
         SYSTEM_PATH_SEPARATOR)) {
@@ -732,12 +615,22 @@
     classPaths.add(jobCacheDir.toString());
   }
   
-  //Mostly for setting up the symlinks. Note that when we setup the distributed
-  //cache, we didn't create the symlinks. This is done on a per task basis
-  //by the currently executing task.
-  public static void setupWorkDir(JobConf conf) throws IOException {
-    File workDir = new File(".").getAbsoluteFile();
+  /**
+   * Creates distributed cache symlinks and a tmp directory, as appropriate.
+   * Note that when we set up the distributed
+   * cache, we didn't create the symlinks. This is done on a per-task basis
+   * by the currently executing task.
+   * 
+   * @param conf The job configuration.
+   * @param workDir Working directory, which is completely deleted and re-created.
+   */
+  public static void setupWorkDir(JobConf conf, File workDir) throws IOException {
+    LOG.debug("Fully deleting and re-creating" + workDir);
     FileUtil.fullyDelete(workDir);
+    if (!workDir.mkdir()) {
+      LOG.debug("Did not recreate " + workDir);
+    }
+    
     if (DistributedCache.getSymlink(conf)) {
       URI[] archives = DistributedCache.getCacheArchives(conf);
       URI[] files = DistributedCache.getCacheFiles(conf);
@@ -746,48 +639,58 @@
       if (archives != null) {
         for (int i = 0; i < archives.length; i++) {
           String link = archives[i].getFragment();
-          if (link != null) {
-            link = workDir.toString() + Path.SEPARATOR + link;
-            File flink = new File(link);
-            if (!flink.exists()) {
-              FileUtil.symLink(localArchives[i].toString(), link);
-            }
-          }
+          String target = localArchives[i].toString();
+          symlink(workDir, target, link);
         }
       }
       if (files != null) {
         for (int i = 0; i < files.length; i++) {
           String link = files[i].getFragment();
-          if (link != null) {
-            link = workDir.toString() + Path.SEPARATOR + link;
-            File flink = new File(link);
-            if (!flink.exists()) {
-              FileUtil.symLink(localFiles[i].toString(), link);
-            }
-          }
+          String target = localFiles[i].toString();
+          symlink(workDir, target, link);
         }
       }
     }
-    File jobCacheDir = null;
+    
+    // For streaming, create extra symlinks (for all the files
+    // in the job cache dir) in the current working directory.
+    // Note that this is only executed if the configuration 
+    // points to a jar file.
     if (conf.getJar() != null) {
-      jobCacheDir = new File(
+      File jobCacheDir = new File(
           new Path(conf.getJar()).getParent().toString());
-    }
-
-    // create symlinks for all the files in job cache dir in current
-    // workingdir for streaming
-    try{
-      DistributedCache.createAllSymlink(conf, jobCacheDir,
-          workDir);
-    } catch(IOException ie){
-      // Do not exit even if symlinks have not been created.
-      LOG.warn(StringUtils.stringifyException(ie));
+      try{
+        TrackerDistributedCacheManager.createAllSymlink(conf, jobCacheDir,
+            workDir);
+      } catch(IOException ie){
+        // Do not exit even if symlinks have not been created.
+        LOG.warn(StringUtils.stringifyException(ie));
+      }
     }
 
     createChildTmpDir(workDir, conf);
   }
 
   /**
+   * Utility method for creating a symlink and warning on errors.
+   * 
+   * If link is null, does nothing.
+   */
+  private static void symlink(File workDir, String target, String link)
+      throws IOException {
+    if (link != null) {
+      link = workDir.toString() + Path.SEPARATOR + link;
+      File flink = new File(link);
+      if (!flink.exists()) {
+        LOG.info(String.format("Creating symlink: %s <- %s", target, link));
+        if (0 != FileUtil.symLink(target, link)) {
+          LOG.warn(String.format("Failed to create symlink: %s <- %s", target, link));
+        }
+      }
+    }
+  }
+
+  /**
    * Kill the child process
    */
   public void kill() {
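
As the rewritten loops above show, the symlink name is taken from the cache
URI's fragment. For example (hypothetical path and link name), a job that
registers

    // The "#dict" fragment names the symlink created in the work dir.
    DistributedCache.addCacheFile(
        new URI("hdfs://namenode/data/dictionary.txt#dict"), conf);
    DistributedCache.createSymlink(conf);  // opt in to symlink creation

would end up with workDir/dict pointing at the localized copy of the file.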

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=807543&r1=807542&r2=807543&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Aug 25 10:27:53 2009
@@ -51,7 +51,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -70,6 +69,7 @@
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.mapred.pipes.Submitter;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsException;
 import org.apache.hadoop.metrics.MetricsRecord;
@@ -147,6 +147,8 @@
 
   Server taskReportServer = null;
   InterTrackerProtocol jobClient;
+  
+  private TrackerDistributedCacheManager distributedCacheManager;
     
   // last heartbeat response recieved
   short heartbeatResponseId = -1;
@@ -567,8 +569,11 @@
     this.taskTrackerName = "tracker_" + localHostname + ":" + taskReportAddress;
     LOG.info("Starting tracker " + taskTrackerName);
 
-    // Clear out temporary files that might be lying around
-    DistributedCache.purgeCache(this.fConf);
+    // Initialize DistributedCache and
+    // clear out temporary files that might be lying around
+    this.distributedCacheManager = 
+        new TrackerDistributedCacheManager(this.fConf);
+    this.distributedCacheManager.purgeCache();
     cleanupStorage();
 
     this.jobClient = (InterTrackerProtocol) 
@@ -3511,4 +3516,8 @@
     healthChecker = new NodeHealthCheckerService(conf);
     healthChecker.start();
   }
+
+  TrackerDistributedCacheManager getTrackerDistributedCacheManager() {
+    return distributedCacheManager;
+  }
 }
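
Startup cleanup moves, along with the cache state, into
TrackerDistributedCacheManager. Judging from the static purgeCache body
deleted from DistributedCache.java below, the new instance method plausibly
does the same work against the manager's own map (a sketch, not the
committed code):

    public void purgeCache() throws IOException {
      synchronized (cachedArchives) {
        FileSystem localFs = FileSystem.getLocal(conf);
        for (Map.Entry<String, CacheStatus> f : cachedArchives.entrySet()) {
          try {
            // Delete the backing files; any cached copies are lost.
            localFs.delete(f.getValue().localLoadPath, true);
          } catch (IOException ie) {
            LOG.debug("Error cleaning up cache", ie);
          }
        }
        cachedArchives.clear();
      }
    }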

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java?rev=807543&r1=807542&r2=807543&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java Tue Aug 25 10:27:53 2009
@@ -18,12 +18,12 @@
 
 package org.apache.hadoop.mapreduce.filecache;
 
-import org.apache.commons.logging.*;
 import java.io.*;
 import java.util.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
 
 import java.net.URI;
 
@@ -35,7 +35,7 @@
  * </p>
  * 
  * <p>Applications specify the files, via urls (hdfs:// or http://) to be cached 
- * via the org.apache.hadoop.mapred.JobConf. The
+ * via the {@link org.apache.hadoop.mapred.JobConf}. The
  * <code>DistributedCache</code> assumes that the files specified via urls are
  * already present on the {@link FileSystem} at the path specified by the url
  * and are accessible by every machine in the cluster.</p>
@@ -82,8 +82,8 @@
  *     DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);
  *     DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job);
  *     
- *     3. Use the cached files in the org.apache.hadoop.mapred.Mapper
- *     or org.apache.hadoop.mapred.Reducer:
+ *     3. Use the cached files in the {@link org.apache.hadoop.mapred.Mapper}
+ *     or {@link org.apache.hadoop.mapred.Reducer}:
  *     
  *     public static class MapClass extends MapReduceBase  
  *     implements Mapper&lt;K, V, K, V&gt; {
@@ -108,20 +108,24 @@
  *     }
  *     
  * </pre></blockquote></p>
- * 
+ *
+ * It is also common to configure the DistributedCache through
+ * {@link org.apache.hadoop.util.GenericOptionsParser}.
+ *
+ * This class includes methods that should be used by users
+ * (specifically those mentioned in the example above, as well
+ * as {@link DistributedCache#addArchiveToClassPath(Path, Configuration)}),
+ * as well as methods intended for use by the MapReduce framework
+ * (e.g., {@link org.apache.hadoop.mapred.JobClient}).  For implementation
+ * details, see {@link TrackerDistributedCacheManager} and 
+ * {@link TaskDistributedCacheManager}.
+ *
+ * @see TrackerDistributedCacheManager
+ * @see TaskDistributedCacheManager
+ * @see org.apache.hadoop.mapred.JobConf
+ * @see org.apache.hadoop.mapred.JobClient
  */
 public class DistributedCache {
-  // cacheID to cacheStatus mapping
-  private static TreeMap<String, CacheStatus> cachedArchives = new TreeMap<String, CacheStatus>();
-  
-  private static TreeMap<Path, Long> baseDirSize = new TreeMap<Path, Long>();
-  
-  // default total cache size
-  private static final long DEFAULT_CACHE_SIZE = 10737418240L;
-
-  private static final Log LOG =
-    LogFactory.getLog(DistributedCache.class);
-  
   /**
    * Get the locally cached file or archive; it could either be 
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
@@ -144,15 +148,18 @@
    * @return the path to directory where the archives are unjarred in case of archives,
    * the path to the file where the file is copied locally 
    * @throws IOException
+   * @deprecated Internal to MapReduce framework.  Use
+   * TrackerDistributedCacheManager instead.
    */
   public static Path getLocalCache(URI cache, Configuration conf, 
                                    Path baseDir, FileStatus fileStatus,
                                    boolean isArchive, long confFileStamp,
                                    Path currentWorkDir) 
-  throws IOException {
+      throws IOException {
     return getLocalCache(cache, conf, baseDir, fileStatus, isArchive, 
         confFileStamp, currentWorkDir, true);
   }
+
   /**
    * Get the locally cached file or archive; it could either be 
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
@@ -178,48 +185,19 @@
    * @return the path to directory where the archives are unjarred in case of archives,
    * the path to the file where the file is copied locally 
    * @throws IOException
+   * @deprecated Internal to MapReduce framework.  Use
+   * TrackerDistributedCacheManager instead.
    */
   public static Path getLocalCache(URI cache, Configuration conf, 
       Path baseDir, FileStatus fileStatus,
       boolean isArchive, long confFileStamp,
-      Path currentWorkDir, boolean honorSymLinkConf) 
-  throws IOException {
-    String cacheId = makeRelative(cache, conf);
-    CacheStatus lcacheStatus;
-    Path localizedPath;
-    synchronized (cachedArchives) {
-      lcacheStatus = cachedArchives.get(cacheId);
-      if (lcacheStatus == null) {
-        // was never localized
-        lcacheStatus = new CacheStatus(baseDir, new Path(baseDir, new Path(cacheId)));
-        cachedArchives.put(cacheId, lcacheStatus);
-      }
+      Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
 
-      synchronized (lcacheStatus) {
-        localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus, 
-            fileStatus, isArchive, currentWorkDir, honorSymLinkConf);
-        lcacheStatus.refcount++;
-      }
-    }
-
-    // try deleting stuff if you can
-    long size = 0;
-    synchronized (baseDirSize) {
-      Long get = baseDirSize.get(baseDir);
-      if ( get != null ) {
-    	size = get.longValue();
-      }
-    }
-    // setting the cache size to a default of 10GB
-    long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
-    if (allowedSize < size) {
-      // try some cache deletions
-      deleteCache(conf);
-    }
-    return localizedPath;
+    return new TrackerDistributedCacheManager(conf).getLocalCache(cache, conf,
+        baseDir, fileStatus, isArchive, confFileStamp, currentWorkDir,
+        honorSymLinkConf);
   }
 
-  
   /**
    * Get the locally cached file or archive; it could either be 
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
@@ -241,17 +219,18 @@
    * @return the path to directory where the archives are unjarred in case of archives,
    * the path to the file where the file is copied locally 
    * @throws IOException
-
+   * @deprecated Internal to MapReduce framework.  Use
+   * TrackerDistributedCacheManager instead.
    */
   public static Path getLocalCache(URI cache, Configuration conf, 
                                    Path baseDir, boolean isArchive,
                                    long confFileStamp, Path currentWorkDir) 
-  throws IOException {
+      throws IOException {
     return getLocalCache(cache, conf, 
                          baseDir, null, isArchive,
                          confFileStamp, currentWorkDir);
   }
-  
+
   /**
    * This is the opposite of getlocalcache. When you are done with
    * using the cache, you need to release the cache
@@ -259,46 +238,14 @@
    * @param conf configuration which contains the filesystem the cache 
    * is contained in.
    * @throws IOException
+   * @deprecated Internal to MapReduce framework.  Use
+   * TrackerDistributedCacheManager instead.
    */
   public static void releaseCache(URI cache, Configuration conf)
-    throws IOException {
-    String cacheId = makeRelative(cache, conf);
-    synchronized (cachedArchives) {
-      CacheStatus lcacheStatus = cachedArchives.get(cacheId);
-      if (lcacheStatus == null)
-        return;
-      synchronized (lcacheStatus) {
-        lcacheStatus.refcount--;
-      }
-    }
+      throws IOException {
+    new TrackerDistributedCacheManager(conf).releaseCache(cache, conf);
   }
   
-  // To delete the caches which have a refcount of zero
-  
-  private static void deleteCache(Configuration conf) throws IOException {
-    // try deleting cache Status with refcount of zero
-    synchronized (cachedArchives) {
-      for (Iterator it = cachedArchives.keySet().iterator(); it.hasNext();) {
-        String cacheId = (String) it.next();
-        CacheStatus lcacheStatus = cachedArchives.get(cacheId);
-        synchronized (lcacheStatus) {
-          if (lcacheStatus.refcount == 0) {
-            // delete this cache entry
-            FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
-            synchronized (baseDirSize) {
-              Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
-              if ( dirSize != null ) {
-            	dirSize -= lcacheStatus.size;
-            	baseDirSize.put(lcacheStatus.baseDir, dirSize);
-              }
-            }
-            it.remove();
-          }
-        }
-      }
-    }
-  }
-
   /*
    * Returns the relative path of the dir this cache will be localized in
    * relative path that this cache will be localized in. For
@@ -306,189 +253,17 @@
    * hostname/absolute path -- if it is just /absolute_path -- then the
    * relative path is hostname of DFS this mapred cluster is running
    * on/absolute_path
+   * @deprecated Internal to MapReduce framework.  Use
+   * TrackerDistributedCacheManager instead.
    */
   public static String makeRelative(URI cache, Configuration conf)
-    throws IOException {
-    String host = cache.getHost();
-    if (host == null) {
-      host = cache.getScheme();
-    }
-    if (host == null) {
-      URI defaultUri = FileSystem.get(conf).getUri();
-      host = defaultUri.getHost();
-      if (host == null) {
-        host = defaultUri.getScheme();
-      }
-    }
-    String path = host + cache.getPath();
-    path = path.replace(":/","/");                // remove windows device colon
-    return path;
-  }
-
-  private static Path cacheFilePath(Path p) {
-    return new Path(p, p.getName());
-  }
-
-  // the method which actually copies the caches locally and unjars/unzips them
-  // and does chmod for the files
-  private static Path localizeCache(Configuration conf, 
-                                    URI cache, long confFileStamp,
-                                    CacheStatus cacheStatus,
-                                    FileStatus fileStatus,
-                                    boolean isArchive, 
-                                    Path currentWorkDir,boolean honorSymLinkConf) 
-  throws IOException {
-    boolean doSymlink = honorSymLinkConf && getSymlink(conf);
-    if(cache.getFragment() == null) {
-    	doSymlink = false;
-    }
-    FileSystem fs = FileSystem.get(cache, conf);
-    String link = currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
-    File flink = new File(link);
-    if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
-                           cacheStatus, fileStatus)) {
-      if (isArchive) {
-        if (doSymlink){
-          if (!flink.exists())
-            FileUtil.symLink(cacheStatus.localLoadPath.toString(), 
-                             link);
-        }
-        return cacheStatus.localLoadPath;
-      }
-      else {
-        if (doSymlink){
-          if (!flink.exists())
-            FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), 
-                             link);
-        }
-        return cacheFilePath(cacheStatus.localLoadPath);
-      }
-    } else {
-      // remove the old archive
-      // if the old archive cannot be removed since it is being used by another
-      // job
-      // return null
-      if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
-        throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
-                              + " is in use and cannot be refreshed");
-      
-      FileSystem localFs = FileSystem.getLocal(conf);
-      localFs.delete(cacheStatus.localLoadPath, true);
-      synchronized (baseDirSize) {
-    	Long dirSize = baseDirSize.get(cacheStatus.baseDir);
-    	if ( dirSize != null ) {
-    	  dirSize -= cacheStatus.size;
-    	  baseDirSize.put(cacheStatus.baseDir, dirSize);
-    	}
-      }
-      Path parchive = new Path(cacheStatus.localLoadPath,
-                               new Path(cacheStatus.localLoadPath.getName()));
-      
-      if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
-        throw new IOException("Mkdirs failed to create directory " + 
-                              cacheStatus.localLoadPath.toString());
-      }
-
-      String cacheId = cache.getPath();
-      fs.copyToLocalFile(new Path(cacheId), parchive);
-      if (isArchive) {
-        String tmpArchive = parchive.toString().toLowerCase();
-        File srcFile = new File(parchive.toString());
-        File destDir = new File(parchive.getParent().toString());
-        if (tmpArchive.endsWith(".jar")) {
-          RunJar.unJar(srcFile, destDir);
-        } else if (tmpArchive.endsWith(".zip")) {
-          FileUtil.unZip(srcFile, destDir);
-        } else if (isTarFile(tmpArchive)) {
-          FileUtil.unTar(srcFile, destDir);
-        }
-        // else will not do anyhting
-        // and copy the file into the dir as it is
-      }
-      
-      long cacheSize = FileUtil.getDU(new File(parchive.getParent().toString()));
-      cacheStatus.size = cacheSize;
-      synchronized (baseDirSize) {
-      	Long dirSize = baseDirSize.get(cacheStatus.baseDir);
-      	if( dirSize == null ) {
-      	  dirSize = Long.valueOf(cacheSize);
-      	} else {
-      	  dirSize += cacheSize;
-      	}
-      	baseDirSize.put(cacheStatus.baseDir, dirSize);
-      }
-      
-      // do chmod here 
-      try {
-        //Setting recursive permission to grant everyone read and execute
-        FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
-      } catch(InterruptedException e) {
-    	LOG.warn("Exception in chmod" + e.toString());
-      }
-
-      // update cacheStatus to reflect the newly cached file
-      cacheStatus.currentStatus = true;
-      cacheStatus.mtime = getTimestamp(conf, cache);
-    }
-    
-    if (isArchive){
-      if (doSymlink){
-        if (!flink.exists())
-          FileUtil.symLink(cacheStatus.localLoadPath.toString(), 
-                           link);
-      }
-      return cacheStatus.localLoadPath;
-    }
-    else {
-      if (doSymlink){
-        if (!flink.exists())
-          FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), 
-                           link);
-      }
-      return cacheFilePath(cacheStatus.localLoadPath);
-    }
-  }
-
-  private static boolean isTarFile(String filename) {
-    return (filename.endsWith(".tgz") || filename.endsWith(".tar.gz") ||
-           filename.endsWith(".tar"));
-  }
-  
-  // Checks if the cache has already been localized and is fresh
-  private static boolean ifExistsAndFresh(Configuration conf, FileSystem fs, 
-                                          URI cache, long confFileStamp, 
-                                          CacheStatus lcacheStatus,
-                                          FileStatus fileStatus) 
-  throws IOException {
-    // check for existence of the cache
-    if (lcacheStatus.currentStatus == false) {
-      return false;
-    } else {
-      long dfsFileStamp;
-      if (fileStatus != null) {
-        dfsFileStamp = fileStatus.getModificationTime();
-      } else {
-        dfsFileStamp = getTimestamp(conf, cache);
-      }
-
-      // ensure that the file on hdfs hasn't been modified since the job started 
-      if (dfsFileStamp != confFileStamp) {
-        LOG.fatal("File: " + cache + " has changed on HDFS since job started");
-        throw new IOException("File: " + cache + 
-                              " has changed on HDFS since job started");
-      }
-      
-      if (dfsFileStamp != lcacheStatus.mtime) {
-        // needs refreshing
-        return false;
-      }
-    }
-    
-    return true;
+      throws IOException {
+    return new TrackerDistributedCacheManager(conf).makeRelative(cache, conf);
   }
 
   /**
    * Returns mtime of a given cache file on hdfs.
+   * 
    * @param conf configuration
    * @param cache cache file 
    * @return mtime of a given cache file on hdfs
@@ -501,32 +276,24 @@
 
     return fileSystem.getFileStatus(filePath).getModificationTime();
   }
-  
+
   /**
   * This method creates symlinks for all files in a given dir in another directory
    * @param conf the configuration
    * @param jobCacheDir the target directory for creating symlinks
    * @param workDir the directory in which the symlinks are created
    * @throws IOException
+   * @deprecated Internal to MapReduce framework.  Use
+   * TrackerDistributedCacheManager instead.
    */
   public static void createAllSymlink(Configuration conf, File jobCacheDir, File workDir)
-    throws IOException{
-    if ((jobCacheDir == null || !jobCacheDir.isDirectory()) ||
-           workDir == null || (!workDir.isDirectory())) {
-      return;
-    }
-    boolean createSymlink = getSymlink(conf);
-    if (createSymlink){
-      File[] list = jobCacheDir.listFiles();
-      for (int i=0; i < list.length; i++){
-        FileUtil.symLink(list[i].getAbsolutePath(),
-                         new File(workDir, list[i].getName()).toString());
-      }
-    }  
+      throws IOException{
+    TrackerDistributedCacheManager.createAllSymlink(conf, jobCacheDir, workDir);
   }
   
   /**
-   * Set the configuration with the given set of archives
+   * Set the configuration with the given set of archives.  Intended
+   * to be used by user code.
    * @param archives The list of archives that need to be localized
    * @param conf Configuration which will be changed
    */
@@ -536,7 +303,8 @@
   }
 
   /**
-   * Set the configuration with the given set of files
+   * Set the configuration with the given set of files.  Intended to be
+   * used by user code.
    * @param files The list of files that need to be localized
    * @param conf Configuration which will be changed
    */
@@ -546,7 +314,8 @@
   }
 
   /**
-   * Get cache archives set in the Configuration
+   * Get cache archives set in the Configuration.  Used by
+   * internal DistributedCache and MapReduce code.
    * @param conf The configuration which contains the archives
    * @return A URI array of the caches set in the Configuration
    * @throws IOException
@@ -556,18 +325,19 @@
   }
 
   /**
-   * Get cache files set in the Configuration
+   * Get cache files set in the Configuration.  Used by internal
+   * DistributedCache and MapReduce code.
    * @param conf The configuration which contains the files
    * @return A URI array of the files set in the Configuration
    * @throws IOException
    */
-
   public static URI[] getCacheFiles(Configuration conf) throws IOException {
     return StringUtils.stringToURI(conf.getStrings("mapred.cache.files"));
   }
 
   /**
-   * Return the path array of the localized caches
+   * Return the path array of the localized caches.  Intended to be used
+   * by user code.
    * @param conf Configuration that contains the localized archives
    * @return A path array of localized caches
    * @throws IOException
@@ -579,7 +349,8 @@
   }
 
   /**
-   * Return the path array of the localized files
+   * Return the path array of the localized files.  Intended to be used
+   * by user code.
    * @param conf Configuration that contains the localized files
    * @return A path array of localized files
    * @throws IOException
@@ -590,7 +361,8 @@
   }
 
   /**
-   * Get the timestamps of the archives
+   * Get the timestamps of the archives.  Used by internal
+   * DistributedCache and MapReduce code.
    * @param conf The configuration which stored the timestamps
    * @return a string array of timestamps 
    * @throws IOException
@@ -601,7 +373,8 @@
 
 
   /**
-   * Get the timestamps of the files
+   * Get the timestamps of the files.  Used by internal
+   * DistributedCache and MapReduce code.
    * @param conf The configuration which stored the timestamps
    * @return a string array of timestamps 
    * @throws IOException
@@ -611,7 +384,8 @@
   }
 
   /**
-   * This is to check the timestamp of the archives to be localized
+   * This is to check the timestamp of the archives to be localized.
+   * Used by internal MapReduce code.
    * @param conf Configuration which stores the timestamp's
    * @param timestamps comma separated list of timestamps of archives.
    * The order should be the same as the order in which the archives are added.
@@ -621,7 +395,8 @@
   }
 
   /**
-   * This is to check the timestamp of the files to be localized
+   * This is to check the timestamp of the files to be localized.
+   * Used by internal MapReduce code.
    * @param conf Configuration which stores the timestamp's
    * @param timestamps comma separated list of timestamps of files.
    * The order should be the same as the order in which the files are added.
@@ -631,7 +406,8 @@
   }
   
   /**
-   * Set the conf to contain the location for localized archives 
+   * Set the conf to contain the location for localized archives.  Used
+   * by internal DistributedCache code.
    * @param conf The conf to modify to contain the localized caches
    * @param str a comma separated list of local archives
    */
@@ -640,7 +416,8 @@
   }
 
   /**
-   * Set the conf to contain the location for localized files 
+   * Set the conf to contain the location for localized files.  Used
+   * by internal DistributedCache code.
    * @param conf The conf to modify to contain the localized caches
    * @param str a comma separated list of local files
    */
@@ -649,7 +426,8 @@
   }
 
   /**
-   * Add a archives to be localized to the conf
+   * Add an archive to be localized to the conf.  Intended to
+   * be used by user code.
    * @param uri The uri of the cache to be localized
    * @param conf Configuration to add the cache to
    */
@@ -660,7 +438,8 @@
   }
   
   /**
-   * Add a file to be localized to the conf
+   * Add a file to be localized to the conf.  Intended
+   * to be used by user code.
    * @param uri The uri of the cache to be localized
    * @param conf Configuration to add the cache to
    */
@@ -672,7 +451,7 @@
 
   /**
   * Add a file path to the current set of classpath entries. It adds the file
-   * to cache as well.
+   * to cache as well.  Intended to be used by user code.
    * 
    * @param file Path of the file to be added
    * @param conf Configuration that contains the classpath setting
@@ -689,7 +468,8 @@
   }
 
   /**
-   * Get the file entries in classpath as an array of Path
+   * Get the file entries in classpath as an array of Path.
+   * Used by internal DistributedCache code.
    * 
    * @param conf Configuration that contains the classpath setting
    */
@@ -708,7 +488,7 @@
 
   /**
    * Add an archive path to the current set of classpath entries. It adds the
-   * archive to cache as well.
+   * archive to cache as well.  Intended to be used by user code.
    * 
    * @param archive Path of the archive to be added
    * @param conf Configuration that contains the classpath setting
@@ -725,7 +505,8 @@
   }
 
   /**
-   * Get the archive entries in classpath as an array of Path
+   * Get the archive entries in classpath as an array of Path.
+   * Used by internal DistributedCache code.
    * 
    * @param conf Configuration that contains the classpath setting
    */
@@ -744,7 +525,8 @@
 
   /**
    * This method allows you to create symlinks in the current working directory
-   * of the task to all the cache files/archives
+   * of the task to all the cache files/archives.
+   * Intended to be used by user code.
    * @param conf the jobconf 
    */
   public static void createSymlink(Configuration conf){
@@ -754,6 +536,7 @@
   /**
   * This method checks to see if symlinks are to be created for the
   * localized cache files in the current working directory.
+   * Used by internal DistributedCache code.
    * @param conf the jobconf
    * @return true if symlinks are to be created- else return false
    */
@@ -769,7 +552,7 @@
    * This method checks if there is a conflict in the fragment names 
    * of the uris. Also makes sure that each uri has a fragment. It 
    * is only to be called if you want to create symlinks for 
-   * the various archives and files.
+   * the various archives and files.  May be used by user code.
    * @param uriFiles The uri array of urifiles
    * @param uriArchives the uri array of uri archives
    */
@@ -811,52 +594,14 @@
     return true;
   }
 
-  private static class CacheStatus {
-    // false, not loaded yet, true is loaded
-    boolean currentStatus;
-
-    // the local load path of this cache
-    Path localLoadPath;
-    
-    //the base dir where the cache lies
-    Path baseDir;
-    
-    //the size of this cache
-    long size;
-
-    // number of instances using this cache
-    int refcount;
-
-    // the cache-file modification time
-    long mtime;
-
-    public CacheStatus(Path baseDir, Path localLoadPath) {
-      super();
-      this.currentStatus = false;
-      this.localLoadPath = localLoadPath;
-      this.refcount = 0;
-      this.mtime = -1;
-      this.baseDir = baseDir;
-      this.size = 0;
-    }
-  }
-
   /**
    * Clear the entire contents of the cache and delete the backing files. This
    * should only be used when the server is reinitializing, because the users
    * are going to lose their files.
+   * @deprecated Internal to MapReduce framework.  Use
+   * TrackerDistributedCacheManager instead.
    */
   public static void purgeCache(Configuration conf) throws IOException {
-    synchronized (cachedArchives) {
-      FileSystem localFs = FileSystem.getLocal(conf);
-      for (Map.Entry<String,CacheStatus> f: cachedArchives.entrySet()) {
-        try {
-          localFs.delete(f.getValue().localLoadPath, true);
-        } catch (IOException ie) {
-          LOG.debug("Error cleaning up cache", ie);
-        }
-      }
-      cachedArchives.clear();
-    }
+    new TrackerDistributedCacheManager(conf).purgeCache();
   }
 }
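
The updated class javadoc above points users at GenericOptionsParser, which
fills in the tmpfiles/tmpjars/tmparchives properties that JobClient (earlier
in this commit) reads when shipping files to the cache. A sketch of that
route (the class name, jar name, and paths are hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class CacheTool extends Configured implements Tool {
      public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), CacheTool.class);
        // Files passed with -files (likewise -libjars and -archives) have
        // already been recorded by GenericOptionsParser, which ToolRunner
        // runs before calling run().
        // ... configure mapper, reducer, input and output paths ...
        JobClient.runJob(conf);
        return 0;
      }
      public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new CacheTool(), args));
      }
    }

    // Typical invocation:
    //   hadoop jar cachetool.jar CacheTool -files /local/lookup.txt in out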

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java?rev=807543&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java Tue Aug 25 10:27:53 2009
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.filecache;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Helper class of {@link TrackerDistributedCacheManager} that represents
+ * the cached files of a single task.  This class is used
+ * by TaskRunner/LocalJobRunner to parse out the job configuration
+ * and setup the local caches.
+ * 
+ * <b>This class is internal to Hadoop, and should not be treated as a public
+ * interface.</b>
+ */
+public class TaskDistributedCacheManager {
+  private final TrackerDistributedCacheManager distributedCacheManager;
+  private final Configuration taskConf;
+  private final List<CacheFile> cacheFiles = new ArrayList<CacheFile>();
+  private final List<String> classPaths = new ArrayList<String>();
+ 
+  private boolean setupCalled = false;
+
+  /**
+   * Struct representing a single cached file.
+   * There are four permutations, across (archive vs. regular file) and
+   * (added to the classpath vs. not).
+   */
+  static class CacheFile {
+    /** URI as in the configuration */
+    final URI uri;
+    enum FileType {
+      REGULAR,
+      ARCHIVE
+    }
+    /** Whether this is an archive (to be decompressed) or a regular file */
+    final FileType type;
+    final long timestamp;
+    /** Whether this is to be added to the classpath */
+    final boolean shouldBeAddedToClassPath;
+
+    private CacheFile(URI uri, FileType type, long timestamp, 
+        boolean classPath) {
+      this.uri = uri;
+      this.type = type;
+      this.timestamp = timestamp;
+      this.shouldBeAddedToClassPath = classPath;
+    }
+
+    /**
+     * Converts the configuration entries that DistributedCache uses to
+     * serialize the list of files to cache into CacheFile objects
+     * representing those files.
+     */
+    private static List<CacheFile> makeCacheFiles(URI[] uris, 
+        String[] timestamps, Path[] paths, FileType type) {
+      List<CacheFile> ret = new ArrayList<CacheFile>();
+      if (uris != null) {
+        if (uris.length != timestamps.length) {
+          throw new IllegalArgumentException("Mismatched uris and timestamps.");
+        }
+        Map<String, Path> classPaths = new HashMap<String, Path>();
+        if (paths != null) {
+          for (Path p : paths) {
+            classPaths.put(p.toString(), p);
+          }
+        }
+        for (int i = 0; i < uris.length; ++i) {
+          URI u = uris[i];
+          boolean isClassPath = (null != classPaths.get(u.getPath()));
+          long t = Long.parseLong(timestamps[i]);
+          ret.add(new CacheFile(u, type, t, isClassPath));
+        }
+      }
+      return ret;
+    }
+  }
+
+  TaskDistributedCacheManager(
+      TrackerDistributedCacheManager distributedCacheManager,
+      Configuration taskConf) throws IOException {
+    this.distributedCacheManager = distributedCacheManager;
+    this.taskConf = taskConf;
+    
+    this.cacheFiles.addAll(
+        CacheFile.makeCacheFiles(DistributedCache.getCacheFiles(taskConf),
+            DistributedCache.getFileTimestamps(taskConf),
+            DistributedCache.getFileClassPaths(taskConf),
+            CacheFile.FileType.REGULAR));
+    this.cacheFiles.addAll(
+        CacheFile.makeCacheFiles(DistributedCache.getCacheArchives(taskConf),
+          DistributedCache.getArchiveTimestamps(taskConf),
+          DistributedCache.getArchiveClassPaths(taskConf), 
+          CacheFile.FileType.ARCHIVE));
+  }
+
+  /**
+   * Retrieves files into the local cache and updates the task configuration
+   * (which has been passed in via the constructor).
+   * 
+   * It is the caller's responsibility to re-write the task configuration XML
+   * file, if necessary.
+   */
+  public void setup(LocalDirAllocator lDirAlloc, File workDir, 
+      String cacheSubdir) throws IOException {
+    setupCalled = true;
+    
+    if (cacheFiles.isEmpty()) {
+      return;
+    }
+
+    ArrayList<Path> localArchives = new ArrayList<Path>();
+    ArrayList<Path> localFiles = new ArrayList<Path>();
+    Path workdirPath = new Path(workDir.getAbsolutePath());
+
+    for (CacheFile cacheFile : cacheFiles) {
+      URI uri = cacheFile.uri;
+      FileSystem fileSystem = FileSystem.get(uri, taskConf);
+      FileStatus fileStatus = fileSystem.getFileStatus(new Path(uri.getPath()));
+      String cacheId = this.distributedCacheManager.makeRelative(uri, taskConf);
+      String cachePath = cacheSubdir + Path.SEPARATOR + cacheId;
+      Path localPath = lDirAlloc.getLocalPathForWrite(cachePath,
+                                fileStatus.getLen(), taskConf);
+      String baseDir = localPath.toString().replace(cacheId, "");
+      Path p = distributedCacheManager.getLocalCache(uri, taskConf,
+          new Path(baseDir), fileStatus, 
+          cacheFile.type == CacheFile.FileType.ARCHIVE,
+          cacheFile.timestamp, workdirPath, false);
+
+      if (cacheFile.type == CacheFile.FileType.ARCHIVE) {
+        localArchives.add(p);
+      } else {
+        localFiles.add(p);
+      }
+      if (cacheFile.shouldBeAddedToClassPath) {
+        classPaths.add(p.toString());
+      }
+    }
+
+    // Update the configuration object with localized data.
+    if (!localArchives.isEmpty()) {
+      DistributedCache.setLocalArchives(taskConf, 
+        stringifyPathList(localArchives));
+    }
+    if (!localFiles.isEmpty()) {
+      DistributedCache.setLocalFiles(taskConf, stringifyPathList(localFiles));
+    }
+
+  }
+
+  private static String stringifyPathList(List<Path> p){
+    if (p == null || p.isEmpty()) {
+      return null;
+    }
+    StringBuilder str = new StringBuilder(p.get(0).toString());
+    for (int i = 1; i < p.size(); i++){
+      str.append(",");
+      str.append(p.get(i).toString());
+    }
+    return str.toString();
+  }
+
+  /** 
+   * Retrieves the class paths (as local references) to be added to the
+   * task's classpath.  Must be called after setup().
+   */
+  public List<String> getClassPaths() throws IOException {
+    if (!setupCalled) {
+      throw new IllegalStateException(
+          "getClassPaths() should be called after setup()");
+    }
+    return classPaths;
+  }
+
+  /**
+   * Releases the cached files/archives, so that space
+   * can be reclaimed by the {@link TrackerDistributedCacheManager}.
+   */
+  public void release() throws IOException {
+    for (CacheFile c : cacheFiles) {
+      distributedCacheManager.releaseCache(c.uri, taskConf);
+    }
+  }
+
+  /**
+   * Creates a class loader that includes the designated
+   * files and archives.
+   */
+  public ClassLoader makeClassLoader(final ClassLoader parent) 
+      throws MalformedURLException {
+    final URL[] urls = new URL[classPaths.size()];
+    for (int i = 0; i < classPaths.size(); ++i) {
+      urls[i] = new File(classPaths.get(i)).toURI().toURL();
+    }
+    return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
+      @Override
+      public ClassLoader run() {
+        return new URLClassLoader(urls, parent);
+      }     
+    });
+  }
+}
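
Taken together, the intended lifecycle of this class is: obtain an instance
from the tracker-level manager, call setup() before the task launches,
consume getClassPaths()/makeClassLoader() while it runs, and call release()
once it exits. A minimal sketch of that flow -- the local dir property,
work dir, and "archive" subdirectory are illustrative assumptions; the real
values come from the TaskTracker or LocalJobRunner:

    import java.io.File;

    import org.apache.hadoop.fs.LocalDirAllocator;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
    import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;

    public class CacheLifecycleSketch {
      public static void main(String[] args) throws Exception {
        // Assumed to already carry cache-file URIs and timestamps.
        JobConf conf = new JobConf();

        // One manager per machine, one helper per task.
        TrackerDistributedCacheManager trackerManager =
            new TrackerDistributedCacheManager(conf);
        TaskDistributedCacheManager taskManager =
            trackerManager.newTaskDistributedCacheManager(conf);

        LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
        File workDir = new File("/tmp/task-work");

        // Localize the configured files/archives into the local cache.
        taskManager.setup(lDirAlloc, workDir, "archive");

        // Run user code with a loader that sees the cached classpath entries.
        ClassLoader loader = taskManager.makeClassLoader(
            Thread.currentThread().getContextClassLoader());

        // ... launch the task using 'loader' ...

        // Drop refcounts so the tracker-level manager can reclaim space.
        taskManager.release();
      }
    }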

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=807543&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Tue Aug 25 10:27:53 2009
@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.filecache;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.RunJar;
+
+/**
+ * Manages a single machine's instance of a cross-job
+ * cache.  This class would typically be instantiated
+ * by a TaskTracker (or something that emulates it,
+ * like LocalJobRunner).
+ * 
+ * <b>This class is internal to Hadoop, and should not be treated as a public
+ * interface.</b>
+ */
+public class TrackerDistributedCacheManager {
+  // cacheID to cacheStatus mapping
+  private TreeMap<String, CacheStatus> cachedArchives = 
+    new TreeMap<String, CacheStatus>();
+
+  private TreeMap<Path, Long> baseDirSize = new TreeMap<Path, Long>();
+
+  // default total cache size (10GB)
+  private static final long DEFAULT_CACHE_SIZE = 10737418240L;
+
+  private static final Log LOG =
+    LogFactory.getLog(TrackerDistributedCacheManager.class);
+
+  private final LocalFileSystem localFs;
+
+  public TrackerDistributedCacheManager(Configuration conf) throws IOException {
+    this.localFs = FileSystem.getLocal(conf);
+  }
+
+  /**
+   * Get the locally cached file or archive; it is either previously
+   * cached (and still valid), or it is copied from the {@link FileSystem} now.
+   *
+   * @param cache the cache to be localized; this should be specified as
+   * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
+   * @param conf The Configuration file which contains the filesystem
+   * @param baseDir The base cache dir where you want to localize the 
+   *  files/archives
+   * @param fileStatus The file status on the dfs.
+   * @param isArchive if the cache is an archive or a file. In case it is an
+   *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
+   *  be unzipped/unjarred/untarred automatically
+   *  and the directory where the archive is unzipped/unjarred/untarred is
+   *  returned as the Path.
+   *  In case of a file, the path to the file is returned
+   * @param confFileStamp this is the hdfs file modification timestamp to verify
+   * that the file to be cached hasn't changed since the job started
+   * @param currentWorkDir this is the directory where you would want to create
+   * symlinks for the locally cached files/archives
+   * @param honorSymLinkConf if this is false, then the symlinks are not
+   * created even if conf says so (this is required for an optimization in task
+   * launches).
+   * NOTE: This is effectively always on since r696957, since there is no code
+   * path that does not use this.
+   * @return the path to directory where the archives are unjarred in case of
+   * archives, the path to the file where the file is copied locally
+   * @throws IOException
+   */
+  Path getLocalCache(URI cache, Configuration conf,
+      Path baseDir, FileStatus fileStatus,
+      boolean isArchive, long confFileStamp,
+      Path currentWorkDir, boolean honorSymLinkConf)
+      throws IOException {
+    String cacheId = makeRelative(cache, conf);
+    CacheStatus lcacheStatus;
+    Path localizedPath;
+    synchronized (cachedArchives) {
+      lcacheStatus = cachedArchives.get(cacheId);
+      if (lcacheStatus == null) {
+        // was never localized
+        lcacheStatus = new CacheStatus(baseDir, 
+          new Path(baseDir, new Path(cacheId)));
+        cachedArchives.put(cacheId, lcacheStatus);
+      }
+
+      synchronized (lcacheStatus) {
+        localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus,
+            fileStatus, isArchive, currentWorkDir, honorSymLinkConf);
+        lcacheStatus.refcount++;
+      }
+    }
+
+    // Try to free space if this base dir has exceeded its size limit.
+    long size = 0;
+    synchronized (baseDirSize) {
+      Long get = baseDirSize.get(baseDir);
+      if (get != null) {
+        size = get.longValue();
+      }
+    }
+    // Read the configured cache size limit, defaulting to 10GB.
+    long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
+    if (allowedSize < size) {
+      // try some cache deletions
+      deleteCache(conf);
+    }
+    return localizedPath;
+  }
+
+  /**
+   * This is the opposite of getLocalCache. When you are done
+   * using the cache, you need to release it.
+   * @param cache The cache URI to be released
+   * @param conf configuration which contains the filesystem the cache
+   * is contained in.
+   * @throws IOException
+   */
+  void releaseCache(URI cache, Configuration conf)
+    throws IOException {
+    String cacheId = makeRelative(cache, conf);
+    synchronized (cachedArchives) {
+      CacheStatus lcacheStatus = cachedArchives.get(cacheId);
+      if (lcacheStatus == null)
+        return;
+      synchronized (lcacheStatus) {
+        lcacheStatus.refcount--;
+      }
+    }
+  }
+
+  // Deletes the caches that have a refcount of zero.
+
+  private void deleteCache(Configuration conf) throws IOException {
+    // try deleting cache Status with refcount of zero
+    synchronized (cachedArchives) {
+      for (Iterator<String> it = cachedArchives.keySet().iterator(); 
+          it.hasNext();) {
+        String cacheId = it.next();
+        CacheStatus lcacheStatus = cachedArchives.get(cacheId);
+        synchronized (lcacheStatus) {
+          if (lcacheStatus.refcount == 0) {
+            // delete this cache entry
+            FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
+            synchronized (baseDirSize) {
+              Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
+              if (dirSize != null) {
+                dirSize -= lcacheStatus.size;
+                baseDirSize.put(lcacheStatus.baseDir, dirSize);
+              }
+            }
+            it.remove();
+          }
+        }
+      }
+    }
+  }
+
+  /*
+   * Returns the relative path that this cache will be localized in. For
+   * hdfs://hostname:port/absolute_path, the relative path is
+   * hostname/absolute_path; if the URI is just /absolute_path, the
+   * relative path is <host of this cluster's default DFS>/absolute_path.
+   * For example, hdfs://nn:8020/user/joe/lookup.dat localizes under
+   * nn/user/joe/lookup.dat (the hostname here is illustrative).
+   */
+  String makeRelative(URI cache, Configuration conf)
+    throws IOException {
+    String host = cache.getHost();
+    if (host == null) {
+      host = cache.getScheme();
+    }
+    if (host == null) {
+      URI defaultUri = FileSystem.get(conf).getUri();
+      host = defaultUri.getHost();
+      if (host == null) {
+        host = defaultUri.getScheme();
+      }
+    }
+    String path = host + cache.getPath();
+    path = path.replace(":/","/");                // remove windows device colon
+    return path;
+  }
+
+  private Path cacheFilePath(Path p) {
+    return new Path(p, p.getName());
+  }
+
+  // The method that actually copies the cache locally, unjars/unzips
+  // archives, and chmods the resulting files.
+  private Path localizeCache(Configuration conf,
+                                    URI cache, long confFileStamp,
+                                    CacheStatus cacheStatus,
+                                    FileStatus fileStatus,
+                                    boolean isArchive,
+                                    Path currentWorkDir, 
+                                    boolean honorSymLinkConf)
+  throws IOException {
+    boolean doSymlink = honorSymLinkConf && DistributedCache.getSymlink(conf);
+    if(cache.getFragment() == null) {
+      doSymlink = false;
+    }
+    FileSystem fs = FileSystem.get(cache, conf);
+    String link = 
+      currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
+    File flink = new File(link);
+    if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
+                           cacheStatus, fileStatus)) {
+      LOG.info(String.format("Using existing cache of %s->%s",
+          cache.toString(), cacheStatus.localLoadPath));
+      if (isArchive) {
+        if (doSymlink){
+          if (!flink.exists())
+            FileUtil.symLink(cacheStatus.localLoadPath.toString(),
+                             link);
+        }
+
+        return cacheStatus.localLoadPath;
+      }
+      else {
+        if (doSymlink){
+          if (!flink.exists())
+            FileUtil.symLink(
+              cacheFilePath(cacheStatus.localLoadPath).toString(), link);
+        }
+        return cacheFilePath(cacheStatus.localLoadPath);
+      }
+    } else {
+
+      // Remove the old archive. If it cannot be removed because it is
+      // still in use by another job, fail with an IOException.
+      if (cacheStatus.refcount > 1 && cacheStatus.currentStatus)
+        throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
+                              + " is in use and cannot be refreshed");
+
+      FileSystem localFs = FileSystem.getLocal(conf);
+      localFs.delete(cacheStatus.localLoadPath, true);
+      synchronized (baseDirSize) {
+        Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+        if (dirSize != null) {
+          dirSize -= cacheStatus.size;
+          baseDirSize.put(cacheStatus.baseDir, dirSize);
+        }
+      }
+      Path parchive = new Path(cacheStatus.localLoadPath,
+                               new Path(cacheStatus.localLoadPath.getName()));
+
+      if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
+        throw new IOException("Mkdirs failed to create directory " +
+                              cacheStatus.localLoadPath.toString());
+      }
+
+      String cacheId = cache.getPath();
+      fs.copyToLocalFile(new Path(cacheId), parchive);
+      if (isArchive) {
+        String tmpArchive = parchive.toString().toLowerCase();
+        File srcFile = new File(parchive.toString());
+        File destDir = new File(parchive.getParent().toString());
+        LOG.info(String.format("Extracting %s to %s",
+            srcFile.toString(), destDir.toString()));
+        if (tmpArchive.endsWith(".jar")) {
+          RunJar.unJar(srcFile, destDir);
+        } else if (tmpArchive.endsWith(".zip")) {
+          FileUtil.unZip(srcFile, destDir);
+        } else if (isTarFile(tmpArchive)) {
+          FileUtil.unTar(srcFile, destDir);
+        } else {
+          LOG.warn(String.format(
+            "Cache file %s specified as archive, but not valid extension.", 
+            srcFile.toString()));
+          // else will not do anyhting
+          // and copy the file into the dir as it is
+        }
+      }
+
+      long cacheSize = 
+        FileUtil.getDU(new File(parchive.getParent().toString()));
+      cacheStatus.size = cacheSize;
+      synchronized (baseDirSize) {
+        Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+        if( dirSize == null ) {
+          dirSize = Long.valueOf(cacheSize);
+        } else {
+          dirSize += cacheSize;
+        }
+        baseDirSize.put(cacheStatus.baseDir, dirSize);
+      }
+
+      // Set permissions recursively to grant everyone read and execute.
+      try {
+        FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx", true);
+      } catch (InterruptedException e) {
+        LOG.warn("Exception in chmod: " + e.toString());
+      }
+
+      // update cacheStatus to reflect the newly cached file
+      cacheStatus.currentStatus = true;
+      cacheStatus.mtime = DistributedCache.getTimestamp(conf, cache);
+
+      LOG.info(String.format("Cached %s as %s",
+          cache.toString(), cacheStatus.localLoadPath));
+    }
+
+    if (isArchive){
+      if (doSymlink){
+        if (!flink.exists())
+          FileUtil.symLink(cacheStatus.localLoadPath.toString(),
+                           link);
+      }
+      return cacheStatus.localLoadPath;
+    }
+    else {
+      if (doSymlink){
+        if (!flink.exists())
+          FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(),
+                           link);
+      }
+      return cacheFilePath(cacheStatus.localLoadPath);
+    }
+  }
+
+  private static boolean isTarFile(String filename) {
+    return (filename.endsWith(".tgz") || filename.endsWith(".tar.gz") ||
+           filename.endsWith(".tar"));
+  }
+
+  // Checks if the cache has already been localized and is fresh
+  private boolean ifExistsAndFresh(Configuration conf, FileSystem fs,
+                                          URI cache, long confFileStamp,
+                                          CacheStatus lcacheStatus,
+                                          FileStatus fileStatus)
+  throws IOException {
+    // check for existence of the cache
+    if (!lcacheStatus.currentStatus) {
+      return false;
+    } else {
+      long dfsFileStamp;
+      if (fileStatus != null) {
+        dfsFileStamp = fileStatus.getModificationTime();
+      } else {
+        dfsFileStamp = DistributedCache.getTimestamp(conf, cache);
+      }
+
+      // ensure that the file on hdfs hasn't been modified since the job started
+      if (dfsFileStamp != confFileStamp) {
+        LOG.fatal("File: " + cache + " has changed on HDFS since job started");
+        throw new IOException("File: " + cache +
+                              " has changed on HDFS since job started");
+      }
+
+      if (dfsFileStamp != lcacheStatus.mtime) {
+        // needs refreshing
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * This method creates symlinks in a work directory to all the files in a
+   * given directory.
+   * 
+   * Should not be used outside of DistributedCache code.
+   * 
+   * @param conf the configuration
+   * @param jobCacheDir the directory containing the files to be linked to
+   * @param workDir the directory in which the symlinks are created
+   * @throws IOException
+   */
+  public static void createAllSymlink(Configuration conf, File jobCacheDir, 
+      File workDir)
+    throws IOException{
+    if ((jobCacheDir == null || !jobCacheDir.isDirectory()) ||
+           workDir == null || (!workDir.isDirectory())) {
+      return;
+    }
+    boolean createSymlink = DistributedCache.getSymlink(conf);
+    if (createSymlink){
+      File[] list = jobCacheDir.listFiles();
+      for (int i=0; i < list.length; i++){
+        String target = list[i].getAbsolutePath();
+        String link = new File(workDir, list[i].getName()).toString();
+        LOG.info(String.format("Creating symlink: %s <- %s", target, link));
+        int ret = FileUtil.symLink(target, link);
+        if (ret != 0) {
+          LOG.warn(String.format("Failed to create symlink: %s <- %s", target,
+              link));
+        }
+      }
+    }
+  }
+
+  private static class CacheStatus {
+    // false if not loaded yet, true once loaded
+    boolean currentStatus;
+
+    // the local load path of this cache
+    Path localLoadPath;
+
+    //the base dir where the cache lies
+    Path baseDir;
+
+    //the size of this cache
+    long size;
+
+    // number of instances using this cache
+    int refcount;
+
+    // the cache-file modification time
+    long mtime;
+
+    public CacheStatus(Path baseDir, Path localLoadPath) {
+      super();
+      this.currentStatus = false;
+      this.localLoadPath = localLoadPath;
+      this.refcount = 0;
+      this.mtime = -1;
+      this.baseDir = baseDir;
+      this.size = 0;
+    }
+  }
+
+  /**
+   * Clear the entire contents of the cache and delete the backing files. This
+   * should only be used when the server is reinitializing, because the users
+   * are going to lose their files.
+   */
+  public void purgeCache() {
+    synchronized (cachedArchives) {
+      for (Map.Entry<String,CacheStatus> f: cachedArchives.entrySet()) {
+        try {
+          localFs.delete(f.getValue().localLoadPath, true);
+        } catch (IOException ie) {
+          LOG.debug("Error cleaning up cache", ie);
+        }
+      }
+      cachedArchives.clear();
+    }
+  }
+
+  public TaskDistributedCacheManager newTaskDistributedCacheManager(
+      Configuration taskConf) throws IOException {
+    return new TaskDistributedCacheManager(this, taskConf);
+  }
+
+  /**
+   * Determines the timestamps of the files to be cached, and stores them
+   * in the configuration.  This is intended to be used internally by JobClient
+   * after all cache files have been added.
+   * 
+   * This is an internal method!
+   * 
+   * @param job Configuration of a job.
+   * @throws IOException
+   */
+  public static void determineTimestamps(Configuration job) throws IOException {
+    URI[] tarchives = DistributedCache.getCacheArchives(job);
+    if (tarchives != null) {
+      StringBuffer archiveTimestamps = 
+        new StringBuffer(String.valueOf(
+            DistributedCache.getTimestamp(job, tarchives[0])));
+      for (int i = 1; i < tarchives.length; i++) {
+        archiveTimestamps.append(",");
+        archiveTimestamps.append(String.valueOf(
+            DistributedCache.getTimestamp(job, tarchives[i])));
+      }
+      DistributedCache.setArchiveTimestamps(job, archiveTimestamps.toString());
+    }
+  
+    URI[] tfiles = DistributedCache.getCacheFiles(job);
+    if (tfiles != null) {
+      StringBuffer fileTimestamps = new StringBuffer(String.valueOf(
+          DistributedCache.getTimestamp(job, tfiles[0])));
+      for (int i = 1; i < tfiles.length; i++) {
+        fileTimestamps.append(",");
+        fileTimestamps.append(String.valueOf(
+            DistributedCache.getTimestamp(job, tfiles[i])));
+      }
+      DistributedCache.setFileTimestamps(job, fileTimestamps.toString());
+    }
+  }
+}
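
On the submission side, determineTimestamps() is what makes the freshness
check in ifExistsAndFresh() work: the job client snapshots each cache
file's modification time into the configuration, and getLocalCache() later
fails the task if a file changed on HDFS mid-job. A hedged sketch of that
flow, with URIs invented for the example:

    import java.net.URI;

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.filecache.DistributedCache;
    import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;

    public class SubmissionTimestampSketch {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();

        // User code registers what to cache (URIs invented for the example).
        DistributedCache.addCacheFile(
            new URI("hdfs://nn:8020/share/lookup.dat#lookup"), job);
        DistributedCache.addCacheArchive(
            new URI("hdfs://nn:8020/share/libs.zip#libs"), job);

        // JobClient then records the current HDFS modification times;
        // localization compares against these and throws if they differ.
        TrackerDistributedCacheManager.determineTimestamps(job);
      }
    }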

Modified: hadoop/mapreduce/trunk/src/test/commit-tests
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/commit-tests?rev=807543&r1=807542&r2=807543&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/commit-tests (original)
+++ hadoop/mapreduce/trunk/src/test/commit-tests Tue Aug 25 10:27:53 2009
@@ -36,4 +36,6 @@
 **/TestTextOutputFormat.java
 **/TestTrackerBlacklistAcrossJobs.java
 **/TestTaskTrackerBlacklisting.java
+**/TestTaskTrackerLocalization.java
+**/TestTrackerDistributedCacheManager.java