You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2009/10/25 01:37:28 UTC
svn commit: r829468 - in /hadoop/mapreduce/trunk: ./
src/java/org/apache/hadoop/mapreduce/filecache/
src/test/mapred/org/apache/hadoop/mapred/
src/test/mapred/org/apache/hadoop/mapreduce/filecache/
Author: acmurthy
Date: Sat Oct 24 23:37:28 2009
New Revision: 829468
URL: http://svn.apache.org/viewvc?rev=829468&view=rev
Log:
MAPREDUCE-1098. Fixed the distributed-cache to not do i/o while holding a global lock. Contributed by Amareshwari Sriramadasu.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=829468&r1=829467&r2=829468&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Sat Oct 24 23:37:28 2009
@@ -43,6 +43,9 @@
MAPREDUCE-1133. Eclipse .classpath template has outdated jar files and is
missing some new ones. (cos)
+ MAPREDUCE-1098. Fixed the distributed-cache to not do i/o while holding a
+ global lock. (Amareshwari Sriramadasu via acmurthy)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java?rev=829468&r1=829467&r2=829468&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java Sat Oct 24 23:37:28 2009
@@ -199,7 +199,7 @@
Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
return new TrackerDistributedCacheManager(conf).getLocalCache(cache, conf,
- baseDir, fileStatus, isArchive, confFileStamp, currentWorkDir,
+ baseDir.toString(), fileStatus, isArchive, confFileStamp, currentWorkDir,
honorSymLinkConf);
}
@@ -250,7 +250,35 @@
@Deprecated
public static void releaseCache(URI cache, Configuration conf)
throws IOException {
- new TrackerDistributedCacheManager(conf).releaseCache(cache, conf);
+ // find the timestamp of the uri
+ URI[] archives = DistributedCache.getCacheArchives(conf);
+ URI[] files = DistributedCache.getCacheFiles(conf);
+ String[] archivesTimestamps =
+ DistributedCache.getArchiveTimestamps(conf);
+ String[] filesTimestamps =
+ DistributedCache.getFileTimestamps(conf);
+ String timestamp = null;
+ if (archives != null) {
+ for (int i = 0; i < archives.length; i++) {
+ if (archives[i].equals(cache)) {
+ timestamp = archivesTimestamps[i];
+ break;
+ }
+ }
+ }
+ if (timestamp == null && files != null) {
+ for (int i = 0; i < files.length; i++) {
+ if (files[i].equals(cache)) {
+ timestamp = filesTimestamps[i];
+ break;
+ }
+ }
+ }
+ if (timestamp == null) {
+ throw new IOException("TimeStamp of the uri couldnot be found");
+ }
+ new TrackerDistributedCacheManager(conf).releaseCache(cache, conf,
+ Long.parseLong(timestamp));
}
/**
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java?rev=829468&r1=829467&r2=829468&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java Sat Oct 24 23:37:28 2009
@@ -151,23 +151,9 @@
URI uri = cacheFile.uri;
FileSystem fileSystem = FileSystem.get(uri, taskConf);
FileStatus fileStatus = fileSystem.getFileStatus(new Path(uri.getPath()));
- String cacheId = this.distributedCacheManager.makeRelative(uri, taskConf);
- String cachePath = cacheSubdir + Path.SEPARATOR + cacheId;
- // Get the local path if the cacheFile is already localized or create one
- // if it doesn't
- Path localPath;
- try {
- localPath = lDirAlloc.getLocalPathToRead(cachePath, taskConf);
- } catch (DiskErrorException de) {
- localPath =
- lDirAlloc.getLocalPathForWrite(cachePath, fileStatus.getLen(),
- taskConf);
- }
-
- String baseDir = localPath.toString().replace(cacheId, "");
Path p = distributedCacheManager.getLocalCache(uri, taskConf,
- new Path(baseDir), fileStatus,
+ cacheSubdir, fileStatus,
cacheFile.type == CacheFile.FileType.ARCHIVE,
cacheFile.timestamp, workdirPath, false);
@@ -224,7 +210,7 @@
*/
public void release() throws IOException {
for (CacheFile c : cacheFiles) {
- distributedCacheManager.releaseCache(c.uri, taskConf);
+ distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp);
}
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=829468&r1=829467&r2=829468&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Sat Oct 24 23:37:28 2009
@@ -20,8 +20,11 @@
import java.io.File;
import java.io.IOException;
import java.net.URI;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
+import java.util.Random;
+import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
@@ -32,6 +35,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.RunJar;
@@ -59,9 +63,17 @@
LogFactory.getLog(TrackerDistributedCacheManager.class);
private final LocalFileSystem localFs;
+
+ private LocalDirAllocator lDirAllocator;
+
+ private Configuration trackerConf;
+
+ private Random random = new Random();
public TrackerDistributedCacheManager(Configuration conf) throws IOException {
this.localFs = FileSystem.getLocal(conf);
+ this.trackerConf = conf;
+ this.lDirAllocator = new LocalDirAllocator(TTConfig.LOCAL_DIR);
}
/**
@@ -71,7 +83,7 @@
* @param cache the cache to be localized, this should be specified as
* new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
* @param conf The Configuration file which contains the filesystem
- * @param baseDir The base cache Dir where you wnat to localize the
+ * @param subDir The base cache subDir where you want to localize the
* files/archives
* @param fileStatus The file status on the dfs.
* @param isArchive if the cache is an archive or a file. In case it is an
@@ -94,35 +106,55 @@
* @throws IOException
*/
Path getLocalCache(URI cache, Configuration conf,
- Path baseDir, FileStatus fileStatus,
+ String subDir, FileStatus fileStatus,
boolean isArchive, long confFileStamp,
Path currentWorkDir, boolean honorSymLinkConf)
throws IOException {
- String cacheId = makeRelative(cache, conf);
+ String key = getKey(cache, conf, confFileStamp);
CacheStatus lcacheStatus;
- Path localizedPath;
+ Path localizedPath = null;
synchronized (cachedArchives) {
- lcacheStatus = cachedArchives.get(cacheId);
+ lcacheStatus = cachedArchives.get(key);
if (lcacheStatus == null) {
// was never localized
- lcacheStatus = new CacheStatus(baseDir,
- new Path(baseDir, new Path(cacheId)));
- cachedArchives.put(cacheId, lcacheStatus);
- }
-
- synchronized (lcacheStatus) {
+ String cachePath = new Path (subDir,
+ new Path(String.valueOf(random.nextLong()),
+ makeRelative(cache, conf))).toString();
+ Path localPath = lDirAllocator.getLocalPathForWrite(cachePath,
+ fileStatus.getLen(), trackerConf);
+ lcacheStatus = new CacheStatus(
+ new Path(localPath.toString().replace(cachePath, "")), localPath);
+ cachedArchives.put(key, lcacheStatus);
+ }
+
+ //mark the cache for use.
+ lcacheStatus.refcount++;
+ }
+
+ // do the localization, after releasing the global lock
+ synchronized (lcacheStatus) {
+ if (!lcacheStatus.isInited()) {
localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus,
- fileStatus, isArchive, currentWorkDir, honorSymLinkConf);
- lcacheStatus.refcount++;
+ fileStatus, isArchive);
+ lcacheStatus.initComplete();
+ } else {
+ localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
+ lcacheStatus, fileStatus, isArchive);
}
+ createSymlink(conf, cache, lcacheStatus, isArchive,
+ currentWorkDir, honorSymLinkConf);
}
// try deleting stuff if you can
long size = 0;
- synchronized (baseDirSize) {
- Long get = baseDirSize.get(baseDir);
- if ( get != null ) {
- size = get.longValue();
+ synchronized (lcacheStatus) {
+ synchronized (baseDirSize) {
+ Long get = baseDirSize.get(lcacheStatus.getBaseDir());
+ if ( get != null ) {
+ size = get.longValue();
+ } else {
+ LOG.warn("Cannot find size of baseDir: " + lcacheStatus.getBaseDir());
+ }
}
}
// setting the cache size to a default of 10GB
@@ -142,40 +174,58 @@
* is contained in.
* @throws IOException
*/
- void releaseCache(URI cache, Configuration conf)
+ void releaseCache(URI cache, Configuration conf, long timeStamp)
throws IOException {
- String cacheId = makeRelative(cache, conf);
+ String key = getKey(cache, conf, timeStamp);
synchronized (cachedArchives) {
- CacheStatus lcacheStatus = cachedArchives.get(cacheId);
- if (lcacheStatus == null)
+ CacheStatus lcacheStatus = cachedArchives.get(key);
+ if (lcacheStatus == null) {
+ LOG.warn("Cannot find localized cache: " + cache +
+ " (key: " + key + ") in releaseCache!");
return;
- synchronized (lcacheStatus) {
- lcacheStatus.refcount--;
}
+
+ // decrement ref count
+ lcacheStatus.refcount--;
}
}
// To delete the caches which have a refcount of zero
private void deleteCache(Configuration conf) throws IOException {
+ Set<CacheStatus> deleteSet = new HashSet<CacheStatus>();
// try deleting cache Status with refcount of zero
synchronized (cachedArchives) {
for (Iterator<String> it = cachedArchives.keySet().iterator();
it.hasNext();) {
String cacheId = it.next();
CacheStatus lcacheStatus = cachedArchives.get(cacheId);
- synchronized (lcacheStatus) {
- if (lcacheStatus.refcount == 0) {
- // delete this cache entry
- FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
- synchronized (baseDirSize) {
- Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
- if ( dirSize != null ) {
- dirSize -= lcacheStatus.size;
- baseDirSize.put(lcacheStatus.baseDir, dirSize);
- }
- }
- it.remove();
+
+ // if reference count is zero
+ // mark the cache for deletion
+ if (lcacheStatus.refcount == 0) {
+ // delete this cache entry from the global list
+ // and mark the localized file for deletion
+ deleteSet.add(lcacheStatus);
+ it.remove();
+ }
+ }
+ }
+
+ // do the deletion, after releasing the global lock
+ for (CacheStatus lcacheStatus : deleteSet) {
+ synchronized (lcacheStatus) {
+ FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
+ LOG.info("Deleted path " + lcacheStatus.localLoadPath);
+ // decrement the size of the cache from baseDirSize
+ synchronized (baseDirSize) {
+ Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
+ if ( dirSize != null ) {
+ dirSize -= lcacheStatus.size;
+ baseDirSize.put(lcacheStatus.baseDir, dirSize);
+ } else {
+ LOG.warn("Cannot find record of the baseDir: " +
+ lcacheStatus.baseDir + " during delete!");
}
}
}
@@ -208,6 +258,11 @@
return path;
}
+ String getKey(URI cache, Configuration conf, long timeStamp)
+ throws IOException {
+ return makeRelative(cache, conf) + String.valueOf(timeStamp);
+ }
+
/**
* Returns mtime of a given cache file on hdfs.
*
@@ -224,144 +279,115 @@
return fileSystem.getFileStatus(filePath).getModificationTime();
}
- private Path cacheFilePath(Path p) {
- return new Path(p, p.getName());
- }
+ private Path checkCacheStatusValidity(Configuration conf,
+ URI cache, long confFileStamp,
+ CacheStatus cacheStatus,
+ FileStatus fileStatus,
+ boolean isArchive
+ ) throws IOException {
+ FileSystem fs = FileSystem.get(cache, conf);
+ // Has to be
+ if (!ifExistsAndFresh(conf, fs, cache, confFileStamp,
+ cacheStatus, fileStatus)) {
+ throw new IOException("Stale cache file: " + cacheStatus.localLoadPath +
+ " for cache-file: " + cache);
+ }
- // the method which actually copies the caches locally and unjars/unzips them
- // and does chmod for the files
- private Path localizeCache(Configuration conf,
- URI cache, long confFileStamp,
- CacheStatus cacheStatus,
- FileStatus fileStatus,
- boolean isArchive,
- Path currentWorkDir,
- boolean honorSymLinkConf)
- throws IOException {
+ LOG.info(String.format("Using existing cache of %s->%s",
+ cache.toString(), cacheStatus.localLoadPath));
+ return cacheStatus.localLoadPath;
+ }
+
+ private void createSymlink(Configuration conf, URI cache,
+ CacheStatus cacheStatus, boolean isArchive,
+ Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
boolean doSymlink = honorSymLinkConf && DistributedCache.getSymlink(conf);
if(cache.getFragment() == null) {
doSymlink = false;
}
- FileSystem fs = FileSystem.get(cache, conf);
String link =
currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
File flink = new File(link);
- if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
- cacheStatus, fileStatus)) {
- LOG.info(String.format("Using existing cache of %s->%s",
- cache.toString(), cacheStatus.localLoadPath));
- if (isArchive) {
- if (doSymlink){
- if (!flink.exists())
- FileUtil.symLink(cacheStatus.localLoadPath.toString(),
- link);
- }
-
- return cacheStatus.localLoadPath;
- }
- else {
- if (doSymlink){
- if (!flink.exists())
- FileUtil.symLink(
- cacheFilePath(cacheStatus.localLoadPath).toString(), link);
- }
- return cacheFilePath(cacheStatus.localLoadPath);
+ if (doSymlink){
+ if (!flink.exists()) {
+ FileUtil.symLink(cacheStatus.localLoadPath.toString(), link);
}
+ }
+ }
+
+ // the method which actually copies the caches locally and unjars/unzips them
+ // and does chmod for the files
+ private Path localizeCache(Configuration conf,
+ URI cache, long confFileStamp,
+ CacheStatus cacheStatus,
+ FileStatus fileStatus,
+ boolean isArchive)
+ throws IOException {
+ FileSystem fs = FileSystem.get(cache, conf);
+ FileSystem localFs = FileSystem.getLocal(conf);
+ Path parchive = null;
+ if (isArchive) {
+ parchive = new Path(cacheStatus.localLoadPath,
+ new Path(cacheStatus.localLoadPath.getName()));
} else {
+ parchive = cacheStatus.localLoadPath;
+ }
- // remove the old archive
- // if the old archive cannot be removed since it is being used by another
- // job
- // return null
- if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
- throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
- + " is in use and cannot be refreshed");
-
- FileSystem localFs = FileSystem.getLocal(conf);
- localFs.delete(cacheStatus.localLoadPath, true);
- synchronized (baseDirSize) {
- Long dirSize = baseDirSize.get(cacheStatus.baseDir);
- if ( dirSize != null ) {
- dirSize -= cacheStatus.size;
- baseDirSize.put(cacheStatus.baseDir, dirSize);
- }
- }
- Path parchive = new Path(cacheStatus.localLoadPath,
- new Path(cacheStatus.localLoadPath.getName()));
-
- if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
- throw new IOException("Mkdirs failed to create directory " +
- cacheStatus.localLoadPath.toString());
- }
-
- String cacheId = cache.getPath();
- fs.copyToLocalFile(new Path(cacheId), parchive);
- if (isArchive) {
- String tmpArchive = parchive.toString().toLowerCase();
- File srcFile = new File(parchive.toString());
- File destDir = new File(parchive.getParent().toString());
- LOG.info(String.format("Extracting %s to %s",
- srcFile.toString(), destDir.toString()));
- if (tmpArchive.endsWith(".jar")) {
- RunJar.unJar(srcFile, destDir);
- } else if (tmpArchive.endsWith(".zip")) {
- FileUtil.unZip(srcFile, destDir);
- } else if (isTarFile(tmpArchive)) {
- FileUtil.unTar(srcFile, destDir);
- } else {
- LOG.warn(String.format(
+ if (!localFs.mkdirs(parchive.getParent())) {
+ throw new IOException("Mkdirs failed to create directory " +
+ cacheStatus.localLoadPath.toString());
+ }
+
+ String cacheId = cache.getPath();
+ fs.copyToLocalFile(new Path(cacheId), parchive);
+ if (isArchive) {
+ String tmpArchive = parchive.toString().toLowerCase();
+ File srcFile = new File(parchive.toString());
+ File destDir = new File(parchive.getParent().toString());
+ LOG.info(String.format("Extracting %s to %s",
+ srcFile.toString(), destDir.toString()));
+ if (tmpArchive.endsWith(".jar")) {
+ RunJar.unJar(srcFile, destDir);
+ } else if (tmpArchive.endsWith(".zip")) {
+ FileUtil.unZip(srcFile, destDir);
+ } else if (isTarFile(tmpArchive)) {
+ FileUtil.unTar(srcFile, destDir);
+ } else {
+ LOG.warn(String.format(
"Cache file %s specified as archive, but not valid extension.",
srcFile.toString()));
- // else will not do anyhting
- // and copy the file into the dir as it is
- }
+ // else will not do anyhting
+ // and copy the file into the dir as it is
}
+ }
- long cacheSize =
- FileUtil.getDU(new File(parchive.getParent().toString()));
- cacheStatus.size = cacheSize;
- synchronized (baseDirSize) {
- Long dirSize = baseDirSize.get(cacheStatus.baseDir);
- if( dirSize == null ) {
- dirSize = Long.valueOf(cacheSize);
- } else {
- dirSize += cacheSize;
- }
- baseDirSize.put(cacheStatus.baseDir, dirSize);
+ long cacheSize =
+ FileUtil.getDU(new File(parchive.getParent().toString()));
+ cacheStatus.size = cacheSize;
+ synchronized (baseDirSize) {
+ Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+ if( dirSize == null ) {
+ dirSize = Long.valueOf(cacheSize);
+ } else {
+ dirSize += cacheSize;
}
+ baseDirSize.put(cacheStatus.baseDir, dirSize);
+ }
- // do chmod here
- try {
- //Setting recursive permission to grant everyone read and execute
- FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
- } catch(InterruptedException e) {
+ // do chmod here
+ try {
+ //Setting recursive permission to grant everyone read and execute
+ FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
+ } catch(InterruptedException e) {
LOG.warn("Exception in chmod" + e.toString());
- }
-
- // update cacheStatus to reflect the newly cached file
- cacheStatus.currentStatus = true;
- cacheStatus.mtime = getTimestamp(conf, cache);
-
- LOG.info(String.format("Cached %s as %s",
- cache.toString(), cacheStatus.localLoadPath));
}
- if (isArchive){
- if (doSymlink){
- if (!flink.exists())
- FileUtil.symLink(cacheStatus.localLoadPath.toString(),
- link);
- }
- return cacheStatus.localLoadPath;
- }
- else {
- if (doSymlink){
- if (!flink.exists())
- FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(),
- link);
- }
- return cacheFilePath(cacheStatus.localLoadPath);
- }
+ // update cacheStatus to reflect the newly cached file
+ cacheStatus.mtime = getTimestamp(conf, cache);
+
+ LOG.info(String.format("Cached %s as %s",
+ cache.toString(), cacheStatus.localLoadPath));
+ return cacheStatus.localLoadPath;
}
private static boolean isTarFile(String filename) {
@@ -375,28 +401,22 @@
CacheStatus lcacheStatus,
FileStatus fileStatus)
throws IOException {
- // check for existence of the cache
- if (lcacheStatus.currentStatus == false) {
- return false;
+ long dfsFileStamp;
+ if (fileStatus != null) {
+ dfsFileStamp = fileStatus.getModificationTime();
} else {
- long dfsFileStamp;
- if (fileStatus != null) {
- dfsFileStamp = fileStatus.getModificationTime();
- } else {
- dfsFileStamp = getTimestamp(conf, cache);
- }
+ dfsFileStamp = getTimestamp(conf, cache);
+ }
- // ensure that the file on hdfs hasn't been modified since the job started
- if (dfsFileStamp != confFileStamp) {
- LOG.fatal("File: " + cache + " has changed on HDFS since job started");
- throw new IOException("File: " + cache +
- " has changed on HDFS since job started");
- }
+ // ensure that the file on hdfs hasn't been modified since the job started
+ if (dfsFileStamp != confFileStamp) {
+ LOG.fatal("File: " + cache + " has changed on HDFS since job started");
+ throw new IOException("File: " + cache +
+ " has changed on HDFS since job started");
+ }
- if (dfsFileStamp != lcacheStatus.mtime) {
- // needs refreshing
- return false;
- }
+ if (dfsFileStamp != lcacheStatus.mtime) {
+ return false;
}
return true;
@@ -437,9 +457,6 @@
}
private static class CacheStatus {
- // false, not loaded yet, true is loaded
- boolean currentStatus;
-
// the local load path of this cache
Path localLoadPath;
@@ -455,15 +472,31 @@
// the cache-file modification time
long mtime;
+ // is it initialized ?
+ boolean inited = false;
+
public CacheStatus(Path baseDir, Path localLoadPath) {
super();
- this.currentStatus = false;
this.localLoadPath = localLoadPath;
this.refcount = 0;
this.mtime = -1;
this.baseDir = baseDir;
this.size = 0;
}
+
+ Path getBaseDir(){
+ return this.baseDir;
+ }
+
+ // mark it as initialized
+ void initComplete() {
+ inited = true;
+ }
+
+ // is it initialized?
+ boolean isInited() {
+ return inited;
+ }
}
/**
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java?rev=829468&r1=829467&r2=829468&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java Sat Oct 24 23:37:28 2009
@@ -183,4 +183,12 @@
}
super.testFileSystemOtherThanDefault();
}
+
+ @Override
+ public void testFreshness() throws Exception {
+ if (!ClusterWithLinuxTaskController.shouldRun()) {
+ return;
+ }
+ super.testFreshness();
+ }
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=829468&r1=829467&r2=829468&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Sat Oct 24 23:37:28 2009
@@ -40,6 +40,7 @@
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
@@ -53,7 +54,7 @@
.getAbsolutePath();
protected File ROOT_MAPRED_LOCAL_DIR;
- private static String TEST_CACHE_BASE_DIR;
+ private static String TEST_CACHE_BASE_DIR = "cachebasedir";
protected int numLocalDirs = 6;
private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
@@ -61,6 +62,7 @@
protected Configuration conf;
protected Path firstCacheFile;
protected Path secondCacheFile;
+ private FileSystem fs;
protected LocalDirAllocator localDirAllocator =
new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
@@ -77,18 +79,12 @@
// Prepare the tests' mapred-local-dir
ROOT_MAPRED_LOCAL_DIR = new File(TEST_ROOT_DIR, "mapred/local");
ROOT_MAPRED_LOCAL_DIR.mkdirs();
- String []localDirs = new String[numLocalDirs];
- for (int i = 0; i < numLocalDirs; i++) {
- localDirs[i] = new File(ROOT_MAPRED_LOCAL_DIR, "0_" + i).getPath();
- }
-
- TEST_CACHE_BASE_DIR =
- new File(TEST_ROOT_DIR, "cachebasedir").getAbsolutePath();
conf = new Configuration();
conf.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, LOCAL_CACHE_LIMIT);
- conf.setStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY, localDirs);
+ conf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY, ROOT_MAPRED_LOCAL_DIR.toString());
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
+ fs = FileSystem.get(conf);
// Create the temporary cache files to be used in the tests.
firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile");
@@ -187,20 +183,21 @@
TrackerDistributedCacheManager manager =
new TrackerDistributedCacheManager(conf);
FileSystem localfs = FileSystem.getLocal(conf);
+ long now = System.currentTimeMillis();
manager.getLocalCache(firstCacheFile.toUri(), conf,
- new Path(TEST_CACHE_BASE_DIR), null, false,
- System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false);
- manager.releaseCache(firstCacheFile.toUri(), conf);
+ TEST_CACHE_BASE_DIR, fs.getFileStatus(firstCacheFile), false,
+ now, new Path(TEST_ROOT_DIR), false);
+ manager.releaseCache(firstCacheFile.toUri(), conf, now);
//in above code,localized a file of size 4K and then release the cache
// which will cause the cache be deleted when the limit goes out.
// The below code localize another cache which's designed to
//sweep away the first cache.
manager.getLocalCache(secondCacheFile.toUri(), conf,
- new Path(TEST_CACHE_BASE_DIR), null, false,
+ TEST_CACHE_BASE_DIR, fs.getFileStatus(secondCacheFile), false,
System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false);
FileStatus[] dirStatuses = localfs.listStatus(
- new Path(TEST_CACHE_BASE_DIR));
+ new Path(ROOT_MAPRED_LOCAL_DIR.toString()));
assertTrue("DistributedCache failed deleting old" +
" cache when the cache store is full.",
dirStatuses.length == 1);
@@ -213,7 +210,8 @@
Path fileToCache = new Path("fakefile:///"
+ firstCacheFile.toUri().getPath());
Path result = manager.getLocalCache(fileToCache.toUri(), conf,
- new Path(TEST_CACHE_BASE_DIR), null, false, System.currentTimeMillis(),
+ TEST_CACHE_BASE_DIR, fs.getFileStatus(firstCacheFile), false,
+ System.currentTimeMillis(),
new Path(TEST_ROOT_DIR), false);
assertNotNull("DistributedCache cached file on non-default filesystem.",
result);
@@ -245,4 +243,98 @@
protected File pathToFile(Path p) {
return new File(p.toString());
}
+
+ public static class FakeFileSystem extends RawLocalFileSystem {
+ private long increment = 0;
+ public FakeFileSystem() {
+ super();
+ }
+
+ public FileStatus getFileStatus(Path p) throws IOException {
+ File f = pathToFile(p);
+ return new FileStatus(f.length(), f.isDirectory(), 1, 128,
+ f.lastModified() + increment, makeQualified(new Path(f.getPath())));
+ }
+
+ void advanceClock(long millis) {
+ increment += millis;
+ }
+ }
+
+ public void testFreshness() throws Exception {
+ Configuration myConf = new Configuration(conf);
+ myConf.set("fs.default.name", "refresh:///");
+ myConf.setClass("fs.refresh.impl", FakeFileSystem.class, FileSystem.class);
+ TrackerDistributedCacheManager manager =
+ new TrackerDistributedCacheManager(myConf);
+ // ****** Imitate JobClient code
+ // Configures a task/job with both a regular file and a "classpath" file.
+ Configuration subConf = new Configuration(myConf);
+ DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
+ TrackerDistributedCacheManager.determineTimestamps(subConf);
+ // ****** End of imitating JobClient code
+
+ String userName = getJobOwnerName();
+
+ // ****** Imitate TaskRunner code.
+ TaskDistributedCacheManager handle =
+ manager.newTaskDistributedCacheManager(subConf);
+ assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
+ File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
+ handle.setup(localDirAllocator, workDir, TaskTracker
+ .getDistributedCacheDir(userName));
+ // ****** End of imitating TaskRunner code
+
+ Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
+ assertNotNull(null, localCacheFiles);
+ assertEquals(1, localCacheFiles.length);
+ Path cachedFirstFile = localCacheFiles[0];
+ assertFileLengthEquals(firstCacheFile, cachedFirstFile);
+ assertFalse("Paths should be different.",
+ firstCacheFile.equals(cachedFirstFile));
+ // release
+ handle.release();
+
+ // change the file timestamp
+ FileSystem fs = FileSystem.get(myConf);
+ ((FakeFileSystem)fs).advanceClock(1);
+
+ // running a task of the same job
+ Throwable th = null;
+ try {
+ handle.setup(localDirAllocator, workDir, TaskTracker
+ .getDistributedCacheDir(userName));
+ } catch (IOException ie) {
+ th = ie;
+ }
+ assertNotNull("Throwable is null", th);
+ assertTrue("Exception message does not match",
+ th.getMessage().contains("has changed on HDFS since job started"));
+ // release
+ handle.release();
+
+ // submit another job
+ Configuration subConf2 = new Configuration(myConf);
+ DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf2);
+ TrackerDistributedCacheManager.determineTimestamps(subConf2);
+
+ handle =
+ manager.newTaskDistributedCacheManager(subConf2);
+ handle.setup(localDirAllocator, workDir, TaskTracker
+ .getDistributedCacheDir(userName));
+ Path[] localCacheFiles2 = DistributedCache.getLocalCacheFiles(subConf2);
+ assertNotNull(null, localCacheFiles2);
+ assertEquals(1, localCacheFiles2.length);
+ Path cachedFirstFile2 = localCacheFiles2[0];
+ assertFileLengthEquals(firstCacheFile, cachedFirstFile2);
+ assertFalse("Paths should be different.",
+ firstCacheFile.equals(cachedFirstFile2));
+
+ // assert that two localizations point to different paths
+ assertFalse("two jobs with different timestamps did not localize" +
+ " in different paths", cachedFirstFile.equals(cachedFirstFile2));
+ // release
+ handle.release();
+ }
+
}