You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:53:05 UTC
svn commit: r1077220 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
mapred/org/apache/hadoop/filecache/ test/org/apache/hadoop/mapred/
Author: omalley
Date: Fri Mar 4 03:53:05 2011
New Revision: 1077220
URL: http://svn.apache.org/viewvc?rev=1077220&view=rev
Log:
commit d6e3105a5bc1581545579c55ca5d53c363ea3e46
Author: Hemanth Yamijala <yh...@yahoo-inc.com>
Date: Wed Feb 24 17:04:46 2010 +0530
MAPREDUCE:1403 from https://issues.apache.org/jira/secure/attachment/12436842/MAPREDUCE-1403_yhadoop20-2.patch
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1403. Save file-sizes of each of the artifacts in
+ DistributedCache in the JobConf (Arun Murthy via yhemanth)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MRCaching.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java?rev=1077220&r1=1077219&r2=1077220&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java Fri Mar 4 03:53:05 2011
@@ -129,6 +129,17 @@ import java.net.URI;
*/
public class DistributedCache {
/**
+ * Warning: {@link #CACHE_FILES_SIZES} is not a *public* constant.
+ */
+ public static final String CACHE_FILES_SIZES = "mapred.cache.files.filesizes";
+
+ /**
+ * Warning: {@link #CACHE_ARCHIVES_SIZES} is not a *public* constant.
+ */
+ public static final String CACHE_ARCHIVES_SIZES =
+ "mapred.cache.archives.filesizes";
+
+ /**
* Get the locally cached file or archive; it could either be
* previously cached (and valid) or copy it from the {@link FileSystem} now.
*
@@ -300,18 +311,30 @@ public class DistributedCache {
}
/**
- * Returns mtime of a given cache file on hdfs.
+ * Returns {@link FileStatus} of a given cache file on hdfs.
* @param conf configuration
* @param cache cache file
- * @return mtime of a given cache file on hdfs
+ * @return <code>FileStatus</code> of a given cache file on hdfs
* @throws IOException
*/
- public static long getTimestamp(Configuration conf, URI cache)
+ public static FileStatus getFileStatus(Configuration conf, URI cache)
throws IOException {
FileSystem fileSystem = FileSystem.get(cache, conf);
Path filePath = new Path(cache.getPath());
- return fileSystem.getFileStatus(filePath).getModificationTime();
+ return fileSystem.getFileStatus(filePath);
+ }
+
+ /**
+ * Returns mtime of a given cache file on hdfs.
+ * @param conf configuration
+ * @param cache cache file
+ * @return mtime of a given cache file on hdfs
+ * @throws IOException
+ */
+ public static long getTimestamp(Configuration conf, URI cache)
+ throws IOException {
+ return getFileStatus(conf, cache).getModificationTime();
}
/**
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1077220&r1=1077219&r2=1077220&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Fri Mar 4 03:53:05 2011
@@ -628,26 +628,39 @@ public class TrackerDistributedCacheMana
public static void determineTimestamps(Configuration job) throws IOException {
URI[] tarchives = DistributedCache.getCacheArchives(job);
if (tarchives != null) {
+ FileStatus status = DistributedCache.getFileStatus(job, tarchives[0]);
+ StringBuffer archiveFileSizes =
+ new StringBuffer(String.valueOf(status.getLen()));
StringBuffer archiveTimestamps =
- new StringBuffer(String.valueOf(
- DistributedCache.getTimestamp(job, tarchives[0])));
+ new StringBuffer(String.valueOf(status.getModificationTime()));
for (int i = 1; i < tarchives.length; i++) {
+ status = DistributedCache.getFileStatus(job, tarchives[i]);
+ archiveFileSizes.append(",");
+ archiveFileSizes.append(String.valueOf(status.getLen()));
archiveTimestamps.append(",");
archiveTimestamps.append(String.valueOf(
- DistributedCache.getTimestamp(job, tarchives[i])));
+ status.getModificationTime()));
}
+ job.set(DistributedCache.CACHE_ARCHIVES_SIZES,
+ archiveFileSizes.toString());
DistributedCache.setArchiveTimestamps(job, archiveTimestamps.toString());
}
URI[] tfiles = DistributedCache.getCacheFiles(job);
if (tfiles != null) {
+ FileStatus status = DistributedCache.getFileStatus(job, tfiles[0]);
+ StringBuffer fileSizes =
+ new StringBuffer(String.valueOf(status.getLen()));
StringBuffer fileTimestamps = new StringBuffer(String.valueOf(
- DistributedCache.getTimestamp(job, tfiles[0])));
+ status.getModificationTime()));
for (int i = 1; i < tfiles.length; i++) {
+ status = DistributedCache.getFileStatus(job, tfiles[i]);
+ fileSizes.append(",");
+ fileSizes.append(String.valueOf(status.getLen()));
fileTimestamps.append(",");
- fileTimestamps.append(String.valueOf(
- DistributedCache.getTimestamp(job, tfiles[i])));
+ fileTimestamps.append(String.valueOf(status.getModificationTime()));
}
+ job.set(DistributedCache.CACHE_FILES_SIZES, fileSizes.toString());
DistributedCache.setFileTimestamps(job, fileTimestamps.toString());
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MRCaching.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MRCaching.java?rev=1077220&r1=1077219&r2=1077220&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MRCaching.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MRCaching.java Fri Mar 4 03:53:05 2011
@@ -34,8 +34,11 @@ import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.filecache.*;
+
import java.net.URI;
+import junit.framework.Assert;
+
public class MRCaching {
static String testStr = "This is a test file " + "used for testing caching "
+ "jars, zip and normal files.";
@@ -269,11 +272,22 @@ public class MRCaching {
uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz#" + "testtargz");
uris[5] = fs.getUri().resolve(cacheDir + "/test.tar#" + "testtar");
}
+
+ //Add files to DC and track their sizes
DistributedCache.addCacheFile(uris[0], conf);
+ long[] fileSizes = new long[1];
+ fileSizes[0] = fs.getFileStatus(new Path(uris[0].getPath())).getLen();
+
+ long archivesSizes[] = new long[5];
for (int i = 1; i < 6; i++) {
DistributedCache.addCacheArchive(uris[i], conf);
+ archivesSizes[i-1] =
+ fs.getFileStatus(new Path(uris[i].getPath())).getLen();
}
+
+ // Run the job
RunningJob job = JobClient.runJob(conf);
+
int count = 0;
// after the job ran check to see if the input from the localized cache
// match the real string. check if there are 3 instances or not.
@@ -294,7 +308,32 @@ public class MRCaching {
if (count != 6)
return new TestResult(job, false);
+ // Check to ensure the filesizes of files in DC were correctly saved
+ validateCacheFilesSizes(conf, fileSizes, DistributedCache.CACHE_FILES_SIZES);
+ validateCacheFilesSizes(conf, archivesSizes,
+ DistributedCache.CACHE_ARCHIVES_SIZES);
+
return new TestResult(job, true);
}
+
+ private static void validateCacheFilesSizes(JobConf job,
+ long[] expectedSizes, String configKey)
+ throws IOException {
+ String configValues = job.get(configKey, "");
+ System.out.println(configKey + " -> " + configValues);
+ String[] realSizes = StringUtils.getStrings(configValues);
+ Assert.assertEquals("Found " + realSizes.length + " file-sizes for " +
+ configKey + " (" + configValues + "), expected: " +
+ expectedSizes.length,
+ expectedSizes.length, realSizes.length);
+
+ for (int i=0; i < expectedSizes.length; ++i) {
+ long actual = Long.valueOf(realSizes[i]);
+ long expected = expectedSizes[i];
+ Assert.assertEquals("Found length: " + actual + ", while expected: " +
+ expected,
+ expected, actual);
+ }
+ }
}