You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by am...@apache.org on 2010/06/17 12:41:08 UTC
svn commit: r955543 - in /hadoop/mapreduce/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
Author: amareshwari
Date: Thu Jun 17 10:41:08 2010
New Revision: 955543
URL: http://svn.apache.org/viewvc?rev=955543&view=rev
Log:
MAPREDUCE-1225. Fixes DistributedCache to check whether the file is fresh on the first localization as well. Contributed by Zhong Wang.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=955543&r1=955542&r2=955543&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jun 17 10:41:08 2010
@@ -92,6 +92,9 @@ Trunk (unreleased changes)
MAPREDUCE-1813. NPE in PipeMapred.MRErrorThread. (Ravi Gummadi via vinodkv)
+ MAPREDUCE-1225. Fixes DistributedCache to check if the file is fresh or not,
+ for the first localization also. (Zhong Wang via amareshwari)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=955543&r1=955542&r2=955543&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Thu Jun 17 10:41:08 2010
@@ -174,8 +174,11 @@ public class TrackerDistributedCacheMana
// do the localization, after releasing the global lock
synchronized (lcacheStatus) {
if (!lcacheStatus.isInited()) {
+ FileSystem fs = FileSystem.get(cache, conf);
+ checkStampSinceJobStarted(conf, fs, cache, confFileStamp,
+ lcacheStatus, fileStatus);
localizedPath = localizeCache(conf, cache, confFileStamp,
- lcacheStatus, fileStatus, isArchive, isPublic);
+ lcacheStatus, isArchive, isPublic);
lcacheStatus.initComplete();
} else {
localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
@@ -404,7 +407,6 @@ public class TrackerDistributedCacheMana
Path localizeCache(Configuration conf,
URI cache, long confFileStamp,
CacheStatus cacheStatus,
- FileStatus fileStatus,
boolean isArchive, boolean isPublic)
throws IOException {
FileSystem fs = FileSystem.get(cache, conf);
@@ -488,9 +490,9 @@ public class TrackerDistributedCacheMana
return (filename.endsWith(".tgz") || filename.endsWith(".tar.gz") ||
filename.endsWith(".tar"));
}
-
- // Checks if the cache has already been localized and is fresh
- private boolean ifExistsAndFresh(Configuration conf, FileSystem fs,
+
+ // ensure that the file on hdfs hasn't been modified since the job started
+ private long checkStampSinceJobStarted(Configuration conf, FileSystem fs,
URI cache, long confFileStamp,
CacheStatus lcacheStatus,
FileStatus fileStatus)
@@ -502,13 +504,23 @@ public class TrackerDistributedCacheMana
dfsFileStamp = getTimestamp(conf, cache);
}
- // ensure that the file on hdfs hasn't been modified since the job started
if (dfsFileStamp != confFileStamp) {
LOG.fatal("File: " + cache + " has changed on HDFS since job started");
throw new IOException("File: " + cache +
" has changed on HDFS since job started");
}
+
+ return dfsFileStamp;
+ }
+ // Checks if the cache has already been localized and is fresh
+ private boolean ifExistsAndFresh(Configuration conf, FileSystem fs,
+ URI cache, long confFileStamp,
+ CacheStatus lcacheStatus,
+ FileStatus fileStatus)
+ throws IOException {
+ long dfsFileStamp = checkStampSinceJobStarted(conf, fs, cache,
+ confFileStamp, lcacheStatus, fileStatus);
if (dfsFileStamp != lcacheStatus.mtime) {
return false;
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=955543&r1=955542&r2=955543&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Thu Jun 17 10:41:08 2010
@@ -202,13 +202,13 @@ public class TestTrackerDistributedCache
@Override
Path localizeCache(Configuration conf, URI cache, long confFileStamp,
- CacheStatus cacheStatus, FileStatus fileStatus, boolean isArchive,
- boolean isPublic) throws IOException {
+ CacheStatus cacheStatus, boolean isArchive, boolean isPublic)
+ throws IOException {
if (cache.equals(firstCacheFile.toUri())) {
throw new IOException("fake fail");
}
return super.localizeCache(conf, cache, confFileStamp, cacheStatus,
- fileStatus, isArchive, isPublic);
+ isArchive, isPublic);
}
}
@@ -426,6 +426,12 @@ public class TestTrackerDistributedCache
protected String getJobOwnerName() throws IOException {
return UserGroupInformation.getLoginUser().getUserName();
}
+
+ private long getFileStamp(Path file) throws IOException {
+ FileStatus fileStatus = fs.getFileStatus(file);
+ return fileStatus.getModificationTime();
+ }
+
/** test delete cache */
public void testDeleteCache() throws Exception {
@@ -448,7 +454,6 @@ public class TestTrackerDistributedCache
new TrackerDistributedCacheManager(conf2, taskController);
manager.startCleanupThread();
FileSystem localfs = FileSystem.getLocal(conf2);
- long now = System.currentTimeMillis();
String userName = getJobOwnerName();
conf2.set(MRJobConfig.USER_NAME, userName);
@@ -456,8 +461,9 @@ public class TestTrackerDistributedCache
Path localCache = manager.getLocalCache(firstCacheFile.toUri(), conf2,
TaskTracker.getPrivateDistributedCacheDir(userName),
fs.getFileStatus(firstCacheFile), false,
- now, new Path(TEST_ROOT_DIR), false, false);
- manager.releaseCache(firstCacheFile.toUri(), conf2, now);
+ getFileStamp(firstCacheFile), new Path(TEST_ROOT_DIR), false, false);
+ manager.releaseCache(firstCacheFile.toUri(), conf2,
+ getFileStamp(firstCacheFile));
//in above code,localized a file of size 4K and then release the cache
// which will cause the cache be deleted when the limit goes out.
// The below code localize another cache which's designed to
@@ -465,7 +471,7 @@ public class TestTrackerDistributedCache
manager.getLocalCache(secondCacheFile.toUri(), conf2,
TaskTracker.getPrivateDistributedCacheDir(userName),
fs.getFileStatus(secondCacheFile), false,
- System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
+ getFileStamp(secondCacheFile), new Path(TEST_ROOT_DIR), false, false);
checkCacheDeletion(localfs, localCache, "DistributedCache failed " +
"deleting old cache when the cache store is full.");
// Now we test the number of sub directories limit
@@ -479,15 +485,16 @@ public class TestTrackerDistributedCache
Path thirdLocalCache = manager.getLocalCache(thirdCacheFile.toUri(), conf2,
TaskTracker.getPrivateDistributedCacheDir(userName),
fs.getFileStatus(thirdCacheFile), false,
- now, new Path(TEST_ROOT_DIR), false, false);
+ getFileStamp(thirdCacheFile), new Path(TEST_ROOT_DIR), false, false);
// Release the third cache so that it can be deleted while sweeping
- manager.releaseCache(thirdCacheFile.toUri(), conf2, now);
+ manager.releaseCache(thirdCacheFile.toUri(), conf2,
+ getFileStamp(thirdCacheFile));
// Getting the fourth cache will make the number of sub directories becomes
// 3 which is greater than 2. So the released cache will be deleted.
manager.getLocalCache(fourthCacheFile.toUri(), conf2,
TaskTracker.getPrivateDistributedCacheDir(userName),
fs.getFileStatus(fourthCacheFile), false,
- System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
+ getFileStamp(fourthCacheFile), new Path(TEST_ROOT_DIR), false, false);
checkCacheDeletion(localfs, thirdLocalCache,
"DistributedCache failed deleting old" +
" cache when the cache exceeds the number of sub directories limit.");
@@ -530,7 +537,7 @@ public class TestTrackerDistributedCache
Path result = manager.getLocalCache(fileToCache.toUri(), conf,
TaskTracker.getPrivateDistributedCacheDir(userName),
fs.getFileStatus(firstCacheFile), false,
- System.currentTimeMillis(),
+ getFileStamp(firstCacheFile),
new Path(TEST_ROOT_DIR), false, false);
assertNotNull("DistributedCache cached file on non-default filesystem.",
result);
@@ -654,6 +661,27 @@ public class TestTrackerDistributedCache
// release
handle.release();
+ // running a task of the same job on another TaskTracker which has never
+ // initialized the cache
+ TrackerDistributedCacheManager manager2 =
+ new TrackerDistributedCacheManager(myConf, taskController);
+ TaskDistributedCacheManager handle2 =
+ manager2.newTaskDistributedCacheManager(subConf);
+ File workDir2 = new File(new Path(TEST_ROOT_DIR, "workdir2").toString());
+ th = null;
+ try {
+ handle2.setup(localDirAllocator, workDir2, TaskTracker
+ .getPrivateDistributedCacheDir(userName),
+ TaskTracker.getPublicDistributedCacheDir());
+ } catch (IOException ie) {
+ th = ie;
+ }
+ assertNotNull("Throwable is null", th);
+ assertTrue("Exception message does not match",
+ th.getMessage().contains("has changed on HDFS since job started"));
+ // release
+ handle.release();
+
// submit another job
Configuration subConf2 = new Configuration(myConf);
subConf2.set(MRJobConfig.USER_NAME, userName);
@@ -696,13 +724,12 @@ public class TestTrackerDistributedCache
TrackerDistributedCacheManager manager =
new TrackerDistributedCacheManager(conf, taskController);
FileSystem localfs = FileSystem.getLocal(conf);
- long now = System.currentTimeMillis();
Path[] localCache = new Path[2];
localCache[0] = manager.getLocalCache(firstCacheFile.toUri(), conf,
TaskTracker.getPrivateDistributedCacheDir(userName),
fs.getFileStatus(firstCacheFile), false,
- now, new Path(TEST_ROOT_DIR), false, false);
+ getFileStamp(firstCacheFile), new Path(TEST_ROOT_DIR), false, false);
FsPermission myPermission = new FsPermission((short)0600);
Path myFile = new Path(localCache[0].getParent(), "myfile.txt");
if (FileSystem.create(localfs, myFile, myPermission) == null) {
@@ -712,7 +739,8 @@ public class TestTrackerDistributedCache
localCache[1] = manager.getLocalCache(secondCacheFile.toUri(), conf,
TaskTracker.getPrivateDistributedCacheDir(userName),
fs.getFileStatus(secondCacheFile), false,
- System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
+ getFileStamp(secondCacheFile), new Path(TEST_ROOT_DIR), false,
+ false);
FileStatus stat = localfs.getFileStatus(myFile);
assertTrue(stat.getPermission().equals(myPermission));
// validate permissions of localized files.