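You are viewing a plain text version of this content. The canonical (hyperlinked) version of this message is in the mailing-list archive; the link itself was not preserved in this plain-text copy.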
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2007/12/19 09:55:51 UTC
svn commit: r605472 - in /lucene/hadoop/branches/branch-0.15: ./
src/java/org/apache/hadoop/filecache/ src/java/org/apache/hadoop/fs/
src/java/org/apache/hadoop/mapred/
Author: acmurthy
Date: Wed Dec 19 00:55:50 2007
New Revision: 605472
URL: http://svn.apache.org/viewvc?rev=605472&view=rev
Log:
Merge -r 605470:605471 from trunk to branch-0.15 to fix HADOOP-2227.
Modified:
lucene/hadoop/branches/branch-0.15/CHANGES.txt
lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/filecache/DistributedCache.java
lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/fs/LocalDirAllocator.java
lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/ReduceTask.java
lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskRunner.java
lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskTracker.java
Modified: lucene/hadoop/branches/branch-0.15/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/CHANGES.txt?rev=605472&r1=605471&r2=605472&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.15/CHANGES.txt Wed Dec 19 00:55:50 2007
@@ -29,6 +29,12 @@
transaction log, it stops writing new transactions to that one.
(Raghu Angadi via dhruba)
+ HADOOP-2227. Use the LocalDirAllocator uniformly for handling all of the
+ temporary storage required for a given task. It also implies that
+ mapred.local.dir.minspacestart is handled by checking if there is enough
+ free space on any one of the available disks. (Amareshwari Sri Ramadasu
+ via acmurthy)
+
IMPROVEMENTS
HADOOP-2160. Remove project-level, non-user documentation from
Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/filecache/DistributedCache.java?rev=605472&r1=605471&r2=605472&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/filecache/DistributedCache.java Wed Dec 19 00:55:50 2007
@@ -127,6 +127,7 @@
* being used in the Configuration
* @param conf The Confguration file which contains the filesystem
* @param baseDir The base cache Dir where you wnat to localize the files/archives
+ * @param fileStatus The file status on the dfs.
* @param isArchive if the cache is an archive or a file. In case it is an archive
* with a .zip or .jar extension it will be unzipped/unjarred automatically
* and the directory where the archive is unjarred is returned as the Path.
@@ -139,8 +140,10 @@
* the path to the file where the file is copied locally
* @throws IOException
*/
- public static Path getLocalCache(URI cache, Configuration conf, Path baseDir,
- boolean isArchive, long confFileStamp, Path currentWorkDir)
+ public static Path getLocalCache(URI cache, Configuration conf,
+ Path baseDir, FileStatus fileStatus,
+ boolean isArchive, long confFileStamp,
+ Path currentWorkDir)
throws IOException {
String cacheId = makeRelative(cache, conf);
CacheStatus lcacheStatus;
@@ -155,7 +158,7 @@
synchronized (lcacheStatus) {
localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus,
- isArchive, currentWorkDir);
+ fileStatus, isArchive, currentWorkDir);
lcacheStatus.refcount++;
}
}
@@ -172,6 +175,38 @@
}
/**
+ * Get the locally cached file or archive; it could either be
+ * previously cached (and valid) or copy it from the {@link FileSystem} now.
+ *
+ * @param cache the cache to be localized, this should be specified as
+ * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema
+ * or hostname:port is provided the file is assumed to be in the filesystem
+ * being used in the Configuration
+ * @param conf The Confguration file which contains the filesystem
+ * @param baseDir The base cache Dir where you wnat to localize the files/archives
+ * @param isArchive if the cache is an archive or a file. In case it is an archive
+ * with a .zip or .jar extension it will be unzipped/unjarred automatically
+ * and the directory where the archive is unjarred is returned as the Path.
+ * In case of a file, the path to the file is returned
+ * @param confFileStamp this is the hdfs file modification timestamp to verify that the
+ * file to be cached hasn't changed since the job started
+ * @param currentWorkDir this is the directory where you would want to create symlinks
+ * for the locally cached files/archives
+ * @return the path to directory where the archives are unjarred in case of archives,
+ * the path to the file where the file is copied locally
+ * @throws IOException
+
+ */
+ public static Path getLocalCache(URI cache, Configuration conf,
+ Path baseDir, boolean isArchive,
+ long confFileStamp, Path currentWorkDir)
+ throws IOException {
+ return getLocalCache(cache, conf,
+ baseDir, null, isArchive,
+ confFileStamp, currentWorkDir);
+ }
+
+ /**
* This is the opposite of getlocalcache. When you are done with
* using the cache, you need to release the cache
* @param cache The cache URI to be released
@@ -219,7 +254,7 @@
* relative path is hostname of DFS this mapred cluster is running
* on/absolute_path
*/
- private static String makeRelative(URI cache, Configuration conf)
+ public static String makeRelative(URI cache, Configuration conf)
throws IOException {
String fsname = cache.getScheme();
String path;
@@ -241,6 +276,7 @@
private static Path localizeCache(Configuration conf,
URI cache, long confFileStamp,
CacheStatus cacheStatus,
+ FileStatus fileStatus,
boolean isArchive,
Path currentWorkDir)
throws IOException {
@@ -248,7 +284,8 @@
FileSystem fs = getFileSystem(cache, conf);
String link = currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
File flink = new File(link);
- if (ifExistsAndFresh(conf, fs, cache, confFileStamp, cacheStatus)) {
+ if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
+ cacheStatus, fileStatus)) {
if (isArchive) {
if (doSymlink){
if (!flink.exists())
@@ -278,6 +315,7 @@
localFs.delete(cacheStatus.localLoadPath);
Path parchive = new Path(cacheStatus.localLoadPath,
new Path(cacheStatus.localLoadPath.getName()));
+
if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
throw new IOException("Mkdirs failed to create directory " +
cacheStatus.localLoadPath.toString());
@@ -325,13 +363,19 @@
// Checks if the cache has already been localized and is fresh
private static boolean ifExistsAndFresh(Configuration conf, FileSystem fs,
URI cache, long confFileStamp,
- CacheStatus lcacheStatus)
+ CacheStatus lcacheStatus,
+ FileStatus fileStatus)
throws IOException {
// check for existence of the cache
if (lcacheStatus.currentStatus == false) {
return false;
} else {
- long dfsFileStamp = getTimestamp(conf, cache);
+ long dfsFileStamp;
+ if (fileStatus != null) {
+ dfsFileStamp = fileStatus.getModificationTime();
+ } else {
+ dfsFileStamp = getTimestamp(conf, cache);
+ }
// ensure that the file on hdfs hasn't been modified since the job started
if (dfsFileStamp != confFileStamp) {
@@ -373,7 +417,8 @@
*/
public static void createAllSymlink(Configuration conf, File jobCacheDir, File workDir)
throws IOException{
- if ((!jobCacheDir.isDirectory()) || (!workDir.isDirectory())){
+ if ((jobCacheDir == null || !jobCacheDir.isDirectory()) ||
+ workDir == null || (!workDir.isDirectory())) {
return;
}
boolean createSymlink = getSymlink(conf);
Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/fs/LocalDirAllocator.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/fs/LocalDirAllocator.java?rev=605472&r1=605471&r2=605472&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/fs/LocalDirAllocator.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/fs/LocalDirAllocator.java Wed Dec 19 00:55:50 2007
@@ -165,6 +165,18 @@
}
}
+ /** We search through all the configured dirs for the file's existence
+ * and return true when we find
+ * @param pathStr the requested file (this will be searched)
+ * @param conf the Configuration object
+ * @return true if files exist. false otherwise
+ * @throws IOException
+ */
+ public boolean ifExists(String pathStr,Configuration conf) {
+ AllocatorPerContext context = obtainContext(contextCfgItemName);
+ return context.ifExists(pathStr, conf);
+ }
+
private static class AllocatorPerContext {
private final Log LOG =
@@ -326,6 +338,31 @@
//no path found
throw new DiskErrorException ("Could not find " + pathStr +" in any of" +
" the configured local directories");
+ }
+
+ /** We search through all the configured dirs for the file's existence
+ * and return true when we find one
+ */
+ public synchronized boolean ifExists(String pathStr,Configuration conf) {
+ try {
+ int numDirs = localDirs.length;
+ int numDirsSearched = 0;
+ //remove the leading slash from the path (to make sure that the uri
+ //resolution results in a valid path on the dir being checked)
+ if (pathStr.startsWith("/")) {
+ pathStr = pathStr.substring(1);
+ }
+ while (numDirsSearched < numDirs) {
+ Path file = new Path(localDirs[numDirsSearched], pathStr);
+ if (localFS.exists(file)) {
+ return true;
+ }
+ numDirsSearched++;
+ }
+ } catch (IOException e) {
+ // IGNORE and try again
+ }
+ return false;
}
}
}
Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=605472&r1=605471&r2=605472&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed Dec 19 00:55:50 2007
@@ -43,6 +43,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.InMemoryFileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
@@ -786,12 +787,19 @@
// get the work directory which holds the elements we are dynamically
// adding to the classpath
File workDir = new File(task.getJobFile()).getParentFile();
- File jobCacheDir = new File(workDir.getParent(), "work");
ArrayList<URL> urllist = new ArrayList<URL>();
// add the jars and directories to the classpath
String jar = conf.getJar();
if (jar != null) {
+ LocalDirAllocator lDirAlloc =
+ new LocalDirAllocator("mapred.local.dir");
+ File jobCacheDir = new File(lDirAlloc.getLocalPathToRead(
+ TaskTracker.getJobCacheSubdir()
+ + Path.SEPARATOR + getJobId()
+ + Path.SEPARATOR
+ + "work", conf).toString());
+
File[] libs = new File(jobCacheDir, "lib").listFiles();
if (libs != null) {
for (int i = 0; i < libs.length; i++) {
Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=605472&r1=605471&r2=605472&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskRunner.java Wed Dec 19 00:55:50 2007
@@ -20,6 +20,7 @@
import org.apache.commons.logging.*;
import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.filecache.*;
import org.apache.hadoop.util.*;
import java.io.*;
@@ -91,18 +92,52 @@
//all the archives
File workDir = new File(t.getJobFile()).getParentFile();
String taskid = t.getTaskId();
- File jobCacheDir = new File(workDir.getParent(), "work");
+ LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+ File jobCacheDir = null;
+ try {
+ jobCacheDir = new File(lDirAlloc.getLocalPathToRead(
+ TaskTracker.getJobCacheSubdir()
+ + Path.SEPARATOR + t.getJobId()
+ + Path.SEPARATOR
+ + "work", conf).toString());
+ } catch (IOException ioe) {
+ LOG.warn("work directory doesnt exist");
+ }
URI[] archives = DistributedCache.getCacheArchives(conf);
URI[] files = DistributedCache.getCacheFiles(conf);
+ FileStatus fileStatus;
+ FileSystem fileSystem;
+ Path localPath;
+ String baseDir;
+
if ((archives != null) || (files != null)) {
if (archives != null) {
- String[] archivesTimestamps = DistributedCache.getArchiveTimestamps(conf);
+ String[] archivesTimestamps =
+ DistributedCache.getArchiveTimestamps(conf);
Path[] p = new Path[archives.length];
for (int i = 0; i < archives.length;i++){
+ fileSystem = FileSystem.get(archives[i], conf);
+ fileStatus = fileSystem.getFileStatus(
+ new Path(archives[i].getPath()));
+ String cacheId = DistributedCache.makeRelative(archives[i],conf);
+ String cachePath = TaskTracker.getCacheSubdir() +
+ Path.SEPARATOR + cacheId;
+ if (lDirAlloc.ifExists(cachePath, conf)) {
+ localPath = lDirAlloc.getLocalPathToRead(cachePath, conf);
+ }
+ else {
+ localPath = lDirAlloc.getLocalPathForWrite(cachePath,
+ fileStatus.getLen(), conf);
+ }
+ baseDir = localPath.toString().replace(cacheId, "");
p[i] = DistributedCache.getLocalCache(archives[i], conf,
- conf.getLocalPath(TaskTracker.getCacheSubdir()),
- true, Long.parseLong(archivesTimestamps[i]),
- new Path(workDir.getAbsolutePath()));
+ new Path(baseDir),
+ fileStatus,
+ true, Long.parseLong(
+ archivesTimestamps[i]),
+ new Path(workDir.
+ getAbsolutePath()));
+
}
DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
}
@@ -110,10 +145,26 @@
String[] fileTimestamps = DistributedCache.getFileTimestamps(conf);
Path[] p = new Path[files.length];
for (int i = 0; i < files.length;i++){
+ fileSystem = FileSystem.get(files[i], conf);
+ fileStatus = fileSystem.getFileStatus(
+ new Path(files[i].getPath()));
+ String cacheId = DistributedCache.makeRelative(files[i], conf);
+ String cachePath = TaskTracker.getCacheSubdir() +
+ Path.SEPARATOR + cacheId;
+ if (lDirAlloc.ifExists(cachePath,conf)) {
+ localPath = lDirAlloc.getLocalPathToRead(cachePath, conf);
+ } else {
+ localPath = lDirAlloc.getLocalPathForWrite(cachePath,
+ fileStatus.getLen(), conf);
+ }
+ baseDir = localPath.toString().replace(cacheId, "");
p[i] = DistributedCache.getLocalCache(files[i], conf,
- conf.getLocalPath(TaskTracker.getCacheSubdir()),
- false, Long.parseLong(fileTimestamps[i]),
- new Path(workDir.getAbsolutePath()));
+ new Path(baseDir),
+ fileStatus,
+ false, Long.parseLong(
+ fileTimestamps[i]),
+ new Path(workDir.
+ getAbsolutePath()));
}
DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
}
Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=605472&r1=605471&r2=605472&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Dec 19 00:55:50 2007
@@ -49,6 +49,7 @@
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.LocalDirAllocator;
@@ -555,20 +556,44 @@
}
}
+ private LocalDirAllocator lDirAlloc =
+ new LocalDirAllocator("mapred.local.dir");
+
// intialize the job directory
private void localizeJob(TaskInProgress tip) throws IOException {
Path localJarFile = null;
Task t = tip.getTask();
String jobId = t.getJobId();
- Path localJobFile = new Path(fConf.getLocalPath(getJobCacheSubdir()),
- jobId + Path.SEPARATOR + "job.xml");
+ String jobFile = t.getJobFile();
+ // Get sizes of JobFile and JarFile
+ // sizes are -1 if they are not present.
+ FileSystem fileSystem = FileSystem.get(fConf);
+ FileStatus status[] = fileSystem.listStatus(new Path(jobFile).getParent());
+ long jarFileSize = -1;
+ long jobFileSize = -1;
+ for(FileStatus stat : status) {
+ if (stat.getPath().toString().contains("job.xml")) {
+ jobFileSize = stat.getLen();
+ } else {
+ jobFileSize = -1;
+ }
+ if (stat.getPath().toString().contains("job.jar")) {
+ jarFileSize = stat.getLen();
+ } else {
+ jarFileSize = -1;
+ }
+ }
+ // Here we check for double the size of jobfile to accommodate for
+ // localize task file and we check four times the size of jarFileSize to
+ // accommodate for unjarring the jar file in work directory
+ Path localJobFile = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir()
+ + Path.SEPARATOR + jobId
+ + Path.SEPARATOR + "job.xml"),
+ 2 * jobFileSize + 5 * jarFileSize, fConf);
RunningJob rjob = addTaskToJob(jobId, localJobFile, tip);
synchronized (rjob) {
if (!rjob.localized) {
- localJarFile = new Path(fConf.getLocalPath(getJobCacheSubdir()),
- jobId + Path.SEPARATOR + "job.jar");
- String jobFile = t.getJobFile();
FileSystem localFs = FileSystem.getLocal(fConf);
// this will happen on a partial execution of localizeJob.
// Sometimes the job.xml gets copied but copying job.jar
@@ -586,6 +611,7 @@
JobConf localJobConf = new JobConf(localJobFile);
String jarFile = localJobConf.getJar();
if (jarFile != null) {
+ localJarFile = new Path(jobDir,"job.jar");
fs.copyToLocalFile(new Path(jarFile), localJarFile);
localJobConf.setJar(localJarFile.toString());
OutputStream out = localFs.create(localJobFile);
@@ -1113,11 +1139,11 @@
localDirsDf.put(localDirs[i], df);
}
- if (df.getAvailable() < minSpace)
- return false;
+ if (df.getAvailable() > minSpace)
+ return true;
}
- return true;
+ return false;
}
/**
@@ -1249,9 +1275,10 @@
}
private void localizeTask(Task task) throws IOException{
- Path localTaskDir =
- new Path(this.defaultJobConf.getLocalPath(TaskTracker.getJobCacheSubdir()),
- (task.getJobId() + Path.SEPARATOR + task.getTaskId()));
+ Path localTaskDir =
+ lDirAlloc.getLocalPathForWrite((TaskTracker.getJobCacheSubdir() +
+ Path.SEPARATOR + task.getJobId() + Path.SEPARATOR +
+ task.getTaskId()), defaultJobConf );
FileSystem localFs = FileSystem.getLocal(fConf);
if (!localFs.mkdirs(localTaskDir)) {
throw new IOException("Mkdirs failed to create " + localTaskDir.toString());