You are viewing a plain text version of this content. The canonical link for it (the "here" hyperlink on the original archive page) is not preserved in this plain-text copy.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2011/07/21 03:59:15 UTC

svn commit: r1149004 - in /hadoop/common/trunk/mapreduce: ./ src/java/org/apache/hadoop/mapreduce/filecache/ src/test/mapred/org/apache/hadoop/mapreduce/filecache/

Author: cdouglas
Date: Thu Jul 21 01:59:14 2011
New Revision: 1149004

URL: http://svn.apache.org/viewvc?rev=1149004&view=rev
Log:
MAPREDUCE-2409. DistributedCache maps files and archives to the same path,
despite semantic incompatibility. Contributed by Siddharth Seth.

Modified:
    hadoop/common/trunk/mapreduce/CHANGES.txt
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
    hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java

Modified: hadoop/common/trunk/mapreduce/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/CHANGES.txt?rev=1149004&r1=1149003&r2=1149004&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/CHANGES.txt (original)
+++ hadoop/common/trunk/mapreduce/CHANGES.txt Thu Jul 21 01:59:14 2011
@@ -344,6 +344,9 @@ Trunk (unreleased changes)
     MAPREDUCE-2710. Update JobSubmitter.printTokens(..) for HDFS-2161.
     (szetszwo)
 
+    MAPREDUCE-2409. DistributedCache maps files and archives to the same path,
+    despite semantic incompatibility. (Siddharth Seth via cdouglas)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java?rev=1149004&r1=1149003&r2=1149004&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java Thu Jul 21 01:59:14 2011
@@ -241,7 +241,7 @@ public class TaskDistributedCacheManager
     for (CacheFile c : cacheFiles) {
       if (c.getLocalized()) {
         distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp,
-            c.owner);
+            c.owner, CacheFile.FileType.ARCHIVE == c.type);
       }
     }
   }

Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=1149004&r1=1149003&r2=1149004&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Thu Jul 21 01:59:14 2011
@@ -158,9 +158,9 @@ public class TrackerDistributedCacheMana
       Path currentWorkDir, boolean honorSymLinkConf, boolean isPublic)
       throws IOException {
     String key;
-    key = getKey(cache, conf, confFileStamp, getLocalizedCacheOwner(isPublic));
+    key = getKey(cache, conf, confFileStamp, getLocalizedCacheOwner(isPublic),
+        isArchive);
     CacheStatus lcacheStatus;
-    Path localizedPath = null;
     synchronized (cachedArchives) {
       lcacheStatus = cachedArchives.get(key);
       if (lcacheStatus == null) {
@@ -187,18 +187,18 @@ public class TrackerDistributedCacheMana
           FileSystem fs = FileSystem.get(cache, conf);
           checkStampSinceJobStarted(conf, fs, cache, confFileStamp,
               lcacheStatus, fileStatus);
-          localizedPath = localizeCache(conf, cache, confFileStamp,
+          localizeCache(conf, cache, confFileStamp,
               lcacheStatus, isArchive, isPublic);
           lcacheStatus.initComplete();
         } else {
-          localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
+          checkCacheStatusValidity(conf, cache, confFileStamp,
               lcacheStatus, fileStatus, isArchive);
         }
         createSymlink(conf, cache, lcacheStatus, isArchive, currentWorkDir,
             honorSymLinkConf);
       }
       initSuccessful = true;
-      return localizedPath;
+      return lcacheStatus.localizedLoadPath;
     } finally {
       if (!initSuccessful) {
         lcacheStatus.decRefCount();
@@ -217,8 +217,8 @@ public class TrackerDistributedCacheMana
    * @throws IOException
    */
   void releaseCache(URI cache, Configuration conf, long timeStamp,
-      String owner) throws IOException {
-    String key = getKey(cache, conf, timeStamp, owner);
+      String owner, boolean isArchive) throws IOException {
+    String key = getKey(cache, conf, timeStamp, owner, isArchive);
     synchronized (cachedArchives) {
       CacheStatus lcacheStatus = cachedArchives.get(key);
       if (lcacheStatus == null) {
@@ -236,8 +236,8 @@ public class TrackerDistributedCacheMana
    * This method is called from unit tests. 
    */
   int getReferenceCount(URI cache, Configuration conf, long timeStamp,
-      String owner) throws IOException {
-    String key = getKey(cache, conf, timeStamp, owner);
+      String owner, boolean isArchive) throws IOException {
+    String key = getKey(cache, conf, timeStamp, owner, isArchive);
     synchronized (cachedArchives) {
       CacheStatus lcacheStatus = cachedArchives.get(key);
       if (lcacheStatus == null) {
@@ -315,9 +315,10 @@ public class TrackerDistributedCacheMana
     return path;
   }
 
-  String getKey(URI cache, Configuration conf, long timeStamp, String user)
-      throws IOException {
-    return makeRelative(cache, conf) + String.valueOf(timeStamp) + user;
+  String getKey(URI cache, Configuration conf, long timeStamp, String user,
+      boolean isArchive) throws IOException {
+    return (isArchive ? "a" : "f") + "^" + makeRelative(cache, conf) 
+      + String.valueOf(timeStamp) + user;
   }
   
   /**
@@ -341,12 +342,11 @@ public class TrackerDistributedCacheMana
    * @return mtime of a given cache file on hdfs
    * @throws IOException
    */
-  static long getTimestamp(Configuration conf, URI cache)
-    throws IOException {
+  long getTimestamp(Configuration conf, URI cache) throws IOException {
     return getFileStatus(conf, cache).getModificationTime();
   }
 
-  private Path checkCacheStatusValidity(Configuration conf,
+  void checkCacheStatusValidity(Configuration conf,
       URI cache, long confFileStamp,
       CacheStatus cacheStatus,
       FileStatus fileStatus,
@@ -362,7 +362,6 @@ public class TrackerDistributedCacheMana
 
     LOG.info(String.format("Using existing cache of %s->%s",
         cache.toString(), cacheStatus.localizedLoadPath));
-    return cacheStatus.localizedLoadPath;
   }
   
   private void createSymlink(Configuration conf, URI cache,
@@ -472,7 +471,7 @@ public class TrackerDistributedCacheMana
   }
   
   // ensure that the file on hdfs hasn't been modified since the job started
-  private long checkStampSinceJobStarted(Configuration conf, FileSystem fs,
+  long checkStampSinceJobStarted(Configuration conf, FileSystem fs,
                                           URI cache, long confFileStamp,
                                           CacheStatus lcacheStatus,
                                           FileStatus fileStatus)

Modified: hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=1149004&r1=1149003&r2=1149004&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Thu Jul 21 01:59:14 2011
@@ -51,11 +51,15 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager.CacheStatus;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.mortbay.log.Log;
 
+import org.mockito.Matchers;
+import static org.mockito.Mockito.*;
+
 public class TestTrackerDistributedCacheManager extends TestCase {
 
   protected String TEST_ROOT_DIR =
@@ -251,7 +255,7 @@ public class TestTrackerDistributedCache
     handle.release();
     for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
       assertEquals(0, manager.getReferenceCount(c.uri, conf1, c.timestamp,
-          c.owner));
+          c.owner, false));
     }
     
     Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
@@ -289,7 +293,7 @@ public class TestTrackerDistributedCache
     for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
       try {
         assertEquals(0, manager.getReferenceCount(c.uri, conf2, c.timestamp,
-            c.owner));
+            c.owner, false));
       } catch (IOException ie) {
         th = ie;
         Log.info("Exception getting reference count for " + c.uri, ie);
@@ -609,15 +613,15 @@ public class TestTrackerDistributedCache
 
     manager.releaseCache(thirdCacheFile.toUri(), conf2,
         getFileStamp(thirdCacheFile),
-        TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
+        TrackerDistributedCacheManager.getLocalizedCacheOwner(false), false);
 
     manager.releaseCache(secondCacheFile.toUri(), conf2,
         getFileStamp(secondCacheFile),
-        TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
+        TrackerDistributedCacheManager.getLocalizedCacheOwner(false), false);
 
     manager.releaseCache(firstCacheFile.toUri(), conf2,
         getFileStamp(firstCacheFile),
-        TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
+        TrackerDistributedCacheManager.getLocalizedCacheOwner(false), false);
 
 
     // Getting the fourth cache will make the number of sub directories becomes
@@ -645,6 +649,47 @@ public class TestTrackerDistributedCache
     manager.stopCleanupThread();
   }
 
+  public void testSameNameFileArchiveCache() throws IOException,
+      URISyntaxException, InterruptedException {
+    if (!canRun()) {
+      return;
+    }
+    TrackerDistributedCacheManager manager =
+        spy(new TrackerDistributedCacheManager(conf, taskController));
+    URI rsrc = new URI("file://foo/bar/yak");
+    Path cacheDir = new Path("file:///localcache");
+    Path archivePath = new Path(cacheDir, "archive");
+    Path filePath = new Path(cacheDir, "file");
+    doReturn(archivePath).when(manager).localizeCache(eq(conf), eq(rsrc),
+        anyLong(), Matchers.<CacheStatus> anyObject(), eq(true), anyBoolean());
+    doReturn(filePath).when(manager).localizeCache(eq(conf), eq(rsrc),
+        anyLong(), Matchers.<CacheStatus> anyObject(), eq(false), anyBoolean());
+    // could fail, but check match instead
+    doNothing().when(manager).checkCacheStatusValidity(
+        Matchers.<Configuration> anyObject(), eq(rsrc), anyLong(),
+        Matchers.<CacheStatus> anyObject(), Matchers.<FileStatus> anyObject(),
+        anyBoolean());
+    // localizeCache initializes mtime of cached rsrc; set to uninitialized val
+    doReturn(-1L).when(manager).checkStampSinceJobStarted(
+        Matchers.<Configuration> anyObject(),
+        Matchers.<FileSystem> anyObject(), eq(rsrc), anyLong(),
+        Matchers.<CacheStatus> anyObject(), Matchers.<FileStatus> anyObject());
+    doReturn(-1L).when(manager).getTimestamp(
+        Matchers.<Configuration> anyObject(), eq(rsrc));
+    FileStatus rsrcStatus = mock(FileStatus.class);
+    when(rsrcStatus.getLen()).thenReturn(4344L);
+
+    Path localizedPathForFile =
+        manager.getLocalCache(rsrc, conf, "sub", rsrcStatus, false, 20L,
+            new Path("file:///tmp"), false, true);
+    Path localizedPathForArchive =
+        manager.getLocalCache(rsrc, conf, "sub", rsrcStatus, true, 20L,
+            new Path("file:///tmp"), false, true);
+    assertNotSame("File and Archive resolve to the same path: "
+        + localizedPathForFile + ". Should differ.", localizedPathForFile,
+        localizedPathForArchive);
+  }
+  
   /** test delete cache */
   public void testDeleteCache() throws Exception {
     if (!canRun()) {
@@ -676,7 +721,7 @@ public class TestTrackerDistributedCache
         getFileStamp(firstCacheFile), new Path(TEST_ROOT_DIR), false, false);
     manager.releaseCache(firstCacheFile.toUri(), conf2,
         getFileStamp(firstCacheFile), 
-        TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
+        TrackerDistributedCacheManager.getLocalizedCacheOwner(false), false);
     //in above code,localized a file of size 4K and then release the cache 
     // which will cause the cache be deleted when the limit goes out. 
     // The below code localize another cache which's designed to
@@ -702,7 +747,7 @@ public class TestTrackerDistributedCache
     // Release the third cache so that it can be deleted while sweeping
     manager.releaseCache(thirdCacheFile.toUri(), conf2,
         getFileStamp(thirdCacheFile), 
-        TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
+        TrackerDistributedCacheManager.getLocalizedCacheOwner(false), false);
     // Getting the fourth cache will make the number of sub directories becomes
     // 3 which is greater than 2. So the released cache will be deleted.
     manager.getLocalCache(fourthCacheFile.toUri(), conf2,