You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2011/07/21 03:59:15 UTC
svn commit: r1149004 - in /hadoop/common/trunk/mapreduce: ./
src/java/org/apache/hadoop/mapreduce/filecache/
src/test/mapred/org/apache/hadoop/mapreduce/filecache/
Author: cdouglas
Date: Thu Jul 21 01:59:14 2011
New Revision: 1149004
URL: http://svn.apache.org/viewvc?rev=1149004&view=rev
Log:
MAPREDUCE-2409. DistributedCache maps files and archives to the same path,
despite semantic incompatibility. Contributed by Siddharth Seth.
Modified:
hadoop/common/trunk/mapreduce/CHANGES.txt
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
Modified: hadoop/common/trunk/mapreduce/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/CHANGES.txt?rev=1149004&r1=1149003&r2=1149004&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/CHANGES.txt (original)
+++ hadoop/common/trunk/mapreduce/CHANGES.txt Thu Jul 21 01:59:14 2011
@@ -344,6 +344,9 @@ Trunk (unreleased changes)
MAPREDUCE-2710. Update JobSubmitter.printTokens(..) for HDFS-2161.
(szetszwo)
+ MAPREDUCE-2409. DistributedCache maps files and archives to the same path,
+ despite semantic incompatibility. (Siddharth Seth via cdouglas)
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java?rev=1149004&r1=1149003&r2=1149004&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java Thu Jul 21 01:59:14 2011
@@ -241,7 +241,7 @@ public class TaskDistributedCacheManager
for (CacheFile c : cacheFiles) {
if (c.getLocalized()) {
distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp,
- c.owner);
+ c.owner, CacheFile.FileType.ARCHIVE == c.type);
}
}
}
Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=1149004&r1=1149003&r2=1149004&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Thu Jul 21 01:59:14 2011
@@ -158,9 +158,9 @@ public class TrackerDistributedCacheMana
Path currentWorkDir, boolean honorSymLinkConf, boolean isPublic)
throws IOException {
String key;
- key = getKey(cache, conf, confFileStamp, getLocalizedCacheOwner(isPublic));
+ key = getKey(cache, conf, confFileStamp, getLocalizedCacheOwner(isPublic),
+ isArchive);
CacheStatus lcacheStatus;
- Path localizedPath = null;
synchronized (cachedArchives) {
lcacheStatus = cachedArchives.get(key);
if (lcacheStatus == null) {
@@ -187,18 +187,18 @@ public class TrackerDistributedCacheMana
FileSystem fs = FileSystem.get(cache, conf);
checkStampSinceJobStarted(conf, fs, cache, confFileStamp,
lcacheStatus, fileStatus);
- localizedPath = localizeCache(conf, cache, confFileStamp,
+ localizeCache(conf, cache, confFileStamp,
lcacheStatus, isArchive, isPublic);
lcacheStatus.initComplete();
} else {
- localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
+ checkCacheStatusValidity(conf, cache, confFileStamp,
lcacheStatus, fileStatus, isArchive);
}
createSymlink(conf, cache, lcacheStatus, isArchive, currentWorkDir,
honorSymLinkConf);
}
initSuccessful = true;
- return localizedPath;
+ return lcacheStatus.localizedLoadPath;
} finally {
if (!initSuccessful) {
lcacheStatus.decRefCount();
@@ -217,8 +217,8 @@ public class TrackerDistributedCacheMana
* @throws IOException
*/
void releaseCache(URI cache, Configuration conf, long timeStamp,
- String owner) throws IOException {
- String key = getKey(cache, conf, timeStamp, owner);
+ String owner, boolean isArchive) throws IOException {
+ String key = getKey(cache, conf, timeStamp, owner, isArchive);
synchronized (cachedArchives) {
CacheStatus lcacheStatus = cachedArchives.get(key);
if (lcacheStatus == null) {
@@ -236,8 +236,8 @@ public class TrackerDistributedCacheMana
* This method is called from unit tests.
*/
int getReferenceCount(URI cache, Configuration conf, long timeStamp,
- String owner) throws IOException {
- String key = getKey(cache, conf, timeStamp, owner);
+ String owner, boolean isArchive) throws IOException {
+ String key = getKey(cache, conf, timeStamp, owner, isArchive);
synchronized (cachedArchives) {
CacheStatus lcacheStatus = cachedArchives.get(key);
if (lcacheStatus == null) {
@@ -315,9 +315,10 @@ public class TrackerDistributedCacheMana
return path;
}
- String getKey(URI cache, Configuration conf, long timeStamp, String user)
- throws IOException {
- return makeRelative(cache, conf) + String.valueOf(timeStamp) + user;
+ String getKey(URI cache, Configuration conf, long timeStamp, String user,
+ boolean isArchive) throws IOException {
+ return (isArchive ? "a" : "f") + "^" + makeRelative(cache, conf)
+ + String.valueOf(timeStamp) + user;
}
/**
@@ -341,12 +342,11 @@ public class TrackerDistributedCacheMana
* @return mtime of a given cache file on hdfs
* @throws IOException
*/
- static long getTimestamp(Configuration conf, URI cache)
- throws IOException {
+ long getTimestamp(Configuration conf, URI cache) throws IOException {
return getFileStatus(conf, cache).getModificationTime();
}
- private Path checkCacheStatusValidity(Configuration conf,
+ void checkCacheStatusValidity(Configuration conf,
URI cache, long confFileStamp,
CacheStatus cacheStatus,
FileStatus fileStatus,
@@ -362,7 +362,6 @@ public class TrackerDistributedCacheMana
LOG.info(String.format("Using existing cache of %s->%s",
cache.toString(), cacheStatus.localizedLoadPath));
- return cacheStatus.localizedLoadPath;
}
private void createSymlink(Configuration conf, URI cache,
@@ -472,7 +471,7 @@ public class TrackerDistributedCacheMana
}
// ensure that the file on hdfs hasn't been modified since the job started
- private long checkStampSinceJobStarted(Configuration conf, FileSystem fs,
+ long checkStampSinceJobStarted(Configuration conf, FileSystem fs,
URI cache, long confFileStamp,
CacheStatus lcacheStatus,
FileStatus fileStatus)
Modified: hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=1149004&r1=1149003&r2=1149004&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Thu Jul 21 01:59:14 2011
@@ -51,11 +51,15 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager.CacheStatus;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.mortbay.log.Log;
+import org.mockito.Matchers;
+import static org.mockito.Mockito.*;
+
public class TestTrackerDistributedCacheManager extends TestCase {
protected String TEST_ROOT_DIR =
@@ -251,7 +255,7 @@ public class TestTrackerDistributedCache
handle.release();
for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
assertEquals(0, manager.getReferenceCount(c.uri, conf1, c.timestamp,
- c.owner));
+ c.owner, false));
}
Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
@@ -289,7 +293,7 @@ public class TestTrackerDistributedCache
for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
try {
assertEquals(0, manager.getReferenceCount(c.uri, conf2, c.timestamp,
- c.owner));
+ c.owner, false));
} catch (IOException ie) {
th = ie;
Log.info("Exception getting reference count for " + c.uri, ie);
@@ -609,15 +613,15 @@ public class TestTrackerDistributedCache
manager.releaseCache(thirdCacheFile.toUri(), conf2,
getFileStamp(thirdCacheFile),
- TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
+ TrackerDistributedCacheManager.getLocalizedCacheOwner(false), false);
manager.releaseCache(secondCacheFile.toUri(), conf2,
getFileStamp(secondCacheFile),
- TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
+ TrackerDistributedCacheManager.getLocalizedCacheOwner(false), false);
manager.releaseCache(firstCacheFile.toUri(), conf2,
getFileStamp(firstCacheFile),
- TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
+ TrackerDistributedCacheManager.getLocalizedCacheOwner(false), false);
// Getting the fourth cache will make the number of sub directories becomes
@@ -645,6 +649,47 @@ public class TestTrackerDistributedCache
manager.stopCleanupThread();
}
+ public void testSameNameFileArchiveCache() throws IOException,
+ URISyntaxException, InterruptedException {
+ if (!canRun()) {
+ return;
+ }
+ TrackerDistributedCacheManager manager =
+ spy(new TrackerDistributedCacheManager(conf, taskController));
+ URI rsrc = new URI("file://foo/bar/yak");
+ Path cacheDir = new Path("file:///localcache");
+ Path archivePath = new Path(cacheDir, "archive");
+ Path filePath = new Path(cacheDir, "file");
+ doReturn(archivePath).when(manager).localizeCache(eq(conf), eq(rsrc),
+ anyLong(), Matchers.<CacheStatus> anyObject(), eq(true), anyBoolean());
+ doReturn(filePath).when(manager).localizeCache(eq(conf), eq(rsrc),
+ anyLong(), Matchers.<CacheStatus> anyObject(), eq(false), anyBoolean());
+ // could fail, but check match instead
+ doNothing().when(manager).checkCacheStatusValidity(
+ Matchers.<Configuration> anyObject(), eq(rsrc), anyLong(),
+ Matchers.<CacheStatus> anyObject(), Matchers.<FileStatus> anyObject(),
+ anyBoolean());
+ // localizeCache initializes mtime of cached rsrc; set to uninitialized val
+ doReturn(-1L).when(manager).checkStampSinceJobStarted(
+ Matchers.<Configuration> anyObject(),
+ Matchers.<FileSystem> anyObject(), eq(rsrc), anyLong(),
+ Matchers.<CacheStatus> anyObject(), Matchers.<FileStatus> anyObject());
+ doReturn(-1L).when(manager).getTimestamp(
+ Matchers.<Configuration> anyObject(), eq(rsrc));
+ FileStatus rsrcStatus = mock(FileStatus.class);
+ when(rsrcStatus.getLen()).thenReturn(4344L);
+
+ Path localizedPathForFile =
+ manager.getLocalCache(rsrc, conf, "sub", rsrcStatus, false, 20L,
+ new Path("file:///tmp"), false, true);
+ Path localizedPathForArchive =
+ manager.getLocalCache(rsrc, conf, "sub", rsrcStatus, true, 20L,
+ new Path("file:///tmp"), false, true);
+ assertNotSame("File and Archive resolve to the same path: "
+ + localizedPathForFile + ". Should differ.", localizedPathForFile,
+ localizedPathForArchive);
+ }
+
/** test delete cache */
public void testDeleteCache() throws Exception {
if (!canRun()) {
@@ -676,7 +721,7 @@ public class TestTrackerDistributedCache
getFileStamp(firstCacheFile), new Path(TEST_ROOT_DIR), false, false);
manager.releaseCache(firstCacheFile.toUri(), conf2,
getFileStamp(firstCacheFile),
- TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
+ TrackerDistributedCacheManager.getLocalizedCacheOwner(false), false);
//in above code,localized a file of size 4K and then release the cache
// which will cause the cache be deleted when the limit goes out.
// The below code localize another cache which's designed to
@@ -702,7 +747,7 @@ public class TestTrackerDistributedCache
// Release the third cache so that it can be deleted while sweeping
manager.releaseCache(thirdCacheFile.toUri(), conf2,
getFileStamp(thirdCacheFile),
- TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
+ TrackerDistributedCacheManager.getLocalizedCacheOwner(false), false);
// Getting the fourth cache will make the number of sub directories becomes
// 3 which is greater than 2. So the released cache will be deleted.
manager.getLocalCache(fourthCacheFile.toUri(), conf2,