You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by am...@apache.org on 2010/06/17 12:41:08 UTC

svn commit: r955543 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java

Author: amareshwari
Date: Thu Jun 17 10:41:08 2010
New Revision: 955543

URL: http://svn.apache.org/viewvc?rev=955543&view=rev
Log:
MAPREDUCE-1225. Fixes DistributedCache to check if the file is fresh or not, for the first localization also. Contributed by Zhong Wang.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=955543&r1=955542&r2=955543&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jun 17 10:41:08 2010
@@ -92,6 +92,9 @@ Trunk (unreleased changes)
 
     MAPREDUCE-1813. NPE in PipeMapred.MRErrorThread. (Ravi Gummadi via vinodkv)
 
+    MAPREDUCE-1225. Fixes DistributedCache to check if the file is fresh or not,
+    for the first localization also. (Zhong Wang via amareshwari)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=955543&r1=955542&r2=955543&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Thu Jun 17 10:41:08 2010
@@ -174,8 +174,11 @@ public class TrackerDistributedCacheMana
       // do the localization, after releasing the global lock
       synchronized (lcacheStatus) {
         if (!lcacheStatus.isInited()) {
+          FileSystem fs = FileSystem.get(cache, conf);
+          checkStampSinceJobStarted(conf, fs, cache, confFileStamp,
+              lcacheStatus, fileStatus);
           localizedPath = localizeCache(conf, cache, confFileStamp,
-              lcacheStatus, fileStatus, isArchive, isPublic);
+              lcacheStatus, isArchive, isPublic);
           lcacheStatus.initComplete();
         } else {
           localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
@@ -404,7 +407,6 @@ public class TrackerDistributedCacheMana
   Path localizeCache(Configuration conf,
                                     URI cache, long confFileStamp,
                                     CacheStatus cacheStatus,
-                                    FileStatus fileStatus,
                                     boolean isArchive, boolean isPublic)
   throws IOException {
     FileSystem fs = FileSystem.get(cache, conf);
@@ -488,9 +490,9 @@ public class TrackerDistributedCacheMana
     return (filename.endsWith(".tgz") || filename.endsWith(".tar.gz") ||
            filename.endsWith(".tar"));
   }
-
-  // Checks if the cache has already been localized and is fresh
-  private boolean ifExistsAndFresh(Configuration conf, FileSystem fs,
+  
+  // ensure that the file on hdfs hasn't been modified since the job started
+  private long checkStampSinceJobStarted(Configuration conf, FileSystem fs,
                                           URI cache, long confFileStamp,
                                           CacheStatus lcacheStatus,
                                           FileStatus fileStatus)
@@ -502,13 +504,23 @@ public class TrackerDistributedCacheMana
       dfsFileStamp = getTimestamp(conf, cache);
     }
 
-    // ensure that the file on hdfs hasn't been modified since the job started
     if (dfsFileStamp != confFileStamp) {
       LOG.fatal("File: " + cache + " has changed on HDFS since job started");
       throw new IOException("File: " + cache +
                             " has changed on HDFS since job started");
     }
+    
+    return dfsFileStamp;
+  }
 
+  // Checks if the cache has already been localized and is fresh
+  private boolean ifExistsAndFresh(Configuration conf, FileSystem fs,
+                                          URI cache, long confFileStamp,
+                                          CacheStatus lcacheStatus,
+                                          FileStatus fileStatus)
+  throws IOException {
+    long dfsFileStamp = checkStampSinceJobStarted(conf, fs, cache,
+        confFileStamp, lcacheStatus, fileStatus);
     if (dfsFileStamp != lcacheStatus.mtime) {
       return false;
     }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=955543&r1=955542&r2=955543&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Thu Jun 17 10:41:08 2010
@@ -202,13 +202,13 @@ public class TestTrackerDistributedCache
 
     @Override
     Path localizeCache(Configuration conf, URI cache, long confFileStamp,
-        CacheStatus cacheStatus, FileStatus fileStatus, boolean isArchive,
-        boolean isPublic) throws IOException {
+        CacheStatus cacheStatus, boolean isArchive, boolean isPublic)
+    throws IOException {
       if (cache.equals(firstCacheFile.toUri())) {
         throw new IOException("fake fail");
       }
       return super.localizeCache(conf, cache, confFileStamp, cacheStatus,
-          fileStatus, isArchive, isPublic);
+          isArchive, isPublic);
     }
   }
 
@@ -426,6 +426,12 @@ public class TestTrackerDistributedCache
   protected String getJobOwnerName() throws IOException {
     return UserGroupInformation.getLoginUser().getUserName();
   }
+  
+  private long getFileStamp(Path file) throws IOException {
+    FileStatus fileStatus = fs.getFileStatus(file);
+    return fileStatus.getModificationTime();
+  }
+  
 
   /** test delete cache */
   public void testDeleteCache() throws Exception {
@@ -448,7 +454,6 @@ public class TestTrackerDistributedCache
         new TrackerDistributedCacheManager(conf2, taskController);
     manager.startCleanupThread();
     FileSystem localfs = FileSystem.getLocal(conf2);
-    long now = System.currentTimeMillis();
     String userName = getJobOwnerName();
     conf2.set(MRJobConfig.USER_NAME, userName);
 
@@ -456,8 +461,9 @@ public class TestTrackerDistributedCache
     Path localCache = manager.getLocalCache(firstCacheFile.toUri(), conf2, 
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(firstCacheFile), false,
-        now, new Path(TEST_ROOT_DIR), false, false);
-    manager.releaseCache(firstCacheFile.toUri(), conf2, now);
+        getFileStamp(firstCacheFile), new Path(TEST_ROOT_DIR), false, false);
+    manager.releaseCache(firstCacheFile.toUri(), conf2,
+        getFileStamp(firstCacheFile));
     //in above code,localized a file of size 4K and then release the cache 
     // which will cause the cache be deleted when the limit goes out. 
     // The below code localize another cache which's designed to
@@ -465,7 +471,7 @@ public class TestTrackerDistributedCache
     manager.getLocalCache(secondCacheFile.toUri(), conf2, 
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(secondCacheFile), false, 
-        System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
+        getFileStamp(secondCacheFile), new Path(TEST_ROOT_DIR), false, false);
     checkCacheDeletion(localfs, localCache, "DistributedCache failed " +
         "deleting old cache when the cache store is full.");
     // Now we test the number of sub directories limit
@@ -479,15 +485,16 @@ public class TestTrackerDistributedCache
     Path thirdLocalCache = manager.getLocalCache(thirdCacheFile.toUri(), conf2,
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(thirdCacheFile), false,
-        now, new Path(TEST_ROOT_DIR), false, false);
+        getFileStamp(thirdCacheFile), new Path(TEST_ROOT_DIR), false, false);
     // Release the third cache so that it can be deleted while sweeping
-    manager.releaseCache(thirdCacheFile.toUri(), conf2, now);
+    manager.releaseCache(thirdCacheFile.toUri(), conf2,
+        getFileStamp(thirdCacheFile));
     // Getting the fourth cache will make the number of sub directories becomes
     // 3 which is greater than 2. So the released cache will be deleted.
     manager.getLocalCache(fourthCacheFile.toUri(), conf2, 
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(fourthCacheFile), false, 
-        System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
+        getFileStamp(fourthCacheFile), new Path(TEST_ROOT_DIR), false, false);
     checkCacheDeletion(localfs, thirdLocalCache,
         "DistributedCache failed deleting old" +
         " cache when the cache exceeds the number of sub directories limit.");
@@ -530,7 +537,7 @@ public class TestTrackerDistributedCache
     Path result = manager.getLocalCache(fileToCache.toUri(), conf,
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(firstCacheFile), false,
-        System.currentTimeMillis(),
+        getFileStamp(firstCacheFile),
         new Path(TEST_ROOT_DIR), false, false);
     assertNotNull("DistributedCache cached file on non-default filesystem.",
         result);
@@ -654,6 +661,27 @@ public class TestTrackerDistributedCache
     // release
     handle.release();
     
+    // running a task of the same job on another TaskTracker which has never
+    // initialized the cache
+    TrackerDistributedCacheManager manager2 = 
+      new TrackerDistributedCacheManager(myConf, taskController);
+    TaskDistributedCacheManager handle2 =
+      manager2.newTaskDistributedCacheManager(subConf);
+    File workDir2 = new File(new Path(TEST_ROOT_DIR, "workdir2").toString());
+    th = null;
+    try {
+      handle2.setup(localDirAllocator, workDir2, TaskTracker
+          .getPrivateDistributedCacheDir(userName), 
+          TaskTracker.getPublicDistributedCacheDir());
+    } catch (IOException ie) {
+      th = ie;
+    }
+    assertNotNull("Throwable is null", th);
+    assertTrue("Exception message does not match",
+        th.getMessage().contains("has changed on HDFS since job started"));
+    // release
+    handle.release();
+    
     // submit another job
     Configuration subConf2 = new Configuration(myConf);
     subConf2.set(MRJobConfig.USER_NAME, userName);
@@ -696,13 +724,12 @@ public class TestTrackerDistributedCache
     TrackerDistributedCacheManager manager = 
         new TrackerDistributedCacheManager(conf, taskController);
     FileSystem localfs = FileSystem.getLocal(conf);
-    long now = System.currentTimeMillis();
 
     Path[] localCache = new Path[2];
     localCache[0] = manager.getLocalCache(firstCacheFile.toUri(), conf, 
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(firstCacheFile), false,
-        now, new Path(TEST_ROOT_DIR), false, false);
+        getFileStamp(firstCacheFile), new Path(TEST_ROOT_DIR), false, false);
     FsPermission myPermission = new FsPermission((short)0600);
     Path myFile = new Path(localCache[0].getParent(), "myfile.txt");
     if (FileSystem.create(localfs, myFile, myPermission) == null) {
@@ -712,7 +739,8 @@ public class TestTrackerDistributedCache
       localCache[1] = manager.getLocalCache(secondCacheFile.toUri(), conf, 
           TaskTracker.getPrivateDistributedCacheDir(userName),
           fs.getFileStatus(secondCacheFile), false, 
-          System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
+          getFileStamp(secondCacheFile), new Path(TEST_ROOT_DIR), false,
+          false);
       FileStatus stat = localfs.getFileStatus(myFile);
       assertTrue(stat.getPermission().equals(myPermission));
       // validate permissions of localized files.