You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2007/12/19 09:55:51 UTC

svn commit: r605472 - in /lucene/hadoop/branches/branch-0.15: ./ src/java/org/apache/hadoop/filecache/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/mapred/

Author: acmurthy
Date: Wed Dec 19 00:55:50 2007
New Revision: 605472

URL: http://svn.apache.org/viewvc?rev=605472&view=rev
Log:
Merge -r 605470:605471 from trunk to branch-0.15 to fix HADOOP-2227.

Modified:
    lucene/hadoop/branches/branch-0.15/CHANGES.txt
    lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/filecache/DistributedCache.java
    lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/fs/LocalDirAllocator.java
    lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskRunner.java
    lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/branches/branch-0.15/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/CHANGES.txt?rev=605472&r1=605471&r2=605472&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.15/CHANGES.txt Wed Dec 19 00:55:50 2007
@@ -29,6 +29,12 @@
     transaction log, it stops writing new transactions to that one.
     (Raghu Angadi via dhruba)
 
+    HADOOP-2227.  Use the LocalDirAllocator uniformly for handling all of the
+    temporary storage required for a given task. It also implies that
+    mapred.local.dir.minspacestart is handled by checking if there is enough
+    free-space on any one of the available disks. (Amareshwari Sri Ramadasu
+    via acmurthy)
+
   IMPROVEMENTS
 
     HADOOP-2160.  Remove project-level, non-user documentation from

Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/filecache/DistributedCache.java?rev=605472&r1=605471&r2=605472&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/filecache/DistributedCache.java Wed Dec 19 00:55:50 2007
@@ -127,6 +127,7 @@
    * being used in the Configuration
    * @param conf The Confguration file which contains the filesystem
    * @param baseDir The base cache Dir where you wnat to localize the files/archives
+   * @param fileStatus The file status on the dfs.
    * @param isArchive if the cache is an archive or a file. In case it is an archive
    *  with a .zip or .jar extension it will be unzipped/unjarred automatically 
    *  and the directory where the archive is unjarred is returned as the Path.
@@ -139,8 +140,10 @@
    * the path to the file where the file is copied locally 
    * @throws IOException
    */
-  public static Path getLocalCache(URI cache, Configuration conf, Path baseDir,
-                                   boolean isArchive, long confFileStamp, Path currentWorkDir) 
+  public static Path getLocalCache(URI cache, Configuration conf, 
+                                   Path baseDir, FileStatus fileStatus,
+                                   boolean isArchive, long confFileStamp,
+                                   Path currentWorkDir) 
   throws IOException {
     String cacheId = makeRelative(cache, conf);
     CacheStatus lcacheStatus;
@@ -155,7 +158,7 @@
       
       synchronized (lcacheStatus) {
         localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus, 
-                                      isArchive, currentWorkDir);
+                                      fileStatus, isArchive, currentWorkDir);
         lcacheStatus.refcount++;
       }
     }
@@ -172,6 +175,38 @@
   }
   
   /**
+   * Get the locally cached file or archive; it could either be 
+   * previously cached (and valid) or copy it from the {@link FileSystem} now.
+   * 
+   * @param cache the cache to be localized, this should be specified as 
+   * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema 
+   * or hostname:port is provided the file is assumed to be in the filesystem
+   * being used in the Configuration
+   * @param conf The Confguration file which contains the filesystem
+   * @param baseDir The base cache Dir where you wnat to localize the files/archives
+   * @param isArchive if the cache is an archive or a file. In case it is an archive
+   *  with a .zip or .jar extension it will be unzipped/unjarred automatically 
+   *  and the directory where the archive is unjarred is returned as the Path.
+   *  In case of a file, the path to the file is returned
+   * @param confFileStamp this is the hdfs file modification timestamp to verify that the 
+   * file to be cached hasn't changed since the job started
+   * @param currentWorkDir this is the directory where you would want to create symlinks 
+   * for the locally cached files/archives
+   * @return the path to directory where the archives are unjarred in case of archives,
+   * the path to the file where the file is copied locally 
+   * @throws IOException
+
+   */
+  public static Path getLocalCache(URI cache, Configuration conf, 
+                                   Path baseDir, boolean isArchive,
+                                   long confFileStamp, Path currentWorkDir) 
+  throws IOException {
+    return getLocalCache(cache, conf, 
+                         baseDir, null, isArchive,
+                         confFileStamp, currentWorkDir);
+  }
+  
+  /**
    * This is the opposite of getlocalcache. When you are done with
    * using the cache, you need to release the cache
    * @param cache The cache URI to be released
@@ -219,7 +254,7 @@
    * relative path is hostname of DFS this mapred cluster is running
    * on/absolute_path
    */
-  private static String makeRelative(URI cache, Configuration conf)
+  public static String makeRelative(URI cache, Configuration conf)
     throws IOException {
     String fsname = cache.getScheme();
     String path;
@@ -241,6 +276,7 @@
   private static Path localizeCache(Configuration conf, 
                                     URI cache, long confFileStamp,
                                     CacheStatus cacheStatus,
+                                    FileStatus fileStatus,
                                     boolean isArchive, 
                                     Path currentWorkDir) 
   throws IOException {
@@ -248,7 +284,8 @@
     FileSystem fs = getFileSystem(cache, conf);
     String link = currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
     File flink = new File(link);
-    if (ifExistsAndFresh(conf, fs, cache, confFileStamp, cacheStatus)) {
+    if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
+                           cacheStatus, fileStatus)) {
       if (isArchive) {
         if (doSymlink){
           if (!flink.exists())
@@ -278,6 +315,7 @@
       localFs.delete(cacheStatus.localLoadPath);
       Path parchive = new Path(cacheStatus.localLoadPath,
                                new Path(cacheStatus.localLoadPath.getName()));
+      
       if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
         throw new IOException("Mkdirs failed to create directory " + 
                               cacheStatus.localLoadPath.toString());
@@ -325,13 +363,19 @@
   // Checks if the cache has already been localized and is fresh
   private static boolean ifExistsAndFresh(Configuration conf, FileSystem fs, 
                                           URI cache, long confFileStamp, 
-                                          CacheStatus lcacheStatus) 
+                                          CacheStatus lcacheStatus,
+                                          FileStatus fileStatus) 
   throws IOException {
     // check for existence of the cache
     if (lcacheStatus.currentStatus == false) {
       return false;
     } else {
-      long dfsFileStamp = getTimestamp(conf, cache);
+      long dfsFileStamp;
+      if (fileStatus != null) {
+        dfsFileStamp = fileStatus.getModificationTime();
+      } else {
+        dfsFileStamp = getTimestamp(conf, cache);
+      }
 
       // ensure that the file on hdfs hasn't been modified since the job started 
       if (dfsFileStamp != confFileStamp) {
@@ -373,7 +417,8 @@
    */
   public static void createAllSymlink(Configuration conf, File jobCacheDir, File workDir)
     throws IOException{
-    if ((!jobCacheDir.isDirectory()) || (!workDir.isDirectory())){
+    if ((jobCacheDir == null || !jobCacheDir.isDirectory()) ||
+           workDir == null || (!workDir.isDirectory())) {
       return;
     }
     boolean createSymlink = getSymlink(conf);

Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/fs/LocalDirAllocator.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/fs/LocalDirAllocator.java?rev=605472&r1=605471&r2=605472&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/fs/LocalDirAllocator.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/fs/LocalDirAllocator.java Wed Dec 19 00:55:50 2007
@@ -165,6 +165,18 @@
     }
   }
     
+  /** We search through all the configured dirs for the file's existence
+   *  and return true when we find  
+   *  @param pathStr the requested file (this will be searched)
+   *  @param conf the Configuration object
+   *  @return true if files exist. false otherwise
+   *  @throws IOException
+   */
+  public boolean ifExists(String pathStr,Configuration conf) {
+    AllocatorPerContext context = obtainContext(contextCfgItemName);
+    return context.ifExists(pathStr, conf);
+  }
+
   private static class AllocatorPerContext {
 
     private final Log LOG =
@@ -326,6 +338,31 @@
       //no path found
       throw new DiskErrorException ("Could not find " + pathStr +" in any of" +
       " the configured local directories");
+    }
+
+    /** We search through all the configured dirs for the file's existence
+     *  and return true when we find one 
+     */
+    public synchronized boolean ifExists(String pathStr,Configuration conf) {
+      try {
+        int numDirs = localDirs.length;
+        int numDirsSearched = 0;
+        //remove the leading slash from the path (to make sure that the uri
+        //resolution results in a valid path on the dir being checked)
+        if (pathStr.startsWith("/")) {
+          pathStr = pathStr.substring(1);
+        }
+        while (numDirsSearched < numDirs) {
+          Path file = new Path(localDirs[numDirsSearched], pathStr);
+          if (localFS.exists(file)) {
+            return true;
+          }
+          numDirsSearched++;
+        }
+      } catch (IOException e) {
+        // IGNORE and try again
+      }
+      return false;
     }
   }
 }

Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=605472&r1=605471&r2=605472&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed Dec 19 00:55:50 2007
@@ -43,6 +43,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.InMemoryFileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
@@ -786,12 +787,19 @@
       // get the work directory which holds the elements we are dynamically
       // adding to the classpath
       File workDir = new File(task.getJobFile()).getParentFile();
-      File jobCacheDir = new File(workDir.getParent(), "work");
       ArrayList<URL> urllist = new ArrayList<URL>();
       
       // add the jars and directories to the classpath
       String jar = conf.getJar();
       if (jar != null) {      
+        LocalDirAllocator lDirAlloc = 
+                            new LocalDirAllocator("mapred.local.dir");
+        File jobCacheDir = new File(lDirAlloc.getLocalPathToRead(
+                                      TaskTracker.getJobCacheSubdir() 
+                                      + Path.SEPARATOR + getJobId() 
+                                      + Path.SEPARATOR  
+                                      + "work", conf).toString());
+
         File[] libs = new File(jobCacheDir, "lib").listFiles();
         if (libs != null) {
           for (int i = 0; i < libs.length; i++) {

Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=605472&r1=605471&r2=605472&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskRunner.java Wed Dec 19 00:55:50 2007
@@ -20,6 +20,7 @@
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.filecache.*;
 import org.apache.hadoop.util.*;
 import java.io.*;
@@ -91,18 +92,52 @@
       //all the archives
       File workDir = new File(t.getJobFile()).getParentFile();
       String taskid = t.getTaskId();
-      File jobCacheDir = new File(workDir.getParent(), "work");
+      LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+      File jobCacheDir = null;
+      try {
+        jobCacheDir = new File(lDirAlloc.getLocalPathToRead(
+                                    TaskTracker.getJobCacheSubdir() 
+                                    + Path.SEPARATOR + t.getJobId() 
+                                    + Path.SEPARATOR  
+                                    + "work", conf).toString());
+      } catch (IOException ioe) {
+        LOG.warn("work directory doesnt exist");
+      }
       URI[] archives = DistributedCache.getCacheArchives(conf);
       URI[] files = DistributedCache.getCacheFiles(conf);
+      FileStatus fileStatus;
+      FileSystem fileSystem;
+      Path localPath;
+      String baseDir;
+
       if ((archives != null) || (files != null)) {
         if (archives != null) {
-          String[] archivesTimestamps = DistributedCache.getArchiveTimestamps(conf);
+          String[] archivesTimestamps = 
+                               DistributedCache.getArchiveTimestamps(conf);
           Path[] p = new Path[archives.length];
           for (int i = 0; i < archives.length;i++){
+            fileSystem = FileSystem.get(archives[i], conf);
+            fileStatus = fileSystem.getFileStatus(
+                                      new Path(archives[i].getPath()));
+            String cacheId = DistributedCache.makeRelative(archives[i],conf);
+            String cachePath = TaskTracker.getCacheSubdir() + 
+                                 Path.SEPARATOR + cacheId;
+            if (lDirAlloc.ifExists(cachePath, conf)) {
+              localPath =  lDirAlloc.getLocalPathToRead(cachePath, conf);
+            }
+            else {
+              localPath = lDirAlloc.getLocalPathForWrite(cachePath,
+                                      fileStatus.getLen(), conf);
+            }
+            baseDir = localPath.toString().replace(cacheId, "");
             p[i] = DistributedCache.getLocalCache(archives[i], conf, 
-                                                  conf.getLocalPath(TaskTracker.getCacheSubdir()), 
-                                                  true, Long.parseLong(archivesTimestamps[i]), 
-                                                  new Path(workDir.getAbsolutePath()));
+                                                  new Path(baseDir),
+                                                  fileStatus,
+                                                  true, Long.parseLong(
+                                                        archivesTimestamps[i]),
+                                                  new Path(workDir.
+                                                        getAbsolutePath()));
+            
           }
           DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
         }
@@ -110,10 +145,26 @@
           String[] fileTimestamps = DistributedCache.getFileTimestamps(conf);
           Path[] p = new Path[files.length];
           for (int i = 0; i < files.length;i++){
+            fileSystem = FileSystem.get(files[i], conf);
+            fileStatus = fileSystem.getFileStatus(
+                                      new Path(files[i].getPath()));
+            String cacheId = DistributedCache.makeRelative(files[i], conf);
+            String cachePath = TaskTracker.getCacheSubdir() +
+                                 Path.SEPARATOR + cacheId;
+            if (lDirAlloc.ifExists(cachePath,conf)) {
+              localPath =  lDirAlloc.getLocalPathToRead(cachePath, conf);
+            } else {
+              localPath = lDirAlloc.getLocalPathForWrite(cachePath,
+                                      fileStatus.getLen(), conf);
+            }
+            baseDir = localPath.toString().replace(cacheId, "");
             p[i] = DistributedCache.getLocalCache(files[i], conf, 
-                                                  conf.getLocalPath(TaskTracker.getCacheSubdir()), 
-                                                  false, Long.parseLong(fileTimestamps[i]), 
-                                                  new Path(workDir.getAbsolutePath()));
+                                                  new Path(baseDir),
+                                                  fileStatus,
+                                                  false, Long.parseLong(
+                                                           fileTimestamps[i]),
+                                                  new Path(workDir.
+                                                        getAbsolutePath()));
           }
           DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
         }

Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=605472&r1=605471&r2=605472&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Dec 19 00:55:50 2007
@@ -49,6 +49,7 @@
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -555,20 +556,44 @@
     }
   }
 
+  private LocalDirAllocator lDirAlloc = 
+                              new LocalDirAllocator("mapred.local.dir");
+
   // intialize the job directory
   private void localizeJob(TaskInProgress tip) throws IOException {
     Path localJarFile = null;
     Task t = tip.getTask();
     String jobId = t.getJobId();
-    Path localJobFile = new Path(fConf.getLocalPath(getJobCacheSubdir()), 
-                                 jobId + Path.SEPARATOR + "job.xml");
+    String jobFile = t.getJobFile();
+    // Get sizes of JobFile and JarFile
+    // sizes are -1 if they are not present.
+    FileSystem fileSystem = FileSystem.get(fConf);
+    FileStatus status[] = fileSystem.listStatus(new Path(jobFile).getParent());
+    long jarFileSize = -1;
+    long jobFileSize = -1;
+    for(FileStatus stat : status) {
+      if (stat.getPath().toString().contains("job.xml")) {
+        jobFileSize = stat.getLen();
+      } else {
+        jobFileSize = -1;
+      }
+      if (stat.getPath().toString().contains("job.jar")) {
+        jarFileSize = stat.getLen();
+      } else {
+        jarFileSize = -1;
+      }
+    }
+    // Here we check for double the size of jobfile to accommodate for
+    // localize task file and we check four times the size of jarFileSize to 
+    // accommodate for unjarring the jar file in work directory 
+    Path localJobFile = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir()
+                                    + Path.SEPARATOR + jobId 
+                                    + Path.SEPARATOR + "job.xml"),
+                                    2 * jobFileSize + 5 * jarFileSize, fConf);
     RunningJob rjob = addTaskToJob(jobId, localJobFile, tip);
     synchronized (rjob) {
       if (!rjob.localized) {
-        localJarFile = new Path(fConf.getLocalPath(getJobCacheSubdir()), 
-                                jobId + Path.SEPARATOR + "job.jar");
   
-        String jobFile = t.getJobFile();
         FileSystem localFs = FileSystem.getLocal(fConf);
         // this will happen on a partial execution of localizeJob.
         // Sometimes the job.xml gets copied but copying job.jar
@@ -586,6 +611,7 @@
         JobConf localJobConf = new JobConf(localJobFile);
         String jarFile = localJobConf.getJar();
         if (jarFile != null) {
+          localJarFile = new Path(jobDir,"job.jar");
           fs.copyToLocalFile(new Path(jarFile), localJarFile);
           localJobConf.setJar(localJarFile.toString());
           OutputStream out = localFs.create(localJobFile);
@@ -1113,11 +1139,11 @@
         localDirsDf.put(localDirs[i], df);
       }
 
-      if (df.getAvailable() < minSpace)
-        return false;
+      if (df.getAvailable() > minSpace)
+        return true;
     }
 
-    return true;
+    return false;
   }
     
   /**
@@ -1249,9 +1275,10 @@
     }
         
     private void localizeTask(Task task) throws IOException{
-      Path localTaskDir =
-        new Path(this.defaultJobConf.getLocalPath(TaskTracker.getJobCacheSubdir()), 
-                 (task.getJobId() + Path.SEPARATOR + task.getTaskId()));
+      Path localTaskDir = 
+        lDirAlloc.getLocalPathForWrite((TaskTracker.getJobCacheSubdir() + 
+                    Path.SEPARATOR + task.getJobId() + Path.SEPARATOR +
+                    task.getTaskId()), defaultJobConf );
       FileSystem localFs = FileSystem.getLocal(fConf);
       if (!localFs.mkdirs(localTaskDir)) {
         throw new IOException("Mkdirs failed to create " + localTaskDir.toString());