You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2011/03/08 06:56:31 UTC
svn commit: r1079211 [10/11] - in /hadoop/mapreduce/branches/yahoo-merge: ./
src/c++/task-controller/ src/c++/task-controller/impl/
src/c++/task-controller/test/ src/c++/task-controller/tests/
src/contrib/fairscheduler/designdoc/ src/contrib/streaming/...
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java Tue Mar 8 05:56:27 2011
@@ -26,9 +26,6 @@ import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.DefaultTaskController;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import java.net.URI;
@@ -134,175 +131,6 @@ import java.net.URI;
@Deprecated
@InterfaceAudience.Private
public class DistributedCache {
- /**
- * Get the locally cached file or archive; it could either be
- * previously cached (and valid) or copy it from the {@link FileSystem} now.
- *
- * @param cache the cache to be localized, this should be specified as
- * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
- * @param conf The Confguration file which contains the filesystem
- * @param baseDir The base cache Dir where you wnat to localize the files/archives
- * @param fileStatus The file status on the dfs.
- * @param isArchive if the cache is an archive or a file. In case it is an
- * archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
- * be unzipped/unjarred/untarred automatically
- * and the directory where the archive is unzipped/unjarred/untarred is
- * returned as the Path.
- * In case of a file, the path to the file is returned
- * @param confFileStamp this is the hdfs file modification timestamp to verify that the
- * file to be cached hasn't changed since the job started
- * @param currentWorkDir this is the directory where you would want to create symlinks
- * for the locally cached files/archives
- * @return the path to directory where the archives are unjarred in case of archives,
- * the path to the file where the file is copied locally
- * @throws IOException
- * @deprecated Internal to MapReduce framework.
- * Use TrackerDistributedCacheManager instead.
- */
- @Deprecated
- public static Path getLocalCache(URI cache, Configuration conf,
- Path baseDir, FileStatus fileStatus,
- boolean isArchive, long confFileStamp,
- Path currentWorkDir)
- throws IOException {
- return getLocalCache(cache, conf, baseDir, fileStatus, isArchive,
- confFileStamp, currentWorkDir, true);
- }
-
- /**
- * Get the locally cached file or archive; it could either be
- * previously cached (and valid) or copy it from the {@link FileSystem} now.
- *
- * @param cache the cache to be localized, this should be specified as
- * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
- * @param conf The Confguration file which contains the filesystem
- * @param baseDir The base cache Dir where you wnat to localize the files/archives
- * @param fileStatus The file status on the dfs.
- * @param isArchive if the cache is an archive or a file. In case it is an
- * archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
- * be unzipped/unjarred/untarred automatically
- * and the directory where the archive is unzipped/unjarred/untarred is
- * returned as the Path.
- * In case of a file, the path to the file is returned
- * @param confFileStamp this is the hdfs file modification timestamp to verify that the
- * file to be cached hasn't changed since the job started
- * @param currentWorkDir this is the directory where you would want to create symlinks
- * for the locally cached files/archives
- * @param honorSymLinkConf if this is false, then the symlinks are not
- * created even if conf says so (this is required for an optimization in task
- * launches
- * @return the path to directory where the archives are unjarred in case of archives,
- * the path to the file where the file is copied locally
- * @throws IOException
- * @deprecated Internal to MapReduce framework.
- * Use TrackerDistributedCacheManager instead.
- */
- @Deprecated
- public static Path getLocalCache(URI cache, Configuration conf,
- Path baseDir, FileStatus fileStatus,
- boolean isArchive, long confFileStamp,
- Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
-
- return new TrackerDistributedCacheManager(conf, new DefaultTaskController())
- .getLocalCache(cache, conf, baseDir.toString(), fileStatus, isArchive,
- confFileStamp, currentWorkDir, honorSymLinkConf, false);
- }
-
- /**
- * Get the locally cached file or archive; it could either be
- * previously cached (and valid) or copy it from the {@link FileSystem} now.
- *
- * @param cache the cache to be localized, this should be specified as
- * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
- * @param conf The Confguration file which contains the filesystem
- * @param baseDir The base cache Dir where you wnat to localize the files/archives
- * @param isArchive if the cache is an archive or a file. In case it is an
- * archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
- * be unzipped/unjarred/untarred automatically
- * and the directory where the archive is unzipped/unjarred/untarred
- * is returned as the Path.
- * In case of a file, the path to the file is returned
- * @param confFileStamp this is the hdfs file modification timestamp to verify that the
- * file to be cached hasn't changed since the job started
- * @param currentWorkDir this is the directory where you would want to create symlinks
- * for the locally cached files/archives
- * @return the path to directory where the archives are unjarred in case of archives,
- * the path to the file where the file is copied locally
- * @throws IOException
- * @deprecated Internal to MapReduce framework.
- * Use TrackerDistributedCacheManager instead.
- */
- @Deprecated
- public static Path getLocalCache(URI cache, Configuration conf,
- Path baseDir, boolean isArchive,
- long confFileStamp, Path currentWorkDir)
- throws IOException {
- return getLocalCache(cache, conf,
- baseDir, null, isArchive,
- confFileStamp, currentWorkDir);
- }
-
- /**
- * This is the opposite of getlocalcache. When you are done with
- * using the cache, you need to release the cache
- * @param cache The cache URI to be released
- * @param conf configuration which contains the filesystem the cache
- * is contained in.
- * @throws IOException
- * @deprecated Internal to MapReduce framework.
- * Use TrackerDistributedCacheManager instead.
- */
- @Deprecated
- public static void releaseCache(URI cache, Configuration conf)
- throws IOException {
- // find the timestamp of the uri
- URI[] archives = DistributedCache.getCacheArchives(conf);
- URI[] files = DistributedCache.getCacheFiles(conf);
- String[] archivesTimestamps =
- DistributedCache.getArchiveTimestamps(conf);
- String[] filesTimestamps =
- DistributedCache.getFileTimestamps(conf);
- String timestamp = null;
- if (archives != null) {
- for (int i = 0; i < archives.length; i++) {
- if (archives[i].equals(cache)) {
- timestamp = archivesTimestamps[i];
- break;
- }
- }
- }
- if (timestamp == null && files != null) {
- for (int i = 0; i < files.length; i++) {
- if (files[i].equals(cache)) {
- timestamp = filesTimestamps[i];
- break;
- }
- }
- }
- if (timestamp == null) {
- throw new IOException("TimeStamp of the uri couldnot be found");
- }
- new TrackerDistributedCacheManager(conf, new DefaultTaskController())
- .releaseCache(cache, conf, Long.parseLong(timestamp),
- TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
- }
-
- /**
- * Returns the relative path of the dir this cache will be localized in
- * relative path that this cache will be localized in. For
- * hdfs://hostname:port/absolute_path -- the relative path is
- * hostname/absolute path -- if it is just /absolute_path -- then the
- * relative path is hostname of DFS this mapred cluster is running
- * on/absolute_path
- * @deprecated Internal to MapReduce framework. Use DistributedCacheManager
- * instead.
- */
- @Deprecated
- public static String makeRelative(URI cache, Configuration conf)
- throws IOException {
- return new TrackerDistributedCacheManager(conf, new DefaultTaskController())
- .makeRelative(cache, conf);
- }
/**
* Returns mtime of a given cache file on hdfs.
@@ -361,11 +189,22 @@ public class DistributedCache {
conf.set(MRJobConfig.CACHE_FILES, sfiles);
}
+ private static Path[] parsePaths(String[] strs) {
+ if (strs == null) {
+ return null;
+ }
+ Path[] result = new Path[strs.length];
+ for(int i=0; i < strs.length; ++i) {
+ result[i] = new Path(strs[i]);
+ }
+ return result;
+ }
+
/**
* Get cache archives set in the Configuration. Used by
* internal DistributedCache and MapReduce code.
* @param conf The configuration which contains the archives
- * @return A URI array of the caches set in the Configuration
+ * @return An array of the caches set in the Configuration
* @throws IOException
* @deprecated Use {@link JobContext#getCacheArchives()} instead
*/
@@ -378,7 +217,7 @@ public class DistributedCache {
* Get cache files set in the Configuration. Used by internal
* DistributedCache and MapReduce code.
* @param conf The configuration which contains the files
- * @return A URI array of the files set in the Configuration
+ * @return Am array of the files set in the Configuration
* @throws IOException
* @deprecated Use {@link JobContext#getCacheFiles()} instead
*/
@@ -417,30 +256,46 @@ public class DistributedCache {
}
/**
+ * Parse a list of strings into longs.
+ * @param strs the list of strings to parse
+ * @return a list of longs that were parsed. same length as strs.
+ */
+ private static long[] parseTimestamps(String[] strs) {
+ if (strs == null) {
+ return null;
+ }
+ long[] result = new long[strs.length];
+ for(int i=0; i < strs.length; ++i) {
+ result[i] = Long.parseLong(strs[i]);
+ }
+ return result;
+ }
+
+ /**
* Get the timestamps of the archives. Used by internal
* DistributedCache and MapReduce code.
* @param conf The configuration which stored the timestamps
- * @return a string array of timestamps
+ * @return a long array of timestamps
* @throws IOException
* @deprecated Use {@link JobContext#getArchiveTimestamps()} instead
*/
@Deprecated
- public static String[] getArchiveTimestamps(Configuration conf) {
- return conf.getStrings(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS);
+ public static long[] getArchiveTimestamps(Configuration conf) {
+ return
+ parseTimestamps(conf.getStrings(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS));
}
-
/**
* Get the timestamps of the files. Used by internal
* DistributedCache and MapReduce code.
* @param conf The configuration which stored the timestamps
- * @return a string array of timestamps
+ * @return a long array of timestamps
* @throws IOException
* @deprecated Use {@link JobContext#getFileTimestamps()} instead
*/
@Deprecated
- public static String[] getFileTimestamps(Configuration conf) {
- return conf.getStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS);
+ public static long[] getFileTimestamps(Configuration conf) {
+ return parseTimestamps(conf.getStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS));
}
/**
@@ -511,8 +366,8 @@ public class DistributedCache {
@Deprecated
public static void addCacheArchive(URI uri, Configuration conf) {
String archives = conf.get(MRJobConfig.CACHE_ARCHIVES);
- conf.set(MRJobConfig.CACHE_ARCHIVES, archives == null ? uri.toString()
- : archives + "," + uri.toString());
+ conf.set(MRJobConfig.CACHE_ARCHIVES,
+ archives == null ? uri.toString() : archives + "," + uri.toString());
}
/**
@@ -525,8 +380,32 @@ public class DistributedCache {
@Deprecated
public static void addCacheFile(URI uri, Configuration conf) {
String files = conf.get(MRJobConfig.CACHE_FILES);
- conf.set(MRJobConfig.CACHE_FILES, files == null ? uri.toString() : files + ","
- + uri.toString());
+ conf.set(MRJobConfig.CACHE_FILES,
+ files == null ? uri.toString() : files + "," + uri.toString());
+ }
+
+ /**
+ * Add a archive that has been localized to the conf. Used
+ * by internal DistributedCache code.
+ * @param conf The conf to modify to contain the localized caches
+ * @param str a comma separated list of local archives
+ */
+ public static void addLocalArchives(Configuration conf, String str) {
+ String archives = conf.get(MRJobConfig.CACHE_LOCALARCHIVES);
+ conf.set(MRJobConfig.CACHE_LOCALARCHIVES,
+ archives == null ? str : archives + "," + str);
+ }
+
+ /**
+ * Add a file that has been localized to the conf.. Used
+ * by internal DistributedCache code.
+ * @param conf The conf to modify to contain the localized caches
+ * @param str a comma separated list of local files
+ */
+ public static void addLocalFiles(Configuration conf, String str) {
+ String files = conf.get(MRJobConfig.CACHE_LOCALFILES);
+ conf.set(MRJobConfig.CACHE_LOCALFILES,
+ files == null ? str : files + "," + str);
}
/**
@@ -541,8 +420,8 @@ public class DistributedCache {
public static void addFileToClassPath(Path file, Configuration conf)
throws IOException {
String classpath = conf.get(MRJobConfig.CLASSPATH_FILES);
- conf.set(MRJobConfig.CLASSPATH_FILES, classpath == null ? file.toString()
- : classpath + "," + file.toString());
+ conf.set(MRJobConfig.CLASSPATH_FILES,
+ classpath == null ? file.toString() : classpath + "," + file);
FileSystem fs = FileSystem.get(conf);
URI uri = fs.makeQualified(file).toUri();
@@ -654,17 +533,4 @@ public class DistributedCache {
public static boolean checkURIs(URI[] uriFiles, URI[] uriArchives){
return TrackerDistributedCacheManager.checkURIs(uriFiles, uriArchives);
}
-
- /**
- * Clear the entire contents of the cache and delete the backing files. This
- * should only be used when the server is reinitializing, because the users
- * are going to lose their files.
- * @deprecated Internal to MapReduce framework.
- * Use TrackerDistributedCacheManager instead.
- */
- @Deprecated
- public static void purgeCache(Configuration conf) throws IOException {
- new TrackerDistributedCacheManager(conf, new DefaultTaskController())
- .purgeCache();
- }
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java Tue Mar 8 05:56:27 2011
@@ -30,8 +30,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager.CacheStatus;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
@@ -48,10 +51,9 @@ import org.apache.hadoop.classification.
@InterfaceAudience.Private
public class TaskDistributedCacheManager {
private final TrackerDistributedCacheManager distributedCacheManager;
- private final Configuration taskConf;
private final List<CacheFile> cacheFiles = new ArrayList<CacheFile>();
private final List<String> classPaths = new ArrayList<String>();
-
+
private boolean setupCalled = false;
/**
@@ -75,9 +77,9 @@ public class TaskDistributedCacheManager
boolean localized = false;
/** The owner of the localized file. Relevant only on the tasktrackers */
final String owner;
-
- private CacheFile(URI uri, FileType type, boolean isPublic, long timestamp,
- boolean classPath) throws IOException {
+ private CacheStatus status;
+ CacheFile(URI uri, FileType type, boolean isPublic, long timestamp,
+ boolean classPath) throws IOException {
this.uri = uri;
this.type = type;
this.isPublic = isPublic;
@@ -88,12 +90,28 @@ public class TaskDistributedCacheManager
}
/**
+ * Set the status for this cache file.
+ * @param status
+ */
+ public void setStatus(CacheStatus status) {
+ this.status = status;
+ }
+
+ /**
+ * Get the status for this cache file.
+ * @return the status object
+ */
+ public CacheStatus getStatus() {
+ return status;
+ }
+
+ /**
* Converts the scheme used by DistributedCache to serialize what files to
* cache in the configuration into CacheFile objects that represent those
* files.
*/
private static List<CacheFile> makeCacheFiles(URI[] uris,
- String[] timestamps, String cacheVisibilities[], Path[] paths,
+ long[] timestamps, boolean cacheVisibilities[], Path[] paths,
FileType type) throws IOException {
List<CacheFile> ret = new ArrayList<CacheFile>();
if (uris != null) {
@@ -109,9 +127,8 @@ public class TaskDistributedCacheManager
for (int i = 0; i < uris.length; ++i) {
URI u = uris[i];
boolean isClassPath = (null != classPaths.get(u.getPath()));
- long t = Long.parseLong(timestamps[i]);
- ret.add(new CacheFile(u, type, Boolean.valueOf(cacheVisibilities[i]),
- t, isClassPath));
+ ret.add(new CacheFile(u, type, cacheVisibilities[i],
+ timestamps[i], isClassPath));
}
}
return ret;
@@ -130,7 +147,6 @@ public class TaskDistributedCacheManager
TrackerDistributedCacheManager distributedCacheManager,
Configuration taskConf) throws IOException {
this.distributedCacheManager = distributedCacheManager;
- this.taskConf = taskConf;
this.cacheFiles.addAll(
CacheFile.makeCacheFiles(DistributedCache.getCacheFiles(taskConf),
@@ -147,36 +163,42 @@ public class TaskDistributedCacheManager
}
/**
- * Retrieve files into the local cache and updates the task configuration
- * (which has been passed in via the constructor).
+ * Retrieve public distributed cache files into the local cache and updates
+ * the task configuration (which has been passed in via the constructor).
+ * The private distributed cache is just looked at and the paths where the
+ * files/archives should go to is decided here. The actual localization is
+ * done by {@link JobLocalizer}.
*
* It is the caller's responsibility to re-write the task configuration XML
* file, if necessary.
*/
- public void setup(LocalDirAllocator lDirAlloc, File workDir,
- String privateCacheSubdir, String publicCacheSubDir) throws IOException {
+ public void setupCache(Configuration taskConf, String publicCacheSubdir,
+ String privateCacheSubdir) throws IOException {
setupCalled = true;
- if (cacheFiles.isEmpty()) {
- return;
- }
-
ArrayList<Path> localArchives = new ArrayList<Path>();
ArrayList<Path> localFiles = new ArrayList<Path>();
- Path workdirPath = new Path(workDir.getAbsolutePath());
for (CacheFile cacheFile : cacheFiles) {
URI uri = cacheFile.uri;
FileSystem fileSystem = FileSystem.get(uri, taskConf);
FileStatus fileStatus = fileSystem.getFileStatus(new Path(uri.getPath()));
- String cacheSubdir = publicCacheSubDir;
- if (!cacheFile.isPublic) {
- cacheSubdir = privateCacheSubdir;
- }
- Path p = distributedCacheManager.getLocalCache(uri, taskConf,
- cacheSubdir, fileStatus,
- cacheFile.type == CacheFile.FileType.ARCHIVE,
- cacheFile.timestamp, workdirPath, false, cacheFile.isPublic);
+ Path p;
+ try {
+ if (cacheFile.isPublic) {
+ p = distributedCacheManager.getLocalCache(uri, taskConf,
+ publicCacheSubdir, fileStatus,
+ cacheFile.type == CacheFile.FileType.ARCHIVE,
+ cacheFile.timestamp, cacheFile.isPublic, cacheFile);
+ } else {
+ p = distributedCacheManager.getLocalCache(uri, taskConf,
+ privateCacheSubdir, fileStatus,
+ cacheFile.type == CacheFile.FileType.ARCHIVE,
+ cacheFile.timestamp, cacheFile.isPublic, cacheFile);
+ }
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted localizing cache file", e);
+ }
cacheFile.setLocalized(true);
if (cacheFile.type == CacheFile.FileType.ARCHIVE) {
@@ -191,10 +213,14 @@ public class TaskDistributedCacheManager
// Update the configuration object with localized data.
if (!localArchives.isEmpty()) {
+ // TODO verify
+// DistributedCache.addLocalArchives(taskConf,
TrackerDistributedCacheManager.setLocalArchives(taskConf,
stringifyPathList(localArchives));
}
if (!localFiles.isEmpty()) {
+ // TODO verify
+// DistributedCache.addLocalFiles(taskConf, stringifyPathList(localFiles));
TrackerDistributedCacheManager.setLocalFiles(taskConf,
stringifyPathList(localFiles));
}
@@ -239,9 +265,18 @@ public class TaskDistributedCacheManager
*/
public void release() throws IOException {
for (CacheFile c : cacheFiles) {
- if (c.getLocalized()) {
- distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp,
- c.owner);
+ if (c.getLocalized() && c.status != null) {
+ distributedCacheManager.releaseCache(c.status);
+ }
+ }
+ }
+
+ public void setSizes(long[] sizes) throws IOException {
+ int i = 0;
+ for (CacheFile c: cacheFiles) {
+ if (!c.isPublic && c.type == CacheFile.FileType.ARCHIVE &&
+ c.status != null) {
+ distributedCacheManager.setSize(c.status, sizes[i++]);
}
}
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Tue Mar 8 05:56:27 2011
@@ -20,7 +20,11 @@ package org.apache.hadoop.mapreduce.file
import java.io.File;
import java.io.IOException;
import java.net.URI;
+import java.text.DateFormat;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
@@ -31,13 +35,9 @@ import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.TaskController;
-import org.apache.hadoop.mapred.TaskController.DistributedCacheFileContext;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
-import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
@@ -46,10 +46,16 @@ import org.apache.hadoop.fs.LocalFileSys
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.TaskController;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager.CacheFile;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.RunJar;
-import org.apache.hadoop.classification.InterfaceAudience;
/**
* Manages a single machine's instance of a cross-job
@@ -63,6 +69,11 @@ public class TrackerDistributedCacheMana
// cacheID to cacheStatus mapping
private TreeMap<String, CacheStatus> cachedArchives =
new TreeMap<String, CacheStatus>();
+ private Map<JobID, TaskDistributedCacheManager> jobArchives =
+ Collections.synchronizedMap(
+ new HashMap<JobID, TaskDistributedCacheManager>());
+ private static final FsPermission PUBLIC_CACHE_OBJECT_PERM =
+ FsPermission.createImmutable((short) 0755);
// default total cache size (10GB)
private static final long DEFAULT_CACHE_SIZE = 10737418240L;
@@ -76,24 +87,20 @@ public class TrackerDistributedCacheMana
private final LocalFileSystem localFs;
private LocalDirAllocator lDirAllocator;
-
- private TaskController taskController;
-
+
private Configuration trackerConf;
- private Random random = new Random();
+ private static final Random random = new Random();
private MRAsyncDiskService asyncDiskService;
BaseDirManager baseDirManager = new BaseDirManager();
CleanupThread cleanupThread;
- public TrackerDistributedCacheManager(Configuration conf,
- TaskController taskController) throws IOException {
+ public TrackerDistributedCacheManager(Configuration conf) throws IOException {
this.localFs = FileSystem.getLocal(conf);
this.trackerConf = conf;
this.lDirAllocator = new LocalDirAllocator(TTConfig.LOCAL_DIR);
- this.taskController = taskController;
// setting the cache size to a default of 10GB
this.allowedCacheSize = conf.getLong(TTConfig.TT_LOCAL_CACHE_SIZE,
DEFAULT_CACHE_SIZE);
@@ -111,7 +118,7 @@ public class TrackerDistributedCacheMana
public TrackerDistributedCacheManager(Configuration conf,
TaskController taskController, MRAsyncDiskService asyncDiskService)
throws IOException {
- this(conf, taskController);
+ this(conf);
this.asyncDiskService = asyncDiskService;
}
@@ -145,15 +152,15 @@ public class TrackerDistributedCacheMana
* archives, the path to the file where the file is copied locally
* @throws IOException
*/
- Path getLocalCache(URI cache, Configuration conf,
- String subDir, FileStatus fileStatus,
- boolean isArchive, long confFileStamp,
- Path currentWorkDir, boolean honorSymLinkConf, boolean isPublic)
- throws IOException {
- String key;
- key = getKey(cache, conf, confFileStamp, getLocalizedCacheOwner(isPublic));
+ Path getLocalCache(URI cache, Configuration conf, String subDir,
+ FileStatus fileStatus, boolean isArchive, long confFileStamp,
+ boolean isPublic, CacheFile file)
+ throws IOException, InterruptedException {
+ String user = getLocalizedCacheOwner(isPublic);
+ String key = getKey(cache, conf, confFileStamp, user);
CacheStatus lcacheStatus;
Path localizedPath = null;
+ Path localPath = null;
synchronized (cachedArchives) {
lcacheStatus = cachedArchives.get(key);
if (lcacheStatus == null) {
@@ -161,44 +168,59 @@ public class TrackerDistributedCacheMana
String uniqueString = String.valueOf(random.nextLong());
String cachePath = new Path (subDir,
new Path(uniqueString, makeRelative(cache, conf))).toString();
- Path localPath = lDirAllocator.getLocalPathForWrite(cachePath,
- fileStatus.getLen(), trackerConf);
- lcacheStatus = new CacheStatus(new Path(localPath.toString().replace(
- cachePath, "")), localPath, new Path(subDir), uniqueString);
+ localPath = lDirAllocator.getLocalPathForWrite(cachePath,
+ fileStatus.getLen(), trackerConf, isPublic);
+ lcacheStatus =
+ new CacheStatus(new Path(localPath.toString().replace(cachePath, "")),
+ localPath, new Path(subDir), uniqueString,
+ isPublic ? null : user);
cachedArchives.put(key, lcacheStatus);
}
- //mark the cache for use.
- lcacheStatus.refcount++;
+ //mark the cache for use.
+ file.setStatus(lcacheStatus);
+ synchronized (lcacheStatus) {
+ lcacheStatus.refcount++;
+ }
}
- boolean initSuccessful = false;
try {
// do the localization, after releasing the global lock
synchronized (lcacheStatus) {
if (!lcacheStatus.isInited()) {
- FileSystem fs = FileSystem.get(cache, conf);
- checkStampSinceJobStarted(conf, fs, cache, confFileStamp,
- lcacheStatus, fileStatus);
- localizedPath = localizeCache(conf, cache, confFileStamp,
- lcacheStatus, isArchive, isPublic);
+ if (isPublic) {
+ // TODO verify covered
+ //checkStampSinceJobStarted(conf, fs, cache, confFileStamp,
+ // lcacheStatus, fileStatus);
+ localizedPath = localizePublicCacheObject(conf, cache,
+ confFileStamp, lcacheStatus, fileStatus, isArchive);
+ } else {
+ localizedPath = localPath;
+ if (!isArchive) {
+ //for private archives, the lengths come over RPC from the
+ //JobLocalizer since the JobLocalizer is the one who expands
+ //archives and gets the total length
+ lcacheStatus.size = fileStatus.getLen();
+
+ // Increase the size and sub directory count of the cache
+ // from baseDirSize and baseDirNumberSubDir.
+ baseDirManager.addCacheUpdate(lcacheStatus);
+ }
+ }
lcacheStatus.initComplete();
} else {
localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
lcacheStatus, fileStatus, isArchive);
}
- createSymlink(conf, cache, lcacheStatus, isArchive, currentWorkDir,
- honorSymLinkConf);
}
- initSuccessful = true;
- return localizedPath;
- } finally {
- if (!initSuccessful) {
- synchronized (cachedArchives) {
- lcacheStatus.refcount--;
- }
+ } catch (IOException ie) {
+ synchronized (lcacheStatus) {
+ // release this cache
+ lcacheStatus.refcount -= 1;
+ throw ie;
}
}
+ return localizedPath;
}
/**
@@ -211,37 +233,30 @@ public class TrackerDistributedCacheMana
* is contained in.
* @throws IOException
*/
- void releaseCache(URI cache, Configuration conf, long timeStamp,
- String owner) throws IOException {
- String key = getKey(cache, conf, timeStamp, owner);
- synchronized (cachedArchives) {
- CacheStatus lcacheStatus = cachedArchives.get(key);
- if (lcacheStatus == null) {
- LOG.warn("Cannot find localized cache: " + cache +
- " (key: " + key + ") in releaseCache!");
- return;
+ void releaseCache(CacheStatus status) throws IOException {
+ synchronized (status) {
+ status.refcount--;
+ }
+ }
+
+ void setSize(CacheStatus status, long size) throws IOException {
+ if (size != 0) {
+ synchronized (status) {
+ status.size = size;
+ baseDirManager.addCacheUpdate(status);
}
-
- // decrement ref count
- lcacheStatus.refcount--;
}
}
/*
* This method is called from unit tests.
*/
- int getReferenceCount(URI cache, Configuration conf, long timeStamp,
- String owner) throws IOException {
- String key = getKey(cache, conf, timeStamp, owner);
- synchronized (cachedArchives) {
- CacheStatus lcacheStatus = cachedArchives.get(key);
- if (lcacheStatus == null) {
- throw new IOException("Cannot find localized cache: " + cache);
- }
- return lcacheStatus.refcount;
+ int getReferenceCount(CacheStatus status) throws IOException {
+ synchronized (status) {
+ return status.refcount;
}
}
-
+
/**
* Get the user who should "own" the localized distributed cache file.
* If the cache is public, the tasktracker user is the owner. If private,
@@ -266,6 +281,7 @@ public class TrackerDistributedCacheMana
*/
private static void deleteLocalPath(MRAsyncDiskService asyncDiskService,
LocalFileSystem fs, Path path) throws IOException {
+ // TODO need to make asyncDiskService use taskController
boolean deleted = false;
if (asyncDiskService != null) {
// Try to delete using asyncDiskService
@@ -408,53 +424,72 @@ public class TrackerDistributedCacheMana
return cacheStatus.localizedLoadPath;
}
- private void createSymlink(Configuration conf, URI cache,
- CacheStatus cacheStatus, boolean isArchive,
- Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
- boolean doSymlink = honorSymLinkConf && DistributedCache.getSymlink(conf);
- if(cache.getFragment() == null) {
- doSymlink = false;
- }
- String link =
- currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
- File flink = new File(link);
- if (doSymlink){
- if (!flink.exists()) {
- FileUtil.symLink(cacheStatus.localizedLoadPath.toString(), link);
- }
- }
+ private static Path createRandomPath(Path base) throws IOException {
+ return new Path(base.toString() + "-work-" + random.nextLong());
}
-
- // the method which actually copies the caches locally and unjars/unzips them
- // and does chmod for the files
- Path localizeCache(Configuration conf,
- URI cache, long confFileStamp,
- CacheStatus cacheStatus,
- boolean isArchive, boolean isPublic)
- throws IOException {
- FileSystem fs = FileSystem.get(cache, conf);
+
+ /**
+ * Download a given path to the local file system.
+ * @param conf the job's configuration
+ * @param source the source to copy from
+ * @param destination where to copy the file. must be local fs
+ * @param desiredTimestamp the required modification timestamp of the source
+ * @param isArchive is this an archive that should be expanded
+ * @param permission the desired permissions of the file.
+ * @return for archives, the number of bytes in the unpacked directory
+ * @throws IOException
+ */
+ public static long downloadCacheObject(Configuration conf,
+ URI source,
+ Path destination,
+ long desiredTimestamp,
+ boolean isArchive,
+ FsPermission permission
+ ) throws IOException,
+ InterruptedException {
+ FileSystem sourceFs = FileSystem.get(source, conf);
FileSystem localFs = FileSystem.getLocal(conf);
+
+ Path sourcePath = new Path(source.getPath());
+ long modifiedTime =
+ sourceFs.getFileStatus(sourcePath).getModificationTime();
+ if (modifiedTime != desiredTimestamp) {
+ DateFormat df = DateFormat.getDateTimeInstance(DateFormat.SHORT,
+ DateFormat.SHORT);
+ throw new IOException("The distributed cache object " + source +
+ " changed during the job from " +
+ df.format(new Date(desiredTimestamp)) + " to " +
+ df.format(new Date(modifiedTime)));
+ }
+
Path parchive = null;
if (isArchive) {
- parchive = new Path(cacheStatus.localizedLoadPath,
- new Path(cacheStatus.localizedLoadPath.getName()));
+ parchive = new Path(destination, destination.getName());
} else {
- parchive = cacheStatus.localizedLoadPath;
+ parchive = destination;
}
-
- if (!localFs.mkdirs(parchive.getParent())) {
- throw new IOException("Mkdirs failed to create directory " +
- cacheStatus.localizedLoadPath.toString());
- }
-
- String cacheId = cache.getPath();
- fs.copyToLocalFile(new Path(cacheId), parchive);
+ // if the file already exists, we are done
+ if (localFs.exists(parchive)) {
+ return 0;
+ }
+ // the final directory for the object
+ Path finalDir = parchive.getParent();
+ // the work directory for the object
+ Path workDir = createRandomPath(finalDir);
+ LOG.info("Creating " + destination.getName() + " in " + workDir + " with " +
+ permission);
+ if (!localFs.mkdirs(workDir, permission)) {
+ throw new IOException("Mkdirs failed to create directory " + workDir);
+ }
+ Path workFile = new Path(workDir, parchive.getName());
+ sourceFs.copyToLocalFile(sourcePath, workFile);
+ localFs.setPermission(workFile, permission);
if (isArchive) {
- String tmpArchive = parchive.toString().toLowerCase();
- File srcFile = new File(parchive.toString());
- File destDir = new File(parchive.getParent().toString());
+ String tmpArchive = workFile.getName().toLowerCase();
+ File srcFile = new File(workFile.toString());
+ File destDir = new File(workDir.toString());
LOG.info(String.format("Extracting %s to %s",
- srcFile.toString(), destDir.toString()));
+ srcFile.toString(), destDir.toString()));
if (tmpArchive.endsWith(".jar")) {
RunJar.unJar(srcFile, destDir);
} else if (tmpArchive.endsWith(".zip")) {
@@ -468,47 +503,48 @@ public class TrackerDistributedCacheMana
// else will not do anyhting
// and copy the file into the dir as it is
}
+ FileUtil.chmod(destDir.toString(), "ugo+rx", true);
+ }
+ // promote the output to the final location
+ if (!localFs.rename(workDir, finalDir)) {
+ localFs.delete(workDir, true);
+ if (!localFs.exists(finalDir)) {
+ throw new IOException("Failed to promote distributed cache object " +
+ workDir + " to " + finalDir);
+ }
+ // someone else promoted first
+ return 0;
}
+ LOG.info(String.format("Cached %s as %s",
+ source.toString(), destination.toString()));
long cacheSize =
FileUtil.getDU(new File(parchive.getParent().toString()));
- cacheStatus.size = cacheSize;
+ return cacheSize;
+ }
+
+ //the method which actually copies the caches locally and unjars/unzips them
+ // and does chmod for the files
+ Path localizePublicCacheObject(Configuration conf,
+ URI cache, long confFileStamp,
+ CacheStatus cacheStatus,
+ FileStatus fileStatus,
+ boolean isArchive
+ ) throws IOException, InterruptedException {
+ long size = downloadCacheObject(conf, cache, cacheStatus.localizedLoadPath,
+ confFileStamp, isArchive,
+ PUBLIC_CACHE_OBJECT_PERM);
+ cacheStatus.size = size;
+
// Increase the size and sub directory count of the cache
// from baseDirSize and baseDirNumberSubDir.
baseDirManager.addCacheUpdate(cacheStatus);
- // set proper permissions for the localized directory
- setPermissions(conf, cacheStatus, isPublic);
-
- // update cacheStatus to reflect the newly cached file
- cacheStatus.mtime = getTimestamp(conf, cache);
-
LOG.info(String.format("Cached %s as %s",
cache.toString(), cacheStatus.localizedLoadPath));
return cacheStatus.localizedLoadPath;
}
- private void setPermissions(Configuration conf, CacheStatus cacheStatus,
- boolean isPublic) throws IOException {
- if (isPublic) {
- Path localizedUniqueDir = cacheStatus.getLocalizedUniqueDir();
- LOG.info("Doing chmod on localdir :" + localizedUniqueDir);
- try {
- FileUtil.chmod(localizedUniqueDir.toString(), "ugo+rx", true);
- } catch (InterruptedException e) {
- LOG.warn("Exception in chmod" + e.toString());
- throw new IOException(e);
- }
- } else {
- // invoke taskcontroller to set permissions
- DistributedCacheFileContext context = new DistributedCacheFileContext(
- conf.get(MRJobConfig.USER_NAME), new File(cacheStatus.localizedBaseDir
- .toString()), cacheStatus.localizedBaseDir,
- cacheStatus.uniqueString);
- taskController.initializeDistributedCacheFile(context);
- }
- }
-
private static boolean isTarFile(String filename) {
return (filename.endsWith(".tgz") || filename.endsWith(".tar.gz") ||
filename.endsWith(".tar"));
@@ -542,12 +578,18 @@ public class TrackerDistributedCacheMana
CacheStatus lcacheStatus,
FileStatus fileStatus)
throws IOException {
- long dfsFileStamp = checkStampSinceJobStarted(conf, fs, cache,
- confFileStamp, lcacheStatus, fileStatus);
- if (dfsFileStamp != lcacheStatus.mtime) {
- return false;
+ long dfsFileStamp;
+ if (fileStatus != null) {
+ dfsFileStamp = fileStatus.getModificationTime();
+ } else {
+ dfsFileStamp = getTimestamp(conf, cache);
}
+ if (dfsFileStamp != confFileStamp) {
+ LOG.fatal("File: " + cache + " has changed on HDFS since job started");
+ throw new IOException("File: " + cache +
+ " has changed on HDFS since job started");
+ }
return true;
}
@@ -596,7 +638,6 @@ public class TrackerDistributedCacheMana
// individual cacheStatus lock.
//
long size; //the size of this cache.
- long mtime; // the cache-file modification time
boolean inited = false; // is it initialized ?
//
@@ -607,19 +648,21 @@ public class TrackerDistributedCacheMana
final Path subDir;
// unique string used in the construction of local load path
final String uniqueString;
+ // The user that owns the cache entry or null if it is public
+ final String user;
// the local load path of this cache
final Path localizedLoadPath;
//the base dir where the cache lies
final Path localizedBaseDir;
public CacheStatus(Path baseDir, Path localLoadPath, Path subDir,
- String uniqueString) {
+ String uniqueString, String user) {
super();
this.localizedLoadPath = localLoadPath;
this.refcount = 0;
- this.mtime = -1;
this.localizedBaseDir = baseDir;
this.size = 0;
+ this.user = user;
this.subDir = subDir;
this.uniqueString = uniqueString;
}
@@ -662,8 +705,22 @@ public class TrackerDistributedCacheMana
}
public TaskDistributedCacheManager newTaskDistributedCacheManager(
- Configuration taskConf) throws IOException {
- return new TaskDistributedCacheManager(this, taskConf);
+ JobID jobId, Configuration taskConf) throws IOException {
+ TaskDistributedCacheManager result =
+ new TaskDistributedCacheManager(this, taskConf);
+ jobArchives.put(jobId, result);
+ return result;
+ }
+
+ public void deleteTaskDistributedCacheManager(JobID jobId) {
+ jobArchives.remove(jobId);
+ }
+
+ public void setArchiveSizes(JobID jobId, long[] sizes) throws IOException {
+ TaskDistributedCacheManager mgr = jobArchives.get(jobId);
+ if (mgr != null) {
+ mgr.setSizes(sizes);
+ }
}
/**
@@ -776,6 +833,17 @@ public class TrackerDistributedCacheMana
}
}
+ private static boolean[] parseBooleans(String[] strs) {
+ if (null == strs) {
+ return null;
+ }
+ boolean[] result = new boolean[strs.length];
+ for(int i=0; i < strs.length; ++i) {
+ result[i] = Boolean.parseBoolean(strs[i]);
+ }
+ return result;
+ }
+
/**
* Get the booleans on whether the files are public or not. Used by
* internal DistributedCache and MapReduce code.
@@ -783,8 +851,8 @@ public class TrackerDistributedCacheMana
* @return a string array of booleans
* @throws IOException
*/
- static String[] getFileVisibilities(Configuration conf) {
- return conf.getStrings(MRJobConfig.CACHE_FILE_VISIBILITIES);
+ public static boolean[] getFileVisibilities(Configuration conf) {
+ return parseBooleans(conf.getStrings(MRJobConfig.CACHE_FILE_VISIBILITIES));
}
/**
@@ -793,8 +861,8 @@ public class TrackerDistributedCacheMana
* @param conf The configuration which stored the timestamps
* @return a string array of booleans
*/
- static String[] getArchiveVisibilities(Configuration conf) {
- return conf.getStrings(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES);
+ public static boolean[] getArchiveVisibilities(Configuration conf) {
+ return parseBooleans(conf.getStrings(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES));
}
/**
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java Tue Mar 8 05:56:27 2011
@@ -131,7 +131,7 @@ class ChainMapContextImpl<KEYIN, VALUEIN
}
@Override
- public String[] getArchiveTimestamps() {
+ public long[] getArchiveTimestamps() {
return base.getArchiveTimestamps();
}
@@ -162,7 +162,7 @@ class ChainMapContextImpl<KEYIN, VALUEIN
}
@Override
- public String[] getFileTimestamps() {
+ public long[] getFileTimestamps() {
return base.getFileTimestamps();
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java Tue Mar 8 05:56:27 2011
@@ -124,7 +124,7 @@ class ChainReduceContextImpl<KEYIN, VALU
}
@Override
- public String[] getArchiveTimestamps() {
+ public long[] getArchiveTimestamps() {
return base.getArchiveTimestamps();
}
@@ -155,7 +155,7 @@ class ChainReduceContextImpl<KEYIN, VALU
}
@Override
- public String[] getFileTimestamps() {
+ public long[] getFileTimestamps() {
return base.getFileTimestamps();
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java Tue Mar 8 05:56:27 2011
@@ -133,7 +133,7 @@ public class WrappedMapper<KEYIN, VALUEI
}
@Override
- public String[] getArchiveTimestamps() {
+ public long[] getArchiveTimestamps() {
return mapContext.getArchiveTimestamps();
}
@@ -164,7 +164,7 @@ public class WrappedMapper<KEYIN, VALUEI
}
@Override
- public String[] getFileTimestamps() {
+ public long[] getFileTimestamps() {
return mapContext.getFileTimestamps();
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java Tue Mar 8 05:56:27 2011
@@ -126,7 +126,7 @@ public class WrappedReducer<KEYIN, VALUE
}
@Override
- public String[] getArchiveTimestamps() {
+ public long[] getArchiveTimestamps() {
return reduceContext.getArchiveTimestamps();
}
@@ -157,7 +157,7 @@ public class WrappedReducer<KEYIN, VALUE
}
@Override
- public String[] getFileTimestamps() {
+ public long[] getFileTimestamps() {
return reduceContext.getFileTimestamps();
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java Tue Mar 8 05:56:27 2011
@@ -182,7 +182,7 @@ public class TokenCache {
* @throws IOException
*/
@InterfaceAudience.Private
- public static Credentials loadTokens(String jobTokenFile, JobConf conf)
+ public static Credentials loadTokens(String jobTokenFile, Configuration conf)
throws IOException {
Path localJobTokenFile = new Path ("file:///" + jobTokenFile);
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java Tue Mar 8 05:56:27 2011
@@ -33,7 +33,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TaskController;
import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.mapred.TaskTracker;
-import org.apache.hadoop.mapred.TaskController.InitializationContext;
import org.apache.hadoop.mapreduce.JobID;
@InterfaceAudience.Private
@@ -44,19 +43,16 @@ public class Localizer {
private FileSystem fs;
private String[] localDirs;
- private TaskController taskController;
/**
* Create a Localizer instance
*
* @param fileSys
* @param lDirs
- * @param tc
*/
- public Localizer(FileSystem fileSys, String[] lDirs, TaskController tc) {
+ public Localizer(FileSystem fileSys, String[] lDirs) {
fs = fileSys;
localDirs = lDirs;
- taskController = tc;
}
@InterfaceAudience.Private
@@ -264,13 +260,6 @@ public class Localizer {
+ user);
}
- // Now, run the task-controller specific code to initialize the
- // user-directories.
- InitializationContext context = new InitializationContext();
- context.user = user;
- context.workDir = null;
- taskController.initializeUser(context);
-
// Localization of the user is done
localizedUser.set(true);
}
@@ -283,7 +272,7 @@ public class Localizer {
* <br>
* Here, we set 700 permissions on the job directories created on all disks.
* This we do so as to avoid any misuse by other users till the time
- * {@link TaskController#initializeJob(JobInitializationContext)} is run at a
+ * {@link TaskController#initializeJob} is run at a
* later time to set proper private permissions on the job directories. <br>
*
* @param user
@@ -331,16 +320,15 @@ public class Localizer {
* @param user
* @param jobId
* @param attemptId
- * @param isCleanupAttempt
* @throws IOException
*/
public void initializeAttemptDirs(String user, String jobId,
- String attemptId, boolean isCleanupAttempt)
+ String attemptId)
throws IOException {
boolean initStatus = false;
String attemptDirPath =
- TaskTracker.getLocalTaskDir(user, jobId, attemptId, isCleanupAttempt);
+ TaskTracker.getLocalTaskDir(user, jobId, attemptId);
for (String localDir : localDirs) {
Path localAttemptDir = new Path(localDir, attemptDirPath);
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java Tue Mar 8 05:56:27 2011
@@ -334,7 +334,8 @@ public class JobContextImpl implements J
* @return a string array of timestamps
* @throws IOException
*/
- public String[] getArchiveTimestamps() {
+ @Override
+ public long[] getArchiveTimestamps() {
return DistributedCache.getArchiveTimestamps(conf);
}
@@ -344,7 +345,8 @@ public class JobContextImpl implements J
* @return a string array of timestamps
* @throws IOException
*/
- public String[] getFileTimestamps() {
+ @Override
+ public long[] getFileTimestamps() {
return DistributedCache.getFileTimestamps(conf);
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java Tue Mar 8 05:56:27 2011
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskController;
import org.apache.hadoop.util.AsyncDiskService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -57,6 +58,8 @@ public class MRAsyncDiskService {
AsyncDiskService asyncDiskService;
+ TaskController taskController;
+
public static final String TOBEDELETED = "toBeDeleted";
/**
@@ -64,14 +67,18 @@ public class MRAsyncDiskService {
* root directories).
*
* The AsyncDiskServices uses one ThreadPool per volume to do the async disk
- * operations.
+ * operations. A {@link TaskController} is passed that will be used to do
+ * the deletes
*
* @param localFileSystem The localFileSystem used for deletions.
+ * @param taskController The taskController that should be used for the
+ * delete operations
* @param nonCanonicalVols The roots of the file system volumes, which may
* be absolte paths, or paths relative to the ${user.dir} system property
* ("cwd").
*/
- public MRAsyncDiskService(FileSystem localFileSystem,
+ public MRAsyncDiskService(FileSystem localFileSystem,
+ TaskController taskController,
String... nonCanonicalVols) throws IOException {
this.localFileSystem = localFileSystem;
@@ -84,6 +91,8 @@ public class MRAsyncDiskService {
asyncDiskService = new AsyncDiskService(this.volumes);
+ this.taskController = taskController;
+
// Create one ThreadPool per volume
for (int v = 0 ; v < volumes.length; v++) {
// Create the root for file deletion
@@ -109,13 +118,31 @@ public class MRAsyncDiskService {
+ " because it's outside of " + volumes[v]);
}
DeleteTask task = new DeleteTask(volumes[v], absoluteFilename,
- relative);
+ relative, files[f].getOwner());
execute(volumes[v], task);
}
}
}
/**
+ * Create a AsyncDiskServices with a set of volumes (specified by their
+ * root directories).
+ *
+ * The AsyncDiskServices uses one ThreadPool per volume to do the async disk
+ * operations.
+ *
+ * @param localFileSystem The localFileSystem used for deletions.
+ * @param nonCanonicalVols The roots of the file system volumes, which may
+ * be absolte paths, or paths relative to the ${user.dir} system property
+ * ("cwd").
+ */
+ public MRAsyncDiskService(FileSystem localFileSystem,
+ String... nonCanonicalVols) throws IOException {
+ this(localFileSystem, null, nonCanonicalVols);
+ }
+
+
+ /**
* Initialize MRAsyncDiskService based on conf.
* @param conf local file system and local dirs will be read from conf
*/
@@ -174,6 +201,8 @@ public class MRAsyncDiskService {
String originalPath;
/** The file name after the move */
String pathToBeDeleted;
+ /** The owner of the file */
+ String owner;
/**
* Delete a file/directory (recursively if needed).
@@ -181,11 +210,14 @@ public class MRAsyncDiskService {
* @param originalPath The original name, relative to volume root.
* @param pathToBeDeleted The name after the move, relative to volume root,
* containing TOBEDELETED.
+ * @param owner The owner of the file
*/
- DeleteTask(String volume, String originalPath, String pathToBeDeleted) {
+ DeleteTask(String volume, String originalPath, String pathToBeDeleted,
+ String owner) {
this.volume = volume;
this.originalPath = originalPath;
this.pathToBeDeleted = pathToBeDeleted;
+ this.owner = owner;
}
@Override
@@ -201,7 +233,12 @@ public class MRAsyncDiskService {
Exception e = null;
try {
Path absolutePathToBeDeleted = new Path(volume, pathToBeDeleted);
- success = localFileSystem.delete(absolutePathToBeDeleted, true);
+ if (taskController != null & owner != null) {
+ taskController.deleteAsUser(owner,
+ absolutePathToBeDeleted.toString());
+ } else {
+ success = localFileSystem.delete(absolutePathToBeDeleted, true);
+ }
} catch (Exception ex) {
e = ex;
}
@@ -262,8 +299,9 @@ public class MRAsyncDiskService {
// Return false in case that the file is not found.
return false;
}
-
- DeleteTask task = new DeleteTask(volume, pathName, newPathName);
+ FileStatus status = localFileSystem.getFileStatus(target);
+ DeleteTask task = new DeleteTask(volume, pathName, newPathName,
+ status.getOwner());
execute(volume, task);
return true;
}
@@ -371,5 +409,31 @@ public class MRAsyncDiskService {
throw new IOException("Cannot delete " + absolutePathName
+ " because it's outside of all volumes.");
}
-
+ /**
+ * Move the path name to a temporary location and then delete it.
+ *
+ * Note that if there is no volume that contains this path, the path
+ * will stay as it is, and the function will return false.
+ *
+ * This functions returns when the moves are done, but not necessarily all
+ * deletions are done. This is usually good enough because applications
+ * won't see the path name under the old name anyway after the move.
+ *
+ * @param volume The disk volume
+ * @param absolutePathName The path name from root "/"
+ * @throws IOException If the move failed
+ * @return false if we are unable to move the path name
+ */
+ public boolean moveAndDeleteAbsolutePath(String volume,
+ String absolutePathName)
+ throws IOException {
+ String relative = getRelativePathName(absolutePathName, volume);
+ if (relative == null) {
+ // This should never happen
+ throw new IOException("Cannot delete " + absolutePathName
+ + " because it's outside of " + volume);
+ }
+ return moveAndDeleteRelativePath(volume, relative);
+ }
+
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java Tue Mar 8 05:56:27 2011
@@ -35,15 +35,18 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.TaskController;
+import org.apache.hadoop.mapred.DefaultTaskController;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.util.StringUtils;
+import static org.apache.hadoop.mapred.TaskController.Signal;
/**
* A Proc file-system based ProcessTree. Works only on Linux.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public class ProcfsBasedProcessTree extends ProcessTree {
+public class ProcfsBasedProcessTree {
static final Log LOG = LogFactory
.getLog(ProcfsBasedProcessTree.class);
@@ -91,20 +94,19 @@ public class ProcfsBasedProcessTree exte
// to a test directory.
private String procfsDir;
- private Integer pid = -1;
+ private final Integer pid;
private Long cpuTime = 0L;
private boolean setsidUsed = false;
- private long sleeptimeBeforeSigkill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL;
- private Map<Integer, ProcessInfo> processTree = new HashMap<Integer, ProcessInfo>();
+ private Map<Integer, ProcessInfo> processTree =
+ new HashMap<Integer, ProcessInfo>();
public ProcfsBasedProcessTree(String pid) {
- this(pid, false, DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+ this(pid, false);
}
- public ProcfsBasedProcessTree(String pid, boolean setsidUsed,
- long sigkillInterval) {
- this(pid, setsidUsed, sigkillInterval, PROCFS);
+ public ProcfsBasedProcessTree(String pid, boolean setsidUsed) {
+ this(pid, setsidUsed, PROCFS);
}
/**
@@ -115,29 +117,14 @@ public class ProcfsBasedProcessTree exte
*
* @param pid root of the process tree
* @param setsidUsed true, if setsid was used for the root pid
- * @param sigkillInterval how long to wait between a SIGTERM and SIGKILL
- * when killing a process tree
* @param procfsDir the root of a proc file system - only used for testing.
*/
public ProcfsBasedProcessTree(String pid, boolean setsidUsed,
- long sigkillInterval, String procfsDir) {
+ String procfsDir) {
this.pid = getValidPID(pid);
this.setsidUsed = setsidUsed;
- sleeptimeBeforeSigkill = sigkillInterval;
this.procfsDir = procfsDir;
}
-
- /**
- * Sets SIGKILL interval
- * @deprecated Use {@link ProcfsBasedProcessTree#ProcfsBasedProcessTree(
- * String, boolean, long)} instead
- * @param interval The time to wait before sending SIGKILL
- * after sending SIGTERM
- */
- @Deprecated
- public void setSigKillInterval(long interval) {
- sleeptimeBeforeSigkill = interval;
- }
/**
* Checks if the ProcfsBasedProcessTree is available on this system.
@@ -238,112 +225,49 @@ public class ProcfsBasedProcessTree exte
/**
* Is the root-process alive?
- *
* @return true if the root-process is alive, false otherwise.
*/
- public boolean isAlive() {
- if (pid == -1) {
- return false;
- } else {
- return isAlive(pid.toString());
- }
+ boolean isAlive(int pid, TaskController taskController) {
+ try {
+ return taskController.signalTask(null, pid, Signal.NULL);
+ } catch (IOException ignored) { }
+ return false;
+ }
+
+ boolean isAlive(TaskController taskController) {
+ return isAlive(pid, taskController);
}
/**
* Is any of the subprocesses in the process-tree alive?
- *
* @return true if any of the processes in the process-tree is
* alive, false otherwise.
*/
- public boolean isAnyProcessInTreeAlive() {
+ boolean isAnyProcessInTreeAlive(TaskController taskController) {
for (Integer pId : processTree.keySet()) {
- if (isAlive(pId.toString())) {
+ if (isAlive(pId, taskController)) {
return true;
}
}
return false;
}
+
/** Verify that the given process id is same as its process group id.
* @param pidStr Process id of the to-be-verified-process
* @param procfsDir Procfs root dir
*/
- static boolean checkPidPgrpidForMatch(String pidStr, String procfsDir) {
- Integer pId = Integer.parseInt(pidStr);
- // Get information for this process
- ProcessInfo pInfo = new ProcessInfo(pId);
- pInfo = constructProcessInfo(pInfo, procfsDir);
- if (pInfo == null) {
- // process group leader may have finished execution, but we still need to
- // kill the subProcesses in the process group.
- return true;
- }
-
- //make sure that pId and its pgrpId match
- if (!pInfo.getPgrpId().equals(pId)) {
- LOG.warn("Unexpected: Process with PID " + pId +
- " is not a process group leader.");
- return false;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug(pId + " is a process group leader, as expected.");
- }
- return true;
+ public boolean checkPidPgrpidForMatch() {
+ return checkPidPgrpidForMatch(pid, PROCFS);
}
- /** Make sure that the given pid is a process group leader and then
- * destroy the process group.
- * @param pgrpId Process group id of to-be-killed-processes
- * @param interval The time to wait before sending SIGKILL
- * after sending SIGTERM
- * @param inBackground Process is to be killed in the back ground with
- * a separate thread
- */
- public static void assertAndDestroyProcessGroup(String pgrpId, long interval,
- boolean inBackground)
- throws IOException {
- // Make sure that the pid given is a process group leader
- if (!checkPidPgrpidForMatch(pgrpId, PROCFS)) {
- throw new IOException("Process with PID " + pgrpId +
- " is not a process group leader.");
- }
- destroyProcessGroup(pgrpId, interval, inBackground);
- }
-
- /**
- * Destroy the process-tree.
- */
- public void destroy() {
- destroy(true);
- }
-
- /**
- * Destroy the process-tree.
- * @param inBackground Process is to be killed in the back ground with
- * a separate thread
- */
- public void destroy(boolean inBackground) {
- LOG.debug("Killing ProcfsBasedProcessTree of " + pid);
- if (pid == -1) {
- return;
- }
- if (isAlive(pid.toString())) {
- if (isSetsidAvailable && setsidUsed) {
- // In this case, we know that pid got created using setsid. So kill the
- // whole processGroup.
- try {
- assertAndDestroyProcessGroup(pid.toString(), sleeptimeBeforeSigkill,
- inBackground);
- } catch (IOException e) {
- LOG.warn(StringUtils.stringifyException(e));
- }
- }
- else {
- //TODO: Destroy all the processes in the subtree in this case also.
- // For the time being, killing only the root process.
- destroyProcess(pid.toString(), sleeptimeBeforeSigkill, inBackground);
- }
- }
+ static boolean checkPidPgrpidForMatch(int _pid, String procfs) {
+ // Get information for this process
+ ProcessInfo pInfo = new ProcessInfo(_pid);
+ pInfo = constructProcessInfo(pInfo, procfs);
+ // null if process group leader finished execution; issue no warning
+ // make sure that pid and its pgrpId match
+ return pInfo == null || pInfo.getPgrpId().equals(_pid);
}
private static final String PROCESSTREE_DUMP_FORMAT =
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/util/ProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/util/ProcfsBasedProcessTree.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/util/ProcfsBasedProcessTree.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/util/ProcfsBasedProcessTree.java Tue Mar 8 05:56:27 2011
@@ -38,14 +38,20 @@ public class ProcfsBasedProcessTree exte
super(pid);
}
+ /**
+ * @param sigkillInterval Has no effect
+ */
public ProcfsBasedProcessTree(String pid, boolean setsidUsed,
long sigkillInterval) {
- super(pid, setsidUsed, sigkillInterval);
+ super(pid, setsidUsed);
}
+ /**
+ * @param sigkillInterval Has no effect
+ */
public ProcfsBasedProcessTree(String pid, boolean setsidUsed,
long sigkillInterval, String procfsDir) {
- super(pid, setsidUsed, sigkillInterval, procfsDir);
+ super(pid, setsidUsed, procfsDir);
}
public ProcfsBasedProcessTree getProcessTree() {
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java Tue Mar 8 05:56:27 2011
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -80,41 +81,23 @@ public class ClusterWithLinuxTaskControl
+ "/task-controller";
@Override
- public void setup() throws IOException {
+ public void setup(LocalDirAllocator allocator) throws IOException {
getConf().set(TTConfig.TT_GROUP, taskTrackerSpecialGroup);
// write configuration file
configurationFile = createTaskControllerConf(System
.getProperty(TASKCONTROLLER_PATH), getConf());
- super.setup();
+ super.setup(allocator);
}
@Override
- protected String getTaskControllerExecutablePath() {
+ protected String getTaskControllerExecutablePath(Configuration conf) {
return taskControllerExePath;
}
void setTaskControllerExe(String execPath) {
this.taskControllerExePath = execPath;
}
-
- volatile static int attemptedSigQuits = 0;
- volatile static int failedSigQuits = 0;
-
- /** Work like LinuxTaskController, but also count the number of
- * attempted and failed SIGQUIT sends via the task-controller
- * executable.
- */
- @Override
- void dumpTaskStack(TaskControllerContext context) {
- attemptedSigQuits++;
- try {
- signalTask(context, TaskControllerCommands.SIGQUIT_TASK_JVM);
- } catch (Exception e) {
- LOG.warn("Execution sending SIGQUIT: " + StringUtils.stringifyException(e));
- failedSigQuits++;
- }
- }
}
// cluster instances which sub classes can use
@@ -275,7 +258,7 @@ public class ClusterWithLinuxTaskControl
if (ugi.indexOf(",") > 1) {
return true;
}
- LOG.info("Invalid taskcontroller-ugi : " + ugi);
+ LOG.info("Invalid taskcontroller-ugi (requires \"user,group\"): " + ugi);
return false;
}
LOG.info("Invalid taskcontroller-ugi : " + ugi);
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestDebugScript.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestDebugScript.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestDebugScript.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestDebugScript.java Tue Mar 8 05:56:27 2011
@@ -35,7 +35,9 @@ import static org.junit.Assert.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.Ignore;
+@Ignore("The debug script is broken in the current build.")
public class TestDebugScript {
// base directory which is used by the debug script
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java Tue Mar 8 05:56:27 2011
@@ -23,7 +23,6 @@ import java.security.PrivilegedException
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.util.ToolRunner;
@@ -108,33 +107,4 @@ public class TestJobExecutionAsDifferent
});
}
- /** Ensure that SIGQUIT can be properly sent by the LinuxTaskController
- * if a task times out.
- */
- public void testTimeoutStackTrace() throws Exception {
- if (!shouldRun()) {
- return;
- }
-
- // Run a job that should timeout and trigger a SIGQUIT.
- startCluster();
- jobOwner.doAs(new PrivilegedExceptionAction<Object>() {
- public Object run() throws Exception {
- JobConf conf = getClusterConf();
- conf.setInt(JobContext.TASK_TIMEOUT, 10000);
- conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 50);
- SleepJob sleepJob = new SleepJob();
- sleepJob.setConf(conf);
- Job job = sleepJob.createJob(1, 0, 30000, 1, 0, 0);
- job.setMaxMapAttempts(1);
- int prevNumSigQuits = MyLinuxTaskController.attemptedSigQuits;
- job.waitForCompletion(true);
- assertTrue("Did not detect a new SIGQUIT!",
- prevNumSigQuits < MyLinuxTaskController.attemptedSigQuits);
- assertEquals("A SIGQUIT attempt failed!", 0,
- MyLinuxTaskController.failedSigQuits);
- return null;
- }
- });
- }
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java Tue Mar 8 05:56:27 2011
@@ -18,20 +18,12 @@
package org.apache.hadoop.mapred;
-import java.io.BufferedReader;
-import java.io.FileInputStream;
import java.io.File;
-import java.io.InputStreamReader;
import java.io.IOException;
import junit.framework.TestCase;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.SleepJob;
/**
* A JUnit test to test Kill Job & Fail Job functionality with local file
@@ -39,96 +31,38 @@ import org.apache.hadoop.mapreduce.Sleep
*/
public class TestJobKillAndFail extends TestCase {
- static final Log LOG = LogFactory.getLog(TestJobKillAndFail.class);
-
private static String TEST_ROOT_DIR = new File(System.getProperty(
"test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
- /**
- * TaskController instance that just sets a flag when a stack dump
- * is performed in a child thread.
- */
- static class MockStackDumpTaskController extends DefaultTaskController {
-
- static volatile int numStackDumps = 0;
-
- static final Log LOG = LogFactory.getLog(TestJobKillAndFail.class);
-
- public MockStackDumpTaskController() {
- LOG.info("Instantiated MockStackDumpTC");
- }
-
- @Override
- void dumpTaskStack(TaskControllerContext context) {
- LOG.info("Got stack-dump request in TaskController");
- MockStackDumpTaskController.numStackDumps++;
- super.dumpTaskStack(context);
- }
-
- }
-
- /** If a task was killed, then dumpTaskStack() should have been
- * called. Test whether or not the counter was incremented
- * and succeed/fail based on this. */
- private void checkForStackDump(boolean expectDump, int lastNumDumps) {
- int curNumDumps = MockStackDumpTaskController.numStackDumps;
-
- LOG.info("curNumDumps=" + curNumDumps + "; lastNumDumps=" + lastNumDumps
- + "; expect=" + expectDump);
-
- if (expectDump) {
- assertTrue("No stack dump recorded!", lastNumDumps < curNumDumps);
- } else {
- assertTrue("Stack dump happened anyway!", lastNumDumps == curNumDumps);
- }
- }
-
- public void testJobFailAndKill() throws Exception {
+ public void testJobFailAndKill() throws IOException {
MiniMRCluster mr = null;
try {
JobConf jtConf = new JobConf();
jtConf.set("mapred.jobtracker.instrumentation",
JTInstrumentation.class.getName());
- jtConf.set("mapreduce.tasktracker.taskcontroller",
- MockStackDumpTaskController.class.getName());
mr = new MiniMRCluster(2, "file:///", 3, null, null, jtConf);
JTInstrumentation instr = (JTInstrumentation)
mr.getJobTrackerRunner().getJobTracker().getInstrumentation();
// run the TCs
JobConf conf = mr.createJobConf();
- conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 50);
Path inDir = new Path(TEST_ROOT_DIR + "/failkilljob/input");
Path outDir = new Path(TEST_ROOT_DIR + "/failkilljob/output");
- RunningJob runningJob = UtilsForTests.runJobFail(conf, inDir, outDir);
+ RunningJob job = UtilsForTests.runJobFail(conf, inDir, outDir);
// Checking that the Job got failed
- assertEquals(runningJob.getJobState(), JobStatus.FAILED);
+ assertEquals(job.getJobState(), JobStatus.FAILED);
assertTrue(instr.verifyJob());
assertEquals(1, instr.failed);
instr.reset();
- int prevNumDumps = MockStackDumpTaskController.numStackDumps;
- runningJob = UtilsForTests.runJobKill(conf, inDir, outDir);
+
+ job = UtilsForTests.runJobKill(conf, inDir, outDir);
// Checking that the Job got killed
- assertTrue(runningJob.isComplete());
- assertEquals(runningJob.getJobState(), JobStatus.KILLED);
+ assertTrue(job.isComplete());
+ assertEquals(job.getJobState(), JobStatus.KILLED);
assertTrue(instr.verifyJob());
assertEquals(1, instr.killed);
- // check that job kill does not put a stacktrace in task logs.
- checkForStackDump(false, prevNumDumps);
-
- // Test that a task that times out does have a stack trace
- conf = mr.createJobConf();
- conf.setInt(JobContext.TASK_TIMEOUT, 10000);
- conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 50);
- SleepJob sleepJob = new SleepJob();
- sleepJob.setConf(conf);
- Job job = sleepJob.createJob(1, 0, 30000, 1,0, 0);
- job.setMaxMapAttempts(1);
- prevNumDumps = MockStackDumpTaskController.numStackDumps;
- job.waitForCompletion(true);
- checkForStackDump(true, prevNumDumps);
} finally {
if (mr != null) {
mr.shutdown();