You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/05/07 23:25:08 UTC

svn commit: r535997 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/mapred/

Author: cutting
Date: Mon May  7 14:25:07 2007
New Revision: 535997

URL: http://svn.apache.org/viewvc?view=rev&rev=535997
Log:
HADOOP-1252.  Changed MapReduce's allocation of local files to use round-robin among configured devices, rather than a hashcode, also improving error handling.  Contributed by Devaraj.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalDirAllocator.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=535997&r1=535996&r2=535997
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon May  7 14:25:07 2007
@@ -351,6 +351,11 @@
 104. HADOOP-1200.  Restore disk checking lost in HADOOP-1170.
      (Hairong Kuang via cutting)
 
+105. HADOOP-1252.  Changed MapReduce's allocation of local files to
+     use round-robin among available devices, rather than a hashcode.
+     More care is also taken to not allocate files on full or offline
+     drives.  (Devaraj Das via cutting)
+
 
 Release 0.12.3 - 2007-04-06
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java?view=diff&rev=535997&r1=535996&r2=535997
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java Mon May  7 14:25:07 2007
@@ -40,6 +40,10 @@
 public abstract class ChecksumFileSystem extends FilterFileSystem {
   private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
 
+  public static double getApproxChkSumLength(long size) {
+    return FSOutputSummer.CHKSUM_AS_FRACTION * size;
+  }
+  
   public ChecksumFileSystem(FileSystem fs) {
     super(fs);
   }
@@ -343,6 +347,7 @@
     private Checksum sum = new CRC32();
     private int inSum;
     private int bytesPerSum;
+    private static final float CHKSUM_AS_FRACTION = 0.01f;
     
     public FSOutputSummer(ChecksumFileSystem fs, 
                           Path file, 

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalDirAllocator.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalDirAllocator.java?view=auto&rev=535997
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalDirAllocator.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalDirAllocator.java Mon May  7 14:25:07 2007
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.commons.logging.*;
+
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.conf.Configuration; 
+
+/** An implementation of a round-robin scheme for disk allocation for creating
+ * files. The way it works is that it is kept track what disk was last
+ * allocated for a file write. For the current request, the next disk from
+ * the set of disks would be allocated if the free space on the disk is 
+ * sufficient enough to accomodate the file that is being considered for
+ * creation. If the space requirements cannot be met, the next disk in order
+ * would be tried and so on till a disk is found with sufficient capacity.
+ * Once a disk with sufficient space is identified, a check is done to make
+ * sure that the disk is writable. Also, there is an API provided that doesn't
+ * take the space requirements into consideration but just checks whether the
+ * disk under consideration is writable (this should be used for cases where
+ * the file size is not known apriori). An API is provided to read a path that
+ * was created earlier. That API works by doing a scan of all the disks for the
+ * input pathname.
+ * This implementation also provides the functionality of having multiple 
+ * allocators per JVM (one for each unique functionality or context, like 
+ * mapred, dfs-client, etc.). It ensures that there is only one instance of
+ * an allocator per context per JVM.
+ * Note:
+ * 1. The contexts referred above are actually the configuration items defined
+ * in the Configuration class like "mapred.local.dir" (for which we want to 
+ * control the dir allocations). The context-strings are exactly those 
+ * configuration items.
+ * 2. This implementation does not take into consideration cases where
+ * a disk becomes read-only or goes out of space while a file is being written
+ * to (disks are shared between multiple processes, and so the latter situation
+ * is probable).
+ * 3. In the class implementation, "Disk" is referred to as "Dir", which
+ * actually points to the configured directory on the Disk which will be the
+ * parent for all file write/read allocations.
+ */
+public class LocalDirAllocator {
+  
+  //A Map from the config item names like "mapred.local.dir", 
+  //"dfs.client.buffer.dir" to the instance of the AllocatorPerContext. This
+  //is a static object to make sure there exists exactly one instance per JVM
+  private static Map <String, AllocatorPerContext> contexts = 
+                 new TreeMap<String, AllocatorPerContext>();
+  private String contextCfgItemName;
+
+  /**Create an allocator object
+   * @param contextCfgItemName
+   */
+  public LocalDirAllocator(String contextCfgItemName) {
+    this.contextCfgItemName = contextCfgItemName;
+  }
+  
+  /** This method must be used to obtain the dir allocation context for a 
+   * particular value of the context name. The context name must be an item
+   * defined in the Configuration object for which we want to control the 
+   * dir allocations (e.g., <code>mapred.local.dir</code>). The method will
+   * create a context for that name if it doesn't already exist.
+   */
+  private AllocatorPerContext obtainContext(String contextCfgItemName) {
+    synchronized (contexts) {
+      AllocatorPerContext l = contexts.get(contextCfgItemName);
+      if (l == null) {
+        contexts.put(contextCfgItemName, 
+                    (l = new AllocatorPerContext(contextCfgItemName)));
+      }
+      return l;
+    }
+  }
+  
+  /** Get a path from the local FS. This method should be used if the size of 
+   *  the file is not known apriori. We go round-robin over the set of disks
+   *  (via the configured dirs) and return the first complete path where
+   *  we could create the parent directory of the passed path. 
+   *  @param pathStr the requested path (this will be created on the first 
+   *  available disk)
+   *  @param conf the Configuration object
+   *  @return the complete path to the file on a local disk
+   *  @throws IOException
+   */
+  public Path getLocalPathForWrite(String pathStr, 
+      Configuration conf) throws IOException {
+    return getLocalPathForWrite(pathStr, -1, conf);
+  }
+  
+  /** Get a path from the local FS. Pass size as -1 if not known apriori. We
+   *  round-robin over the set of disks (via the configured dirs) and return
+   *  the first complete path which has enough space 
+   *  @param pathStr the requested path (this will be created on the first 
+   *  available disk)
+   *  @param size the size of the file that is going to be written
+   *  @param conf the Configuration object
+   *  @return the complete path to the file on a local disk
+   *  @throws IOException
+   */
+  public Path getLocalPathForWrite(String pathStr, long size, 
+      Configuration conf) throws IOException {
+    AllocatorPerContext context = obtainContext(contextCfgItemName);
+    return context.getLocalPathForWrite(pathStr, size, conf);
+  }
+  
+  /** Get a path from the local FS for reading. We search through all the
+   *  configured dirs for the file's existence and return the complete
+   *  path to the file when we find one 
+   *  @param pathStr the requested file (this will be searched)
+   *  @param conf the Configuration object
+   *  @return the complete path to the file on a local disk
+   *  @throws IOException
+   */
+  public Path getLocalPathToRead(String pathStr, 
+      Configuration conf) throws IOException {
+    AllocatorPerContext context = obtainContext(contextCfgItemName);
+    return context.getLocalPathToRead(pathStr, conf);
+  }
+  
+  /** Method to check whether a context is valid
+   * @param contextCfgItemName
+   * @return true/false
+   */
+  public static boolean isContextValid(String contextCfgItemName) {
+    synchronized (contexts) {
+      return contexts.containsKey(contextCfgItemName);
+    }
+  }
+    
+  private class AllocatorPerContext {
+
+    private final Log LOG =
+      LogFactory.getLog("org.apache.hadoop.fs.AllocatorPerContext");
+
+    private int dirNumLastAccessed;
+    private FileSystem localFS;
+    private DF[] dirDF;
+    private String contextCfgItemName;
+    private String[] localDirs;
+    private String savedLocalDirs = "";
+
+    public AllocatorPerContext(String contextCfgItemName) {
+      this.contextCfgItemName = contextCfgItemName;
+    }
+
+    /** This method gets called everytime before any read/write to make sure
+     * that any change to localDirs is reflected immediately.
+     */
+    private void confChanged(Configuration conf) throws IOException {
+      String newLocalDirs = conf.get(contextCfgItemName);
+      if (!newLocalDirs.equals(savedLocalDirs)) {
+        localDirs = conf.getStrings(contextCfgItemName);
+        localFS = FileSystem.getLocal(conf);
+        int numDirs = localDirs.length;
+        dirDF = new DF[numDirs];
+        for (int i = 0; i < numDirs; i++) {
+          try {
+            localFS.mkdirs(new Path(localDirs[i]));
+          } catch (IOException ie) { } //ignore
+          dirDF[i] = new DF(new File(localDirs[i]), 30000);
+        }
+        dirNumLastAccessed = 0;
+        savedLocalDirs = newLocalDirs;
+      }
+    }
+
+    private Path createPath(String path) throws IOException {
+      Path file = new Path(new Path(localDirs[dirNumLastAccessed]),
+                                    path);
+      //check whether we are able to create a directory here. If the disk
+      //happens to be RDONLY we will fail
+      try {
+        DiskChecker.checkDir(new File(file.getParent().toUri().getPath()));
+        return file;
+      } catch (DiskErrorException d) {
+        LOG.warn(StringUtils.stringifyException(d));
+        return null;
+      }
+    }
+
+    /** Get a path from the local FS. This method should be used if the size of 
+     *  the file is not known apriori. We go round-robin over the set of disks
+     *  (via the configured dirs) and return the first complete path where
+     *  we could create the parent directory of the passed path. 
+     */
+    public synchronized Path getLocalPathForWrite(String path, 
+        Configuration conf) throws IOException {
+      return getLocalPathForWrite(path, -1, conf);
+    }
+
+    /** Get a path from the local FS. Pass size as -1 if not known apriori. We
+     *  round-robin over the set of disks (via the configured dirs) and return
+     *  the first complete path which has enough space 
+     */
+    public synchronized Path getLocalPathForWrite(String pathStr, long size, 
+        Configuration conf) throws IOException {
+      confChanged(conf);
+      int numDirs = localDirs.length;
+      int numDirsSearched = 0;
+      //remove the leading slash from the path (to make sure that the uri
+      //resolution results in a valid path on the dir being checked)
+      if (pathStr.startsWith("/")) {
+        pathStr = pathStr.substring(1);
+      }
+      Path returnPath = null;
+      while (numDirsSearched < numDirs && returnPath == null) {
+        if (size >= 0) {
+          long capacity = dirDF[dirNumLastAccessed].getAvailable();
+          if (capacity > size) {
+            returnPath = createPath(pathStr);
+          }
+        } else {
+          returnPath = createPath(pathStr);
+        }
+        dirNumLastAccessed++;
+        dirNumLastAccessed = dirNumLastAccessed % numDirs; 
+        numDirsSearched++;
+      } 
+
+      if (returnPath != null) {
+        return returnPath;
+      }
+      
+      //no path found
+      throw new DiskErrorException("Could not find any valid local " +
+          "directory for " + pathStr);
+    }
+
+    /** Get a path from the local FS for reading. We search through all the
+     *  configured dirs for the file's existence and return the complete
+     *  path to the file when we find one 
+     */
+    public synchronized Path getLocalPathToRead(String pathStr, 
+        Configuration conf) throws IOException {
+      confChanged(conf);
+      int numDirs = localDirs.length;
+      int numDirsSearched = 0;
+      //remove the leading slash from the path (to make sure that the uri
+      //resolution results in a valid path on the dir being checked)
+      if (pathStr.startsWith("/")) {
+        pathStr = pathStr.substring(1);
+      }
+      while (numDirsSearched < numDirs) {
+        Path file = new Path(localDirs[numDirsSearched], pathStr);
+        if (localFS.exists(file)) {
+          return file;
+        }
+        numDirsSearched++;
+      }
+
+      //no path found
+      throw new DiskErrorException ("Could not find " + pathStr +" in any of" +
+      " the configured local directories");
+    }
+  }
+}

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=535997&r1=535996&r2=535997
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Mon May  7 14:25:07 2007
@@ -2383,6 +2383,7 @@
         int numSegments = sortedSegmentSizes.size();
         int origFactor = factor;
         int passNo = 1;
+        LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
         do {
           //get the factor for this pass of merge
           factor = getPassFactor(passNo, numSegments);
@@ -2435,11 +2436,19 @@
             return this;
           } else {
             //we want to spread the creation of temp files on multiple disks if 
-            //available
+            //available under the space constraints
+            long approxOutputSize = 0; 
+            for (SegmentDescriptor s : segmentsToMerge) {
+              approxOutputSize += s.segmentLength + 
+                                  ChecksumFileSystem.getApproxChkSumLength(
+                                  s.segmentLength);
+            }
             Path tmpFilename = 
               new Path(tmpDir, "intermediate").suffix("." + passNo);
-            Path outputFile = conf.getLocalPath("mapred.local.dir", 
-                                                tmpFilename.toString());
+
+            Path outputFile =  lDirAlloc.getLocalPathForWrite(
+                                                tmpFilename.toString(),
+                                                approxOutputSize, conf);
             LOG.info("writing intermediate results to " + outputFile);
             Writer writer = cloneFileAttributes(
                                                 fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=535997&r1=535996&r2=535997
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Mon May  7 14:25:07 2007
@@ -140,7 +140,8 @@
           for (int i = 0; i < mapIds.size(); i++) {
             String mapId = mapIds.get(i);
             Path mapOut = this.mapoutputFile.getOutputFile(mapId);
-            Path reduceIn = this.mapoutputFile.getInputFile(i, reduceId);
+            Path reduceIn = this.mapoutputFile.getInputFileForWrite(i,reduceId,
+                localFs.getLength(mapOut));
             if (!localFs.mkdirs(reduceIn.getParent())) {
               throw new IOException("Mkdirs failed to create "
                   + reduceIn.getParent().toString());

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java?view=diff&rev=535997&r1=535996&r2=535997
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java Mon May  7 14:25:07 2007
@@ -21,6 +21,7 @@
 import java.io.IOException;
 
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.conf.*;
 
 /**
@@ -29,49 +30,108 @@
 class MapOutputFile {
 
   private JobConf conf;
+  private LocalDirAllocator lDirAlloc = 
+                            new LocalDirAllocator("mapred.local.dir");
   
-  /** Create a local map output file name.
+  /** Return the path to local map output file created earlier
    * @param mapTaskId a map task id
    */
   public Path getOutputFile(String mapTaskId)
     throws IOException {
-    return conf.getLocalPath(mapTaskId+"/file.out");
+    return lDirAlloc.getLocalPathToRead(mapTaskId+"/file.out", conf);
   }
 
-  /** Create a local map output index file name.
+  /** Create a local map output file name.
+   * @param mapTaskId a map task id
+   * @param size the size of the file
+   */
+  public Path getOutputFileForWrite(String mapTaskId, long size)
+    throws IOException {
+    return lDirAlloc.getLocalPathForWrite(mapTaskId+"/file.out", size, conf);
+  }
+
+  /** Return the path to a local map output index file created earlier
    * @param mapTaskId a map task id
    */
   public Path getOutputIndexFile(String mapTaskId)
     throws IOException {
-    return conf.getLocalPath(mapTaskId+"/file.out.index");
+    return lDirAlloc.getLocalPathToRead(mapTaskId + "/file.out.index", conf);
   }
 
-  /** Create a local map spill file name.
+  /** Create a local map output index file name.
+   * @param mapTaskId a map task id
+   * @param size the size of the file
+   */
+  public Path getOutputIndexFileForWrite(String mapTaskId, long size)
+    throws IOException {
+    return lDirAlloc.getLocalPathForWrite(mapTaskId + "/file.out.index", 
+                                          size, conf);
+  }
+
+  /** Return a local map spill file created earlier.
    * @param mapTaskId a map task id
    * @param spillNumber the number
    */
   public Path getSpillFile(String mapTaskId, int spillNumber)
     throws IOException {
-    return conf.getLocalPath(mapTaskId+"/spill" +spillNumber+".out");
+    return lDirAlloc.getLocalPathToRead(mapTaskId+"/spill" +spillNumber+".out",
+                                        conf);
   }
 
-  /** Create a local map spill index file name.
+  /** Create a local map spill file name.
+   * @param mapTaskId a map task id
+   * @param spillNumber the number
+   * @param size the size of the file
+   */
+  public Path getSpillFileForWrite(String mapTaskId, int spillNumber, 
+         long size) throws IOException {
+    return lDirAlloc.getLocalPathForWrite(mapTaskId+
+                                                  "/spill" +spillNumber+".out",
+                                                  size, conf);
+  }
+
+  /** Return a local map spill index file created earlier
    * @param mapTaskId a map task id
    * @param spillNumber the number
    */
   public Path getSpillIndexFile(String mapTaskId, int spillNumber)
     throws IOException {
-    return conf.getLocalPath(mapTaskId+"/spill" +spillNumber+".out.index");
+    return lDirAlloc.getLocalPathToRead(
+        mapTaskId+"/spill" +spillNumber+".out.index", conf);
   }
 
-  /** Create a local reduce input file name.
+  /** Create a local map spill index file name.
+   * @param mapTaskId a map task id
+   * @param spillNumber the number
+   * @param size the size of the file
+   */
+  public Path getSpillIndexFileForWrite(String mapTaskId, int spillNumber,
+         long size) throws IOException {
+    return lDirAlloc.getLocalPathForWrite(
+        mapTaskId+"/spill" +spillNumber+".out.index", size, conf);
+  }
+
+  /** Return a local reduce input file created earlier
    * @param mapTaskId a map task id
    * @param reduceTaskId a reduce task id
    */
   public Path getInputFile(int mapId, String reduceTaskId)
     throws IOException {
     // TODO *oom* should use a format here
-    return conf.getLocalPath(reduceTaskId+"/map_"+mapId+".out");
+    return lDirAlloc.getLocalPathToRead(reduceTaskId + "/map_"+mapId+".out",
+                                        conf);
+  }
+
+  /** Create a local reduce input file name.
+   * @param mapTaskId a map task id
+   * @param reduceTaskId a reduce task id
+   * @param size the size of the file
+   */
+  public Path getInputFileForWrite(int mapId, String reduceTaskId, long size)
+    throws IOException {
+    // TODO *oom* should use a format here
+    return lDirAlloc.getLocalPathForWrite(reduceTaskId + "/map_"+mapId+".out",
+                                          size, conf);
   }
 
   /** Removes all of the files related to a task. */

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java?view=diff&rev=535997&r1=535996&r2=535997
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java Mon May  7 14:25:07 2007
@@ -26,9 +26,11 @@
 import org.apache.hadoop.fs.InMemoryFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.conf.*;
 
 /** The location of a map output file, as passed to a reduce task via the
  * {@link InterTrackerProtocol}. */ 
@@ -174,7 +176,10 @@
    * We use the file system so that we generate checksum files on the data.
    * @param inMemFileSys the inmemory filesystem to write the file to
    * @param localFileSys the local filesystem to write the file to
+   * @param shuffleMetrics the metrics context
    * @param localFilename the filename to write the data into
+   * @param lDirAlloc the LocalDirAllocator object
+   * @param conf the Configuration object
    * @param reduce the reduce id to get for
    * @param timeout number of ms for connection and read timeout
    * @return the path of the file that got created
@@ -184,7 +189,8 @@
                       FileSystem localFileSys,
                       MetricsRecord shuffleMetrics,
                       Path localFilename, 
-                      int reduce,
+                      LocalDirAllocator lDirAlloc,
+                      Configuration conf, int reduce,
                       int timeout) throws IOException, InterruptedException {
     boolean good = false;
     long totalBytes = 0;
@@ -216,6 +222,11 @@
                        inMemFileSys.reserveSpaceWithCheckSum(localFilename, length));
       if (createInMem) {
         fileSys = inMemFileSys;
+      }
+      else {
+        //now hit the localFS to find out a suitable location for the output
+        localFilename = lDirAlloc.getLocalPathForWrite(
+            localFilename.toUri().getPath(), length + checksumLength, conf);
       }
       
       output = fileSys.create(localFilename);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=535997&r1=535996&r2=535997
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Mon May  7 14:25:07 2007
@@ -58,6 +58,7 @@
   private BytesWritable split = new BytesWritable();
   private String splitClass;
   private InputSplit instantiatedSplit = null;
+  private final static int APPROX_HEADER_LENGTH = 150;
 
   private static final Log LOG = LogFactory.getLog(MapTask.class.getName());
 
@@ -391,11 +392,16 @@
     //sort, combine and spill to disk
     private void sortAndSpillToDisk() throws IOException {
       synchronized (this) {
-        Path filename = mapOutputFile.getSpillFile(getTaskId(), numSpills);
+        //approximate the length of the output file to be the length of the
+        //buffer + header lengths for the partitions
+        long size = keyValBuffer.getLength() + 
+                    partitions * APPROX_HEADER_LENGTH;
+        Path filename = mapOutputFile.getSpillFileForWrite(getTaskId(), 
+                                      numSpills, size);
         //we just create the FSDataOutputStream object here.
         out = localFs.create(filename);
-        Path indexFilename = mapOutputFile.getSpillIndexFile(getTaskId(), 
-                                                             numSpills);
+        Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
+                             getTaskId(), numSpills, partitions * 16);
         indexOut = localFs.create(indexFilename);
         LOG.debug("opened "+
                   mapOutputFile.getSpillFile(getTaskId(), numSpills).getName());
@@ -479,14 +485,31 @@
     }
     
     private void mergeParts() throws IOException {
-      Path finalOutputFile = mapOutputFile.getOutputFile(getTaskId());
-      Path finalIndexFile = mapOutputFile.getOutputIndexFile(getTaskId());
+      // get the approximate size of the final output/index files
+      long finalOutFileSize = 0;
+      long finalIndexFileSize = 0;
+      Path [] filename = new Path[numSpills];
+      Path [] indexFileName = new Path[numSpills];
+      
+      for(int i = 0; i < numSpills; i++) {
+        filename[i] = mapOutputFile.getSpillFile(getTaskId(), i);
+        indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskId(), i);
+        finalOutFileSize += localFs.getLength(filename[i]);
+      }
+      //make correction in the length to include the sequence file header
+      //lengths for each partition
+      finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
+      
+      finalIndexFileSize = partitions * 16;
+      
+      Path finalOutputFile = mapOutputFile.getOutputFileForWrite(getTaskId(), 
+                             finalOutFileSize);
+      Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(
+                            getTaskId(), finalIndexFileSize);
       
       if (numSpills == 1) { //the spill is the final output
-        Path spillPath = mapOutputFile.getSpillFile(getTaskId(), 0);
-        Path spillIndexPath = mapOutputFile.getSpillIndexFile(getTaskId(), 0);
-        localFs.rename(spillPath, finalOutputFile);
-        localFs.rename(spillIndexPath, finalIndexFile);
+        localFs.rename(filename[0], finalOutputFile);
+        localFs.rename(indexFileName[0], finalIndexFile);
         return;
       }
       
@@ -513,14 +536,6 @@
         return;
       }
       {
-        Path [] filename = new Path[numSpills];
-        Path [] indexFileName = new Path[numSpills];
-        
-        for(int i = 0; i < numSpills; i++) {
-          filename[i] = mapOutputFile.getSpillFile(getTaskId(), i);
-          indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskId(), i);
-        }
-        
         //create a sorter object as we need access to the SegmentDescriptor
         //class and merge methods
         Sorter sorter = new Sorter(localFs, keyClass, valClass, job);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java?view=diff&rev=535997&r1=535996&r2=535997
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java Mon May  7 14:25:07 2007
@@ -17,18 +17,13 @@
  */
 package org.apache.hadoop.mapred;
 
-import org.apache.hadoop.conf.*;
-
 import java.io.*;
 
 /** Runs a map task. */
 class MapTaskRunner extends TaskRunner {
-  private MapOutputFile mapOutputFile;
 
   public MapTaskRunner(Task task, TaskTracker tracker, JobConf conf) {
     super(task, tracker, conf);
-    this.mapOutputFile = new MapOutputFile();
-    this.mapOutputFile.setConf(conf);
   }
   
   /** Delete any temporary files from previous failed attempts. */
@@ -37,13 +32,13 @@
       return false;
     }
     
-    this.mapOutputFile.removeAll(getTask().getTaskId());
+    mapOutputFile.removeAll(getTask().getTaskId());
     return true;
   }
 
   /** Delete all of the temporary map output files. */
   public void close() throws IOException {
     LOG.info(getTask()+" done; removing files.");
-    this.mapOutputFile.removeAll(getTask().getTaskId());
+    mapOutputFile.removeAll(getTask().getTaskId());
   }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=535997&r1=535996&r2=535997
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Mon May  7 14:25:07 2007
@@ -61,6 +61,7 @@
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
 import static org.apache.hadoop.mapred.Task.Counter.*;
 
@@ -265,7 +266,14 @@
     // the list of files to merge, otherwise not.
     List<Path> mapFilesList = new ArrayList<Path>();
     for(int i=0; i < numMaps; i++) {
-      Path f = mapOutputFile.getInputFile(i, getTaskId());
+      Path f;
+      try {
+        //catch and ignore DiskErrorException, since some map outputs will
+        //really be absent (inmem merge).
+        f = mapOutputFile.getInputFile(i, getTaskId());
+      } catch (DiskErrorException d) { 
+        continue;
+      }
       if (lfs.exists(f))
         mapFilesList.add(f);
     }
@@ -292,7 +300,7 @@
       };
     sortProgress.setName("Sort progress reporter for task "+getTaskId());
 
-    Path tempDir = job.getLocalPath(getTaskId()); 
+    Path tempDir = new Path(getTaskId()); 
 
     WritableComparator comparator = job.getOutputValueGroupingComparator();
     
@@ -496,6 +504,11 @@
     
     private Random random = null;
     
+    /**
+     * the max size of the merge output from ramfs
+     */
+    private long ramfsMergeOutputSize;
+    
     /** Represents the result of an attempt to copy a map output */
     private class CopyResult {
       
@@ -523,6 +536,15 @@
       public MapOutputLocation getLocation() { return loc; }
     }
     
+    private int extractMapIdFromPathName(Path pathname) {
+      //all paths end with map_<id>.out
+      String firstPathName = pathname.getName();
+      int beginIndex = firstPathName.lastIndexOf("map_");
+      int endIndex = firstPathName.lastIndexOf(".out");
+      return Integer.parseInt(firstPathName.substring(beginIndex +
+                              "map_".length(), endIndex));
+    }
+    
     private Thread createProgressThread(final TaskUmbilicalProtocol umbilical) {
       //spawn a thread to give copy progress heartbeats
       Thread copyProgress = new Thread() {
@@ -645,14 +667,17 @@
         String reduceId = reduceTask.getTaskId();
         LOG.info(reduceId + " Copying " + loc.getMapTaskId() +
                  " output from " + loc.getHost() + ".");
-        // the place where the file should end up
-        Path finalFilename = conf.getLocalPath(reduceId + "/map_" +
-                                               loc.getMapId() + ".out");
+        // a temp filename. If this file gets created in ramfs, we're fine,
+        // else, we will check the localFS to find a suitable final location
+        // for this path
+        Path filename = new Path("/" + reduceId + "/map_" +
+                                 loc.getMapId() + ".out");
         // a working filename that will be unique to this attempt
-        Path tmpFilename = new Path(finalFilename + "-" + id);
+        Path tmpFilename = new Path(filename + "-" + id);
         // this copies the map output file
         tmpFilename = loc.getFile(inMemFileSys, localFileSys, shuffleMetrics,
-                                  tmpFilename, reduceTask.getPartition(),
+                                  tmpFilename, lDirAlloc, 
+                                  conf, reduceTask.getPartition(), 
                                   STALLED_COPY_TIMEOUT);
         if (!neededOutputs.contains(loc.getMapId())) {
           if (tmpFilename != null) {
@@ -662,7 +687,7 @@
           return CopyResult.OBSOLETE;
         }
         if (tmpFilename == null)
-          throw new IOException("File " + finalFilename + "-" + id + 
+          throw new IOException("File " + filename + "-" + id + 
                                 " not created");
         long bytes = -1;
         // lock the ReduceTask while we do the rename
@@ -676,9 +701,12 @@
           }
           
           bytes = fs.getLength(tmpFilename);
+          //resolve the final filename against the directory where the tmpFile
+          //got created
+          filename = new Path(tmpFilename.getParent(), filename.getName());
           // if we can't rename the file, something is broken (and IOException
           // will be thrown). 
-          if (!fs.rename(tmpFilename, finalFilename)) {
+          if (!fs.rename(tmpFilename, filename)) {
             fs.delete(tmpFilename);
             bytes = -1;
             throw new IOException("failure to rename map output " + 
@@ -766,6 +794,8 @@
       inMemFileSys = (InMemoryFileSystem)FileSystem.get(uri, conf);
       LOG.info(reduceTask.getTaskId() + " Created an InMemoryFileSystem, uri: "
                + uri);
+      ramfsMergeOutputSize = (long)(MAX_INMEM_FILESYS_USE * 
+                                    inMemFileSys.getFSSize());
       localFileSys = FileSystem.getLocal(conf);
       //create an instance of the sorter
       sorter =
@@ -1025,9 +1055,12 @@
             //it is not guaranteed that this file will be present after merge
             //is called (we delete empty sequence files as soon as we see them
             //in the merge method)
+            int mapId = extractMapIdFromPathName(inMemClosedFiles[0]);
+            Path outputPath = mapOutputFile.getInputFileForWrite(mapId, 
+                             reduceTask.getTaskId(), ramfsMergeOutputSize);
             SequenceFile.Writer writer = sorter.cloneFileAttributes(
                                                                     inMemFileSys.makeQualified(inMemClosedFiles[0]), 
-                                                                    localFileSys.makeQualified(inMemClosedFiles[0]), null);
+                                                                    localFileSys.makeQualified(outputPath), null);
             
             SequenceFile.Sorter.RawKeyValueIterator rIter = null;
             try {
@@ -1046,7 +1079,7 @@
             LOG.info(reduceTask.getTaskId() +
                      " Merge of the " +inMemClosedFiles.length +
                      " files in InMemoryFileSystem complete." +
-                     " Local file is " + inMemClosedFiles[0]);
+                     " Local file is " + outputPath);
           } catch (Throwable t) {
             LOG.warn(reduceTask.getTaskId() +
                      " Final merge of the inmemory files threw an exception: " + 
@@ -1151,9 +1184,16 @@
             //it is not guaranteed that this file will be present after merge
             //is called (we delete empty sequence files as soon as we see them
             //in the merge method)
+
+            //figure out the mapId 
+            int mapId = extractMapIdFromPathName(inMemClosedFiles[0]);
+            
+            Path outputPath = mapOutputFile.getInputFileForWrite(mapId, 
+                              reduceTask.getTaskId(), ramfsMergeOutputSize);
+
             SequenceFile.Writer writer = sorter.cloneFileAttributes(
                                                                     inMemFileSys.makeQualified(inMemClosedFiles[0]), 
-                                                                    localFileSys.makeQualified(inMemClosedFiles[0]), null);
+                                                                    localFileSys.makeQualified(outputPath), null);
             SequenceFile.Sorter.RawKeyValueIterator rIter;
             try {
               rIter = sorter.merge(inMemClosedFiles, true, 
@@ -1162,7 +1202,7 @@
               //make sure that we delete the ondisk file that we created 
               //earlier when we invoked cloneFileAttributes
               writer.close();
-              localFileSys.delete(inMemClosedFiles[0]);
+              localFileSys.delete(outputPath);
               throw new IOException (StringUtils.stringifyException(e));
             }
             sorter.writeFile(rIter, writer);
@@ -1170,7 +1210,7 @@
             LOG.info(reduceTask.getTaskId() + 
                      " Merge of the " +inMemClosedFiles.length +
                      " files in InMemoryFileSystem complete." +
-                     " Local file is " + inMemClosedFiles[0]);
+                     " Local file is " + outputPath);
           }
           else {
             LOG.info(reduceTask.getTaskId() + " Nothing to merge from " + 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?view=diff&rev=535997&r1=535996&r2=535997
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Mon May  7 14:25:07 2007
@@ -17,23 +17,15 @@
  */
 package org.apache.hadoop.mapred;
 
-import org.apache.hadoop.conf.*;
-
 import java.io.*;
 
 /** Runs a reduce task. */
 class ReduceTaskRunner extends TaskRunner {
-  /** 
-   * for cleaning up old map outputs
-   */
-  private MapOutputFile mapOutputFile;
   
   public ReduceTaskRunner(Task task, TaskTracker tracker, 
                           JobConf conf) throws IOException {
     
     super(task, tracker, conf);
-    this.mapOutputFile = new MapOutputFile();
-    this.mapOutputFile.setConf(conf);
   }
 
   /** Assemble all of the map output files */
@@ -43,7 +35,7 @@
     }
     
     // cleanup from failures
-    this.mapOutputFile.removeAll(getTask().getTaskId());
+    mapOutputFile.removeAll(getTask().getTaskId());
     return true;
   }
   
@@ -52,6 +44,6 @@
   public void close() throws IOException {
     LOG.info(getTask()+" done; removing files.");
     getTask().getProgress().setStatus("closed");
-    this.mapOutputFile.removeAll(getTask().getTaskId());
+    mapOutputFile.removeAll(getTask().getTaskId());
   }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=535997&r1=535996&r2=535997
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Mon May  7 14:25:07 2007
@@ -30,6 +30,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.Writable;
@@ -69,6 +70,7 @@
   
   protected JobConf conf;
   protected MapOutputFile mapOutputFile = new MapOutputFile();
+  protected LocalDirAllocator lDirAlloc;
 
   ////////////////////////////////////////////
   // Constructors
@@ -293,6 +295,7 @@
       this.conf = new JobConf(conf);
     }
     this.mapOutputFile.setConf(this.conf);
+    this.lDirAlloc = new LocalDirAllocator("mapred.local.dir");
   }
 
   public Configuration getConf() {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=535997&r1=535996&r2=535997
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Mon May  7 14:25:07 2007
@@ -43,7 +43,12 @@
 
   private TaskLog.Writer taskStdOutLogWriter;
   private TaskLog.Writer taskStdErrLogWriter;
-  
+
+  /** 
+   * for cleaning up old map outputs
+   */
+  protected MapOutputFile mapOutputFile;
+
   public TaskRunner(Task t, TaskTracker tracker, JobConf conf) {
     this.t = t;
     this.tracker = tracker;
@@ -60,6 +65,8 @@
                          this.conf.getInt("mapred.userlog.limit.kb", 100) * 1024, 
                          this.conf.getBoolean("mapred.userlog.purgesplits", true),
                          this.conf.getInt("mapred.userlog.retain.hours", 12));
+    this.mapOutputFile = new MapOutputFile();
+    this.mapOutputFile.setConf(conf);
   }
 
   public Task getTask() { return t; }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=535997&r1=535996&r2=535997
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon May  7 14:25:07 2007
@@ -53,6 +53,7 @@
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -88,6 +89,7 @@
 
   private boolean running = true;
 
+  private LocalDirAllocator localDirAllocator;
   String taskTrackerName;
   String localHostname;
   InetSocketAddress jobTrackAddr;
@@ -657,10 +659,12 @@
     // let the jsp pages get to the task tracker, config, and other relevant
     // objects
     FileSystem local = FileSystem.getLocal(conf);
+    this.localDirAllocator = new LocalDirAllocator("mapred.local.dir");
     server.setAttribute("task.tracker", this);
     server.setAttribute("local.file.system", local);
     server.setAttribute("conf", conf);
     server.setAttribute("log", LOG);
+    server.setAttribute("localDirAllocator", localDirAllocator);
     server.addServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
     server.start();
     this.httpPort = server.getPort();
@@ -1872,15 +1876,19 @@
       byte[] buffer = new byte[MAX_BYTES_TO_READ];
       OutputStream outStream = response.getOutputStream();
       JobConf conf = (JobConf) context.getAttribute("conf");
+      LocalDirAllocator lDirAlloc = 
+        (LocalDirAllocator)context.getAttribute("localDirAllocator");
       FileSystem fileSys = 
         (FileSystem) context.getAttribute("local.file.system");
 
       // Index file
-      Path indexFileName = conf.getLocalPath(mapId+"/file.out.index");
+      Path indexFileName = lDirAlloc.getLocalPathToRead(
+                                            mapId+"/file.out.index", conf);
       FSDataInputStream indexIn = null;
          
       // Map-output file
-      Path mapOutputFileName = conf.getLocalPath(mapId+"/file.out"); 
+      Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
+                                            mapId+"/file.out", conf);
       FSDataInputStream mapOutputIn = null;
         
       // true iff IOException was caused by attempt to access input