You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2009/10/25 01:37:28 UTC

svn commit: r829468 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapreduce/filecache/ src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapreduce/filecache/

Author: acmurthy
Date: Sat Oct 24 23:37:28 2009
New Revision: 829468

URL: http://svn.apache.org/viewvc?rev=829468&view=rev
Log:
MAPREDUCE-1098. Fixed the distributed-cache to not do i/o while holding a global lock. Contributed by Amareshwari Sriramadasu.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=829468&r1=829467&r2=829468&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Sat Oct 24 23:37:28 2009
@@ -43,6 +43,9 @@
     MAPREDUCE-1133. Eclipse .classpath template has outdated jar files and is
     missing some new ones. (cos)
 
+    MAPREDUCE-1098. Fixed the distributed-cache to not do i/o while holding a
+    global lock. (Amareshwari Sriramadasu via acmurthy)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java?rev=829468&r1=829467&r2=829468&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java Sat Oct 24 23:37:28 2009
@@ -199,7 +199,7 @@
       Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
 
     return new TrackerDistributedCacheManager(conf).getLocalCache(cache, conf,
-        baseDir, fileStatus, isArchive, confFileStamp, currentWorkDir,
+        baseDir.toString(), fileStatus, isArchive, confFileStamp, currentWorkDir,
         honorSymLinkConf);
   }
 
@@ -250,7 +250,35 @@
   @Deprecated
   public static void releaseCache(URI cache, Configuration conf)
       throws IOException {
-    new TrackerDistributedCacheManager(conf).releaseCache(cache, conf);
+	// find the timestamp of the uri
+    URI[] archives = DistributedCache.getCacheArchives(conf);
+    URI[] files = DistributedCache.getCacheFiles(conf);
+    String[] archivesTimestamps =
+          DistributedCache.getArchiveTimestamps(conf);
+    String[] filesTimestamps =
+          DistributedCache.getFileTimestamps(conf);
+    String timestamp = null;
+    if (archives != null) {
+      for (int i = 0; i < archives.length; i++) {
+        if (archives[i].equals(cache)) {
+          timestamp = archivesTimestamps[i];
+          break;
+        }
+      }
+    }
+    if (timestamp == null && files != null) {
+      for (int i = 0; i < files.length; i++) {
+        if (files[i].equals(cache)) {
+          timestamp = filesTimestamps[i];
+          break;
+        }
+      }
+    }
+    if (timestamp == null) {
+      throw new IOException("TimeStamp of the uri couldnot be found");
+    }
+    new TrackerDistributedCacheManager(conf).releaseCache(cache, conf,
+          Long.parseLong(timestamp));
   }
   
   /**

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java?rev=829468&r1=829467&r2=829468&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java Sat Oct 24 23:37:28 2009
@@ -151,23 +151,9 @@
       URI uri = cacheFile.uri;
       FileSystem fileSystem = FileSystem.get(uri, taskConf);
       FileStatus fileStatus = fileSystem.getFileStatus(new Path(uri.getPath()));
-      String cacheId = this.distributedCacheManager.makeRelative(uri, taskConf);
-      String cachePath = cacheSubdir + Path.SEPARATOR + cacheId;
 
-      // Get the local path if the cacheFile is already localized or create one
-      // if it doesn't
-      Path localPath;
-      try {
-        localPath = lDirAlloc.getLocalPathToRead(cachePath, taskConf);
-      } catch (DiskErrorException de) {
-        localPath =
-            lDirAlloc.getLocalPathForWrite(cachePath, fileStatus.getLen(),
-                taskConf);
-      }
-
-      String baseDir = localPath.toString().replace(cacheId, "");
       Path p = distributedCacheManager.getLocalCache(uri, taskConf,
-          new Path(baseDir), fileStatus, 
+          cacheSubdir, fileStatus, 
           cacheFile.type == CacheFile.FileType.ARCHIVE,
           cacheFile.timestamp, workdirPath, false);
 
@@ -224,7 +210,7 @@
    */
   public void release() throws IOException {
     for (CacheFile c : cacheFiles) {
-      distributedCacheManager.releaseCache(c.uri, taskConf);
+      distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp);
     }
   }
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=829468&r1=829467&r2=829468&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Sat Oct 24 23:37:28 2009
@@ -20,8 +20,11 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Random;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
@@ -32,6 +35,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.RunJar;
@@ -59,9 +63,17 @@
     LogFactory.getLog(TrackerDistributedCacheManager.class);
 
   private final LocalFileSystem localFs;
+  
+  private LocalDirAllocator lDirAllocator;
+  
+  private Configuration trackerConf;
+  
+  private Random random = new Random();
 
   public TrackerDistributedCacheManager(Configuration conf) throws IOException {
     this.localFs = FileSystem.getLocal(conf);
+    this.trackerConf = conf;
+    this.lDirAllocator = new LocalDirAllocator(TTConfig.LOCAL_DIR);
   }
 
   /**
@@ -71,7 +83,7 @@
    * @param cache the cache to be localized, this should be specified as
    * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
    * @param conf The Configuration file which contains the filesystem
-   * @param baseDir The base cache Dir where you wnat to localize the 
+   * @param subDir The base cache subDir where you want to localize the 
    *  files/archives
    * @param fileStatus The file status on the dfs.
    * @param isArchive if the cache is an archive or a file. In case it is an
@@ -94,35 +106,55 @@
    * @throws IOException
    */
   Path getLocalCache(URI cache, Configuration conf,
-      Path baseDir, FileStatus fileStatus,
+      String subDir, FileStatus fileStatus,
       boolean isArchive, long confFileStamp,
       Path currentWorkDir, boolean honorSymLinkConf)
       throws IOException {
-    String cacheId = makeRelative(cache, conf);
+    String key = getKey(cache, conf, confFileStamp);
     CacheStatus lcacheStatus;
-    Path localizedPath;
+    Path localizedPath = null;
     synchronized (cachedArchives) {
-      lcacheStatus = cachedArchives.get(cacheId);
+      lcacheStatus = cachedArchives.get(key);
       if (lcacheStatus == null) {
         // was never localized
-        lcacheStatus = new CacheStatus(baseDir, 
-          new Path(baseDir, new Path(cacheId)));
-        cachedArchives.put(cacheId, lcacheStatus);
-      }
-
-      synchronized (lcacheStatus) {
+        String cachePath = new Path (subDir, 
+          new Path(String.valueOf(random.nextLong()),
+            makeRelative(cache, conf))).toString();
+        Path localPath = lDirAllocator.getLocalPathForWrite(cachePath,
+          fileStatus.getLen(), trackerConf);
+        lcacheStatus = new CacheStatus(
+          new Path(localPath.toString().replace(cachePath, "")), localPath); 
+        cachedArchives.put(key, lcacheStatus);
+      }
+
+      //mark the cache for use. 
+      lcacheStatus.refcount++;
+    }
+    
+    // do the localization, after releasing the global lock
+    synchronized (lcacheStatus) {
+      if (!lcacheStatus.isInited()) {
         localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus,
-            fileStatus, isArchive, currentWorkDir, honorSymLinkConf);
-        lcacheStatus.refcount++;
+            fileStatus, isArchive);
+        lcacheStatus.initComplete();
+      } else {
+        localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
+            lcacheStatus, fileStatus, isArchive);
       }
+      createSymlink(conf, cache, lcacheStatus, isArchive,
+          currentWorkDir, honorSymLinkConf);
     }
 
     // try deleting stuff if you can
     long size = 0;
-    synchronized (baseDirSize) {
-      Long get = baseDirSize.get(baseDir);
-      if ( get != null ) {
-      size = get.longValue();
+    synchronized (lcacheStatus) {
+      synchronized (baseDirSize) {
+        Long get = baseDirSize.get(lcacheStatus.getBaseDir());
+        if ( get != null ) {
+         size = get.longValue();
+        } else {
+          LOG.warn("Cannot find size of baseDir: " + lcacheStatus.getBaseDir());
+        }
       }
     }
     // setting the cache size to a default of 10GB
@@ -142,40 +174,58 @@
    * is contained in.
    * @throws IOException
    */
-  void releaseCache(URI cache, Configuration conf)
+  void releaseCache(URI cache, Configuration conf, long timeStamp)
     throws IOException {
-    String cacheId = makeRelative(cache, conf);
+    String key = getKey(cache, conf, timeStamp);
     synchronized (cachedArchives) {
-      CacheStatus lcacheStatus = cachedArchives.get(cacheId);
-      if (lcacheStatus == null)
+      CacheStatus lcacheStatus = cachedArchives.get(key);
+      if (lcacheStatus == null) {
+        LOG.warn("Cannot find localized cache: " + cache + 
+                 " (key: " + key + ") in releaseCache!");
         return;
-      synchronized (lcacheStatus) {
-        lcacheStatus.refcount--;
       }
+      
+      // decrement ref count 
+      lcacheStatus.refcount--;
     }
   }
 
   // To delete the caches which have a refcount of zero
 
   private void deleteCache(Configuration conf) throws IOException {
+    Set<CacheStatus> deleteSet = new HashSet<CacheStatus>();
     // try deleting cache Status with refcount of zero
     synchronized (cachedArchives) {
       for (Iterator<String> it = cachedArchives.keySet().iterator(); 
           it.hasNext();) {
         String cacheId = it.next();
         CacheStatus lcacheStatus = cachedArchives.get(cacheId);
-        synchronized (lcacheStatus) {
-          if (lcacheStatus.refcount == 0) {
-            // delete this cache entry
-            FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
-            synchronized (baseDirSize) {
-              Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
-              if ( dirSize != null ) {
-              dirSize -= lcacheStatus.size;
-              baseDirSize.put(lcacheStatus.baseDir, dirSize);
-              }
-            }
-            it.remove();
+        
+        // if reference count is zero 
+        // mark the cache for deletion
+        if (lcacheStatus.refcount == 0) {
+          // delete this cache entry from the global list 
+          // and mark the localized file for deletion
+          deleteSet.add(lcacheStatus);
+          it.remove();
+        }
+      }
+    }
+    
+    // do the deletion, after releasing the global lock
+    for (CacheStatus lcacheStatus : deleteSet) {
+      synchronized (lcacheStatus) {
+        FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
+        LOG.info("Deleted path " + lcacheStatus.localLoadPath);
+        // decrement the size of the cache from baseDirSize
+        synchronized (baseDirSize) {
+          Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
+          if ( dirSize != null ) {
+            dirSize -= lcacheStatus.size;
+            baseDirSize.put(lcacheStatus.baseDir, dirSize);
+          } else {
+            LOG.warn("Cannot find record of the baseDir: " + 
+                     lcacheStatus.baseDir + " during delete!");
           }
         }
       }
@@ -208,6 +258,11 @@
     return path;
   }
 
+  String getKey(URI cache, Configuration conf, long timeStamp) 
+      throws IOException {
+    return makeRelative(cache, conf) + String.valueOf(timeStamp);
+  }
+  
   /**
    * Returns mtime of a given cache file on hdfs.
    * 
@@ -224,144 +279,115 @@
     return fileSystem.getFileStatus(filePath).getModificationTime();
   }
 
-  private Path cacheFilePath(Path p) {
-    return new Path(p, p.getName());
-  }
+  private Path checkCacheStatusValidity(Configuration conf,
+      URI cache, long confFileStamp,
+      CacheStatus cacheStatus,
+      FileStatus fileStatus,
+      boolean isArchive
+      ) throws IOException {
+    FileSystem fs = FileSystem.get(cache, conf);
+    // Has to be 
+    if (!ifExistsAndFresh(conf, fs, cache, confFileStamp,
+                          cacheStatus, fileStatus)) {
+      throw new IOException("Stale cache file: " + cacheStatus.localLoadPath + 
+                            " for cache-file: " + cache);
+    }
 
-  // the method which actually copies the caches locally and unjars/unzips them
-  // and does chmod for the files
-  private Path localizeCache(Configuration conf,
-                                    URI cache, long confFileStamp,
-                                    CacheStatus cacheStatus,
-                                    FileStatus fileStatus,
-                                    boolean isArchive,
-                                    Path currentWorkDir, 
-                                    boolean honorSymLinkConf)
-  throws IOException {
+    LOG.info(String.format("Using existing cache of %s->%s",
+        cache.toString(), cacheStatus.localLoadPath));
+    return cacheStatus.localLoadPath;
+  }
+  
+  private void createSymlink(Configuration conf, URI cache,
+      CacheStatus cacheStatus, boolean isArchive,
+      Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
     boolean doSymlink = honorSymLinkConf && DistributedCache.getSymlink(conf);
     if(cache.getFragment() == null) {
       doSymlink = false;
     }
-    FileSystem fs = FileSystem.get(cache, conf);
     String link = 
       currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
     File flink = new File(link);
-    if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
-                           cacheStatus, fileStatus)) {
-      LOG.info(String.format("Using existing cache of %s->%s",
-          cache.toString(), cacheStatus.localLoadPath));
-      if (isArchive) {
-        if (doSymlink){
-          if (!flink.exists())
-            FileUtil.symLink(cacheStatus.localLoadPath.toString(),
-                             link);
-        }
-
-        return cacheStatus.localLoadPath;
-      }
-      else {
-        if (doSymlink){
-          if (!flink.exists())
-            FileUtil.symLink(
-              cacheFilePath(cacheStatus.localLoadPath).toString(), link);
-        }
-        return cacheFilePath(cacheStatus.localLoadPath);
+    if (doSymlink){
+      if (!flink.exists()) {
+        FileUtil.symLink(cacheStatus.localLoadPath.toString(), link);
       }
+    }
+  }
+  
+  // the method which actually copies the caches locally and unjars/unzips them
+  // and does chmod for the files
+  private Path localizeCache(Configuration conf,
+                                    URI cache, long confFileStamp,
+                                    CacheStatus cacheStatus,
+                                    FileStatus fileStatus,
+                                    boolean isArchive)
+  throws IOException {
+    FileSystem fs = FileSystem.get(cache, conf);
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Path parchive = null;
+    if (isArchive) {
+      parchive = new Path(cacheStatus.localLoadPath,
+        new Path(cacheStatus.localLoadPath.getName()));
     } else {
+      parchive = cacheStatus.localLoadPath;
+    }
 
-      // remove the old archive
-      // if the old archive cannot be removed since it is being used by another
-      // job
-      // return null
-      if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
-        throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
-                              + " is in use and cannot be refreshed");
-
-      FileSystem localFs = FileSystem.getLocal(conf);
-      localFs.delete(cacheStatus.localLoadPath, true);
-      synchronized (baseDirSize) {
-      Long dirSize = baseDirSize.get(cacheStatus.baseDir);
-      if ( dirSize != null ) {
-        dirSize -= cacheStatus.size;
-        baseDirSize.put(cacheStatus.baseDir, dirSize);
-      }
-      }
-      Path parchive = new Path(cacheStatus.localLoadPath,
-                               new Path(cacheStatus.localLoadPath.getName()));
-
-      if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
-        throw new IOException("Mkdirs failed to create directory " +
-                              cacheStatus.localLoadPath.toString());
-      }
-
-      String cacheId = cache.getPath();
-      fs.copyToLocalFile(new Path(cacheId), parchive);
-      if (isArchive) {
-        String tmpArchive = parchive.toString().toLowerCase();
-        File srcFile = new File(parchive.toString());
-        File destDir = new File(parchive.getParent().toString());
-        LOG.info(String.format("Extracting %s to %s",
-            srcFile.toString(), destDir.toString()));
-        if (tmpArchive.endsWith(".jar")) {
-          RunJar.unJar(srcFile, destDir);
-        } else if (tmpArchive.endsWith(".zip")) {
-          FileUtil.unZip(srcFile, destDir);
-        } else if (isTarFile(tmpArchive)) {
-          FileUtil.unTar(srcFile, destDir);
-        } else {
-          LOG.warn(String.format(
+    if (!localFs.mkdirs(parchive.getParent())) {
+      throw new IOException("Mkdirs failed to create directory " +
+          cacheStatus.localLoadPath.toString());
+    }
+
+    String cacheId = cache.getPath();
+    fs.copyToLocalFile(new Path(cacheId), parchive);
+    if (isArchive) {
+      String tmpArchive = parchive.toString().toLowerCase();
+      File srcFile = new File(parchive.toString());
+      File destDir = new File(parchive.getParent().toString());
+      LOG.info(String.format("Extracting %s to %s",
+          srcFile.toString(), destDir.toString()));
+      if (tmpArchive.endsWith(".jar")) {
+        RunJar.unJar(srcFile, destDir);
+      } else if (tmpArchive.endsWith(".zip")) {
+        FileUtil.unZip(srcFile, destDir);
+      } else if (isTarFile(tmpArchive)) {
+        FileUtil.unTar(srcFile, destDir);
+      } else {
+        LOG.warn(String.format(
             "Cache file %s specified as archive, but not valid extension.", 
             srcFile.toString()));
-          // else will not do anyhting
-          // and copy the file into the dir as it is
-        }
+        // else will not do anyhting
+        // and copy the file into the dir as it is
       }
+    }
 
-      long cacheSize = 
-        FileUtil.getDU(new File(parchive.getParent().toString()));
-      cacheStatus.size = cacheSize;
-      synchronized (baseDirSize) {
-        Long dirSize = baseDirSize.get(cacheStatus.baseDir);
-        if( dirSize == null ) {
-          dirSize = Long.valueOf(cacheSize);
-        } else {
-          dirSize += cacheSize;
-        }
-        baseDirSize.put(cacheStatus.baseDir, dirSize);
+    long cacheSize = 
+      FileUtil.getDU(new File(parchive.getParent().toString()));
+    cacheStatus.size = cacheSize;
+    synchronized (baseDirSize) {
+      Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+      if( dirSize == null ) {
+        dirSize = Long.valueOf(cacheSize);
+      } else {
+        dirSize += cacheSize;
       }
+      baseDirSize.put(cacheStatus.baseDir, dirSize);
+    }
 
-      // do chmod here
-      try {
-        //Setting recursive permission to grant everyone read and execute
-        FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
-      } catch(InterruptedException e) {
+    // do chmod here
+    try {
+      //Setting recursive permission to grant everyone read and execute
+      FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
+    } catch(InterruptedException e) {
       LOG.warn("Exception in chmod" + e.toString());
-      }
-
-      // update cacheStatus to reflect the newly cached file
-      cacheStatus.currentStatus = true;
-      cacheStatus.mtime = getTimestamp(conf, cache);
-
-      LOG.info(String.format("Cached %s as %s",
-          cache.toString(), cacheStatus.localLoadPath));
     }
 
-    if (isArchive){
-      if (doSymlink){
-        if (!flink.exists())
-          FileUtil.symLink(cacheStatus.localLoadPath.toString(),
-                           link);
-      }
-      return cacheStatus.localLoadPath;
-    }
-    else {
-      if (doSymlink){
-        if (!flink.exists())
-          FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(),
-                           link);
-      }
-      return cacheFilePath(cacheStatus.localLoadPath);
-    }
+    // update cacheStatus to reflect the newly cached file
+    cacheStatus.mtime = getTimestamp(conf, cache);
+
+    LOG.info(String.format("Cached %s as %s",
+             cache.toString(), cacheStatus.localLoadPath));
+    return cacheStatus.localLoadPath;
   }
 
   private static boolean isTarFile(String filename) {
@@ -375,28 +401,22 @@
                                           CacheStatus lcacheStatus,
                                           FileStatus fileStatus)
   throws IOException {
-    // check for existence of the cache
-    if (lcacheStatus.currentStatus == false) {
-      return false;
+    long dfsFileStamp;
+    if (fileStatus != null) {
+      dfsFileStamp = fileStatus.getModificationTime();
     } else {
-      long dfsFileStamp;
-      if (fileStatus != null) {
-        dfsFileStamp = fileStatus.getModificationTime();
-      } else {
-        dfsFileStamp = getTimestamp(conf, cache);
-      }
+      dfsFileStamp = getTimestamp(conf, cache);
+    }
 
-      // ensure that the file on hdfs hasn't been modified since the job started
-      if (dfsFileStamp != confFileStamp) {
-        LOG.fatal("File: " + cache + " has changed on HDFS since job started");
-        throw new IOException("File: " + cache +
-                              " has changed on HDFS since job started");
-      }
+    // ensure that the file on hdfs hasn't been modified since the job started
+    if (dfsFileStamp != confFileStamp) {
+      LOG.fatal("File: " + cache + " has changed on HDFS since job started");
+      throw new IOException("File: " + cache +
+                            " has changed on HDFS since job started");
+    }
 
-      if (dfsFileStamp != lcacheStatus.mtime) {
-        // needs refreshing
-        return false;
-      }
+    if (dfsFileStamp != lcacheStatus.mtime) {
+      return false;
     }
 
     return true;
@@ -437,9 +457,6 @@
   }
 
   private static class CacheStatus {
-    // false, not loaded yet, true is loaded
-    boolean currentStatus;
-
     // the local load path of this cache
     Path localLoadPath;
 
@@ -455,15 +472,31 @@
     // the cache-file modification time
     long mtime;
 
+    // is it initialized ?
+    boolean inited = false;
+    
     public CacheStatus(Path baseDir, Path localLoadPath) {
       super();
-      this.currentStatus = false;
       this.localLoadPath = localLoadPath;
       this.refcount = 0;
       this.mtime = -1;
       this.baseDir = baseDir;
       this.size = 0;
     }
+    
+    Path getBaseDir(){
+      return this.baseDir;
+    }
+    
+    // mark it as initialized
+    void initComplete() {
+      inited = true;
+    }
+
+    // is it initialized?
+    boolean isInited() {
+      return inited;
+    }
   }
 
   /**

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java?rev=829468&r1=829467&r2=829468&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java Sat Oct 24 23:37:28 2009
@@ -183,4 +183,12 @@
     }
     super.testFileSystemOtherThanDefault();
   }
+  
+  @Override
+  public void testFreshness()  throws Exception { 
+    if (!ClusterWithLinuxTaskController.shouldRun()) {
+      return;
+    }
+    super.testFreshness();
+  }
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=829468&r1=829467&r2=829468&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Sat Oct 24 23:37:28 2009
@@ -40,6 +40,7 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
@@ -53,7 +54,7 @@
           .getAbsolutePath();
 
   protected File ROOT_MAPRED_LOCAL_DIR;
-  private static String TEST_CACHE_BASE_DIR;
+  private static String TEST_CACHE_BASE_DIR = "cachebasedir";
   protected int numLocalDirs = 6;
 
   private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
@@ -61,6 +62,7 @@
   protected Configuration conf;
   protected Path firstCacheFile;
   protected Path secondCacheFile;
+  private FileSystem fs;
 
   protected LocalDirAllocator localDirAllocator = 
     new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
@@ -77,18 +79,12 @@
     // Prepare the tests' mapred-local-dir
     ROOT_MAPRED_LOCAL_DIR = new File(TEST_ROOT_DIR, "mapred/local");
     ROOT_MAPRED_LOCAL_DIR.mkdirs();
-    String []localDirs = new String[numLocalDirs];
-    for (int i = 0; i < numLocalDirs; i++) {
-      localDirs[i] = new File(ROOT_MAPRED_LOCAL_DIR, "0_" + i).getPath();
-    }
-
-    TEST_CACHE_BASE_DIR =
-        new File(TEST_ROOT_DIR, "cachebasedir").getAbsolutePath();
 
     conf = new Configuration();
     conf.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, LOCAL_CACHE_LIMIT);
-    conf.setStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY, localDirs);
+    conf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY, ROOT_MAPRED_LOCAL_DIR.toString());
     conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
+    fs = FileSystem.get(conf);
 
     // Create the temporary cache files to be used in the tests.
     firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile");
@@ -187,20 +183,21 @@
     TrackerDistributedCacheManager manager = 
         new TrackerDistributedCacheManager(conf);
     FileSystem localfs = FileSystem.getLocal(conf);
+    long now = System.currentTimeMillis();
 
     manager.getLocalCache(firstCacheFile.toUri(), conf, 
-        new Path(TEST_CACHE_BASE_DIR), null, false, 
-        System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false);
-    manager.releaseCache(firstCacheFile.toUri(), conf);
+        TEST_CACHE_BASE_DIR, fs.getFileStatus(firstCacheFile), false,
+        now, new Path(TEST_ROOT_DIR), false);
+    manager.releaseCache(firstCacheFile.toUri(), conf, now);
     //in above code,localized a file of size 4K and then release the cache 
     // which will cause the cache be deleted when the limit goes out. 
     // The below code localize another cache which's designed to
     //sweep away the first cache.
     manager.getLocalCache(secondCacheFile.toUri(), conf, 
-        new Path(TEST_CACHE_BASE_DIR), null, false, 
+        TEST_CACHE_BASE_DIR, fs.getFileStatus(secondCacheFile), false, 
         System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false);
     FileStatus[] dirStatuses = localfs.listStatus(
-        new Path(TEST_CACHE_BASE_DIR));
+      new Path(ROOT_MAPRED_LOCAL_DIR.toString()));
     assertTrue("DistributedCache failed deleting old" + 
         " cache when the cache store is full.",
         dirStatuses.length == 1);
@@ -213,7 +210,8 @@
     Path fileToCache = new Path("fakefile:///"
         + firstCacheFile.toUri().getPath());
     Path result = manager.getLocalCache(fileToCache.toUri(), conf,
-        new Path(TEST_CACHE_BASE_DIR), null, false, System.currentTimeMillis(),
+        TEST_CACHE_BASE_DIR, fs.getFileStatus(firstCacheFile), false,
+        System.currentTimeMillis(),
         new Path(TEST_ROOT_DIR), false);
     assertNotNull("DistributedCache cached file on non-default filesystem.",
         result);
@@ -245,4 +243,98 @@
   protected File pathToFile(Path p) {
     return new File(p.toString());
   }
+  
+  public static class FakeFileSystem extends RawLocalFileSystem {
+    private long increment = 0;
+    public FakeFileSystem() {
+      super();
+    }
+    
+    public FileStatus getFileStatus(Path p) throws IOException {
+      File f = pathToFile(p);
+      return new FileStatus(f.length(), f.isDirectory(), 1, 128,
+      f.lastModified() + increment, makeQualified(new Path(f.getPath())));
+    }
+    
+    void advanceClock(long millis) {
+      increment += millis;
+    }
+  }
+  
+  public void testFreshness() throws Exception {
+    Configuration myConf = new Configuration(conf);
+    myConf.set("fs.default.name", "refresh:///");
+    myConf.setClass("fs.refresh.impl", FakeFileSystem.class, FileSystem.class);
+    TrackerDistributedCacheManager manager = 
+      new TrackerDistributedCacheManager(myConf);
+    // ****** Imitate JobClient code
+    // Configures a task/job with both a regular file and a "classpath" file.
+    Configuration subConf = new Configuration(myConf);
+    DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
+    TrackerDistributedCacheManager.determineTimestamps(subConf);
+    // ****** End of imitating JobClient code
+
+    String userName = getJobOwnerName();
+
+    // ****** Imitate TaskRunner code.
+    TaskDistributedCacheManager handle =
+      manager.newTaskDistributedCacheManager(subConf);
+    assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
+    File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
+    handle.setup(localDirAllocator, workDir, TaskTracker
+        .getDistributedCacheDir(userName));
+    // ****** End of imitating TaskRunner code
+
+    Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
+    assertNotNull(null, localCacheFiles);
+    assertEquals(1, localCacheFiles.length);
+    Path cachedFirstFile = localCacheFiles[0];
+    assertFileLengthEquals(firstCacheFile, cachedFirstFile);
+    assertFalse("Paths should be different.", 
+        firstCacheFile.equals(cachedFirstFile));
+    // release
+    handle.release();
+    
+    // change the file timestamp
+    FileSystem fs = FileSystem.get(myConf);
+    ((FakeFileSystem)fs).advanceClock(1);
+
+    // running a task of the same job
+    Throwable th = null;
+    try {
+      handle.setup(localDirAllocator, workDir, TaskTracker
+          .getDistributedCacheDir(userName));
+    } catch (IOException ie) {
+      th = ie;
+    }
+    assertNotNull("Throwable is null", th);
+    assertTrue("Exception message does not match",
+        th.getMessage().contains("has changed on HDFS since job started"));
+    // release
+    handle.release();
+    
+    // submit another job
+    Configuration subConf2 = new Configuration(myConf);
+    DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf2);
+    TrackerDistributedCacheManager.determineTimestamps(subConf2);
+    
+    handle =
+      manager.newTaskDistributedCacheManager(subConf2);
+    handle.setup(localDirAllocator, workDir, TaskTracker
+        .getDistributedCacheDir(userName));
+    Path[] localCacheFiles2 = DistributedCache.getLocalCacheFiles(subConf2);
+    assertNotNull(null, localCacheFiles2);
+    assertEquals(1, localCacheFiles2.length);
+    Path cachedFirstFile2 = localCacheFiles2[0];
+    assertFileLengthEquals(firstCacheFile, cachedFirstFile2);
+    assertFalse("Paths should be different.", 
+        firstCacheFile.equals(cachedFirstFile2));
+    
+    // assert that two localizations point to different paths
+    assertFalse("two jobs with different timestamps did not localize" +
+        " in different paths", cachedFirstFile.equals(cachedFirstFile2));
+    // release
+    handle.release();
+  }
+
 }