You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:53:05 UTC

svn commit: r1077220 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/org/apache/hadoop/filecache/ test/org/apache/hadoop/mapred/

Author: omalley
Date: Fri Mar  4 03:53:05 2011
New Revision: 1077220

URL: http://svn.apache.org/viewvc?rev=1077220&view=rev
Log:
commit d6e3105a5bc1581545579c55ca5d53c363ea3e46
Author: Hemanth Yamijala <yh...@yahoo-inc.com>
Date:   Wed Feb 24 17:04:46 2010 +0530

    MAPREDUCE-1403 from https://issues.apache.org/jira/secure/attachment/12436842/MAPREDUCE-1403_yhadoop20-2.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1403. Save file-sizes of each of the artifacts in
    +    DistributedCache in the JobConf (Arun Murthy via yhemanth)
    +

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MRCaching.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java?rev=1077220&r1=1077219&r2=1077220&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java Fri Mar  4 03:53:05 2011
@@ -129,6 +129,17 @@ import java.net.URI;
  */
 public class DistributedCache {
   /**
+   * Warning: {@link #CACHE_FILES_SIZES} is not part of the *public* API.
+   */
+  public static final String CACHE_FILES_SIZES = "mapred.cache.files.filesizes";
+  
+  /**
+   * Warning: {@link #CACHE_ARCHIVES_SIZES} is not part of the *public* API.
+   */
+  public static final String CACHE_ARCHIVES_SIZES = 
+    "mapred.cache.archives.filesizes";
+  
+  /**
    * Get the locally cached file or archive; it could either be 
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
    * 
@@ -300,18 +311,30 @@ public class DistributedCache {
   }
 
   /**
-   * Returns mtime of a given cache file on hdfs.
+   * Returns {@link FileStatus} of a given cache file on hdfs.
    * @param conf configuration
    * @param cache cache file 
-   * @return mtime of a given cache file on hdfs
+   * @return <code>FileStatus</code> of a given cache file on hdfs
    * @throws IOException
    */
-  public static long getTimestamp(Configuration conf, URI cache)
+  public static FileStatus getFileStatus(Configuration conf, URI cache)
     throws IOException {
     FileSystem fileSystem = FileSystem.get(cache, conf);
     Path filePath = new Path(cache.getPath());
 
-    return fileSystem.getFileStatus(filePath).getModificationTime();
+    return fileSystem.getFileStatus(filePath);
+  }
+  
+  /**
+   * Returns mtime of a given cache file on hdfs.
+   * @param conf configuration
+   * @param cache cache file 
+   * @return mtime of a given cache file on hdfs
+   * @throws IOException
+   */
+  public static long getTimestamp(Configuration conf, URI cache)
+    throws IOException {
+    return getFileStatus(conf, cache).getModificationTime();
   }
   
   /**

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1077220&r1=1077219&r2=1077220&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Fri Mar  4 03:53:05 2011
@@ -628,26 +628,39 @@ public class TrackerDistributedCacheMana
   public static void determineTimestamps(Configuration job) throws IOException {
     URI[] tarchives = DistributedCache.getCacheArchives(job);
     if (tarchives != null) {
+      FileStatus status = DistributedCache.getFileStatus(job, tarchives[0]);
+      StringBuffer archiveFileSizes = 
+        new StringBuffer(String.valueOf(status.getLen()));      
       StringBuffer archiveTimestamps = 
-        new StringBuffer(String.valueOf(
-            DistributedCache.getTimestamp(job, tarchives[0])));
+        new StringBuffer(String.valueOf(status.getModificationTime()));
       for (int i = 1; i < tarchives.length; i++) {
+        status = DistributedCache.getFileStatus(job, tarchives[i]);
+        archiveFileSizes.append(",");
+        archiveFileSizes.append(String.valueOf(status.getLen()));
         archiveTimestamps.append(",");
         archiveTimestamps.append(String.valueOf(
-            DistributedCache.getTimestamp(job, tarchives[i])));
+            status.getModificationTime()));
       }
+      job.set(DistributedCache.CACHE_ARCHIVES_SIZES, 
+          archiveFileSizes.toString());
       DistributedCache.setArchiveTimestamps(job, archiveTimestamps.toString());
     }
   
     URI[] tfiles = DistributedCache.getCacheFiles(job);
     if (tfiles != null) {
+      FileStatus status = DistributedCache.getFileStatus(job, tfiles[0]);
+      StringBuffer fileSizes = 
+        new StringBuffer(String.valueOf(status.getLen()));      
       StringBuffer fileTimestamps = new StringBuffer(String.valueOf(
-          DistributedCache.getTimestamp(job, tfiles[0])));
+          status.getModificationTime()));
       for (int i = 1; i < tfiles.length; i++) {
+        status = DistributedCache.getFileStatus(job, tfiles[i]);
+        fileSizes.append(",");
+        fileSizes.append(String.valueOf(status.getLen()));
         fileTimestamps.append(",");
-        fileTimestamps.append(String.valueOf(
-            DistributedCache.getTimestamp(job, tfiles[i])));
+        fileTimestamps.append(String.valueOf(status.getModificationTime()));
       }
+      job.set(DistributedCache.CACHE_FILES_SIZES, fileSizes.toString());
       DistributedCache.setFileTimestamps(job, fileTimestamps.toString());
     }
   }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MRCaching.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MRCaching.java?rev=1077220&r1=1077219&r2=1077220&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MRCaching.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MRCaching.java Fri Mar  4 03:53:05 2011
@@ -34,8 +34,11 @@ import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.filecache.*;
+
 import java.net.URI;
 
+import junit.framework.Assert;
+
 public class MRCaching {
   static String testStr = "This is a test file " + "used for testing caching "
     + "jars, zip and normal files.";
@@ -269,11 +272,22 @@ public class MRCaching {
       uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz#" + "testtargz");
       uris[5] = fs.getUri().resolve(cacheDir + "/test.tar#" + "testtar");
     }
+    
+    //Add files to DC and track their sizes
     DistributedCache.addCacheFile(uris[0], conf);
+    long[] fileSizes = new long[1];
+    fileSizes[0] = fs.getFileStatus(new Path(uris[0].getPath())).getLen();
+    
+    long archivesSizes[] = new long[5];
     for (int i = 1; i < 6; i++) {
       DistributedCache.addCacheArchive(uris[i], conf);
+      archivesSizes[i-1] = 
+        fs.getFileStatus(new Path(uris[i].getPath())).getLen();
     }
+    
+    // Run the job
     RunningJob job = JobClient.runJob(conf);
+    
     int count = 0;
     // after the job ran check to see if the input from the localized cache
     // match the real string. check if there are 3 instances or not.
@@ -294,7 +308,32 @@ public class MRCaching {
     if (count != 6)
       return new TestResult(job, false);
 
+    // Check to ensure the filesizes of files in DC were correctly saved
+    validateCacheFilesSizes(conf, fileSizes, DistributedCache.CACHE_FILES_SIZES);
+    validateCacheFilesSizes(conf, archivesSizes, 
+                            DistributedCache.CACHE_ARCHIVES_SIZES);
+    
     return new TestResult(job, true);
 
   }
+  
+  private static void validateCacheFilesSizes(JobConf job, 
+                                       long[] expectedSizes, String configKey) 
+  throws IOException {
+    String configValues = job.get(configKey, "");
+    System.out.println(configKey + " -> " + configValues);
+    String[] realSizes = StringUtils.getStrings(configValues);
+    Assert.assertEquals("Found " + realSizes.length + " file-sizes for " + 
+                            configKey + " (" + configValues + "), expected: " + 
+                            expectedSizes.length, 
+                        expectedSizes.length, realSizes.length);
+    
+    for (int i=0; i < expectedSizes.length; ++i) {
+      long actual = Long.valueOf(realSizes[i]);
+      long expected = expectedSizes[i];
+      Assert.assertEquals("Found length: " + actual + ", while expected: " + 
+                              expected, 
+                          expected, actual);
+    }
+  }
 }