You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/01/19 19:17:46 UTC

svn commit: r497891 - in /lucene/hadoop/trunk: ./ conf/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/mapred/

Author: cutting
Date: Fri Jan 19 10:17:44 2007
New Revision: 497891

URL: http://svn.apache.org/viewvc?view=rev&rev=497891
Log:
HADOOP-830.  Improve mapreduce merge performance by buffering and merging map outputs as they arrive at reduce nodes.  Contributed by Devaraj.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Jan 19 10:17:44 2007
@@ -47,6 +47,10 @@
 14. HADOOP-735.  Switch generated record code to use BytesWritable to
     represent fields of type 'buffer'. (Milind Bhandarkar via cutting)
 
+15. HADOOP-830.  Improve mapreduce merge performance by buffering and
+    merging multiple map outputs as they arrive at reduce nodes before
+    they're written to disk.  (Devaraj Das via cutting)
+
 
 Release 0.10.1 - 2007-01-10
 

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Fri Jan 19 10:17:44 2007
@@ -125,6 +125,18 @@
 </property>
 
 <property>
+  <name>fs.ramfs.impl</name>
+  <value>org.apache.hadoop.fs.InMemoryFileSystem</value>
+  <description>The FileSystem for ramfs: uris.</description>
+</property>
+
+<property>
+  <name>fs.inmemory.size.mb</name>
+  <value>75</value>
+  <description>The size of the in-memory filesystem instance in MB</description>
+</property>
+
+<property>
   <name>dfs.datanode.bindAddress</name>
   <value>0.0.0.0</value>
   <description>

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java Fri Jan 19 10:17:44 2007
@@ -177,4 +177,11 @@
     return ((Buffer)out).getPos();
   }
 
+  public static long getChecksumLength(long size, int bytesPerSum) {
+    //the checksum length is 4 bytes per bytesPerSum-sized chunk of the data
+    //(rounded up), plus one more 4-byte entry and the CHECKSUM_VERSION bytes.
+    return ((long)(Math.ceil((float)size/bytesPerSum)) + 1) * 4 + 
+            CHECKSUM_VERSION.length;  
+  }
+  
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java Fri Jan 19 10:17:44 2007
@@ -200,6 +200,13 @@
       return new Path(file.getParent(), "."+file.getName()+".crc");
     }
 
+    /** Return the length of the checksum file given the size of the 
+     * actual file.
+     **/
+    public static long getChecksumFileLength(long fileSize, int bytesPerSum) {
+      return FSDataOutputStream.getChecksumLength(fileSize, bytesPerSum);
+    }
+    
     /** Return true iff file is a checksum file name.*/
     public static boolean isChecksumFile(Path file) {
       String name = file.getName();

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java?view=auto&rev=497891
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java Fri Jan 19 10:17:44 2007
@@ -0,0 +1,407 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.*;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
+
+/** An implementation of the in-memory filesystem. This implementation assumes
+ * that the file lengths are known ahead of time and the total length of all
+ * the files is below a certain number (like 100 MB, configurable). Use the API
+ * reserveSpaceWithCheckSum(Path f, int size) (see below for a description of
+ * the API) for reserving space in the FS. The uri of this filesystem starts with
+ * ramfs:// .
+ * @author ddas
+ *
+ */
+public class InMemoryFileSystem extends FileSystem {
+  private URI uri;
+  private int fsSize;
+  private volatile int totalUsed;
+  private Path staticWorkingDir;
+  private int bytesPerSum;
+  
+  //pathToFileAttribs is the final place where a file is put after it is closed
+  private Map <String, FileAttributes> pathToFileAttribs = 
+    Collections.synchronizedMap(new HashMap());
+  
+  //tempFileAttribs is a temp place which is updated while reserving memory for
+  //files we are going to create. It is read in the createRaw method and the
+  //temp key/value is discarded. If the file makes it to "close", then it
+  //ends up being in the pathToFileAttribs map.
+  private Map <String, FileAttributes> tempFileAttribs = 
+    Collections.synchronizedMap(new HashMap());
+  
+  public InMemoryFileSystem() {}
+  
+  public InMemoryFileSystem(URI uri, Configuration conf) {
+    initialize(uri, conf);
+  }
+  
+  //inherit javadoc
+  public void initialize(URI uri, Configuration conf) {
+    int size = Integer.parseInt(conf.get("fs.inmemory.size.mb", "100"));
+    this.fsSize = size * 1024 * 1024;
+    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
+    this.staticWorkingDir = new Path(this.uri.getPath());
+    this.bytesPerSum = conf.getInt("io.bytes.per.checksum", 512);
+    LOG.info("Initialized InMemoryFileSystem: " + uri.toString() + 
+             " of size (in bytes): " + fsSize);
+  }
+
+  //inherit javadoc
+  public URI getUri() {
+    return uri;
+  }
+
+  /** @deprecated */
+  public String getName() {
+    return uri.toString();
+  }
+
+  /**
+   * Return 1x1 'inmemory' cell if the file exists.
+   * Return null if otherwise.
+   */
+  public String[][] getFileCacheHints(Path f, long start, long len)
+      throws IOException {
+    if (! exists(f)) {
+      return null;
+    } else {
+      return new String[][] {{"inmemory"}};
+    }
+  }
+
+  private class InMemoryInputStream extends FSInputStream {
+    private DataInputBuffer din = new DataInputBuffer();
+    private FileAttributes fAttr;
+    
+    public InMemoryInputStream(Path f) throws IOException {
+      fAttr = pathToFileAttribs.get(getPath(f));
+      if (fAttr == null) throw new FileNotFoundException("File " + f + 
+                                                         " does not exist");
+      din.reset(fAttr.data, 0, fAttr.size);
+    }
+    
+    public long getPos() throws IOException {
+      return din.getPosition();
+    }
+    
+    public void seek(long pos) throws IOException {
+      if ((int)pos > fAttr.size)
+        throw new IOException("Cannot seek after EOF");
+      din.reset(fAttr.data, (int)pos, fAttr.size - (int)pos);
+    }
+    
+    public int available() throws IOException {
+      return din.available(); 
+    }
+    public boolean markSupport() { return false; }
+
+    public int read() throws IOException {
+      return din.read();
+    }
+
+    public int read(byte[] b, int off, int len) throws IOException {
+      return din.read(b, off, len);
+    }
+    
+    public long skip(long n) throws IOException { return din.skip(n); }
+  }
+
+  public FSInputStream openRaw(Path f) throws IOException {
+    return new InMemoryInputStream(f);
+  }
+
+  private class InMemoryOutputStream extends FSOutputStream {
+    private int count;
+    private FileAttributes fAttr;
+    private Path f;
+    
+    public InMemoryOutputStream(Path f, FileAttributes fAttr) 
+    throws IOException {
+      this.fAttr = fAttr;
+      this.f = f;
+    }
+    
+    public long getPos() throws IOException {
+      return count;
+    }
+    
+    public void close() throws IOException {
+      synchronized (InMemoryFileSystem.this) {
+        pathToFileAttribs.put(getPath(f), fAttr);
+      }
+    }
+    
+    public void write(byte[] b, int off, int len) throws IOException {
+      if ((off < 0) || (off > b.length) || (len < 0) ||
+          ((off + len) > b.length) || ((off + len) < 0)) {
+        throw new IndexOutOfBoundsException();
+      } else if (len == 0) {
+        return;
+      }
+      int newcount = count + len;
+      if (newcount > fAttr.size) {
+        throw new IOException("Insufficient space");
+      }
+      System.arraycopy(b, off, fAttr.data, count, len);
+      count = newcount;
+    }
+    
+    public void write(int b) throws IOException {
+      int newcount = count + 1;
+      if (newcount > fAttr.size) {
+        throw new IOException("Insufficient space");
+      }
+      fAttr.data[count] = (byte)b;
+      count = newcount;
+    }
+  }
+  
+  public FSOutputStream createRaw(Path f, boolean overwrite, short replication,
+      long blockSize) throws IOException {
+    if (exists(f) && ! overwrite) {
+      throw new IOException("File already exists:"+f);
+    }
+    synchronized (this) {
+      FileAttributes fAttr =(FileAttributes)tempFileAttribs.remove(getPath(f));
+      if (fAttr != null)
+        return createRaw(f, fAttr);
+      return null;
+    }
+  }
+
+  public FSOutputStream createRaw(Path f, boolean overwrite, short replication,
+      long blockSize, Progressable progress) throws IOException {
+    //ignore write-progress reporter for in-mem files
+    return createRaw(f, overwrite, replication, blockSize);
+  }
+
+  public FSOutputStream createRaw(Path f, FileAttributes fAttr) 
+  throws IOException {
+    //the path is not added into the filesystem (in the pathToFileAttribs
+    //map) until close is called on the outputstream that this method is 
+    //going to return
+    //Create an output stream out of data byte array
+    return new InMemoryOutputStream(f, fAttr);
+  }
+
+  public void close() throws IOException {
+    super.close();
+    if (pathToFileAttribs != null) 
+      pathToFileAttribs.clear();
+    pathToFileAttribs = null;
+    if (tempFileAttribs != null)
+      tempFileAttribs.clear();
+    tempFileAttribs = null;
+  }
+
+  /**
+   * Replication is not supported for the inmemory file system.
+   */
+  public short getReplication(Path src) throws IOException {
+    return 1;
+  }
+
+  public boolean setReplicationRaw(Path src, short replication)
+      throws IOException {
+    return true;
+  }
+
+  public boolean renameRaw(Path src, Path dst) throws IOException {
+    synchronized (this) {
+      FileAttributes fAttr = pathToFileAttribs.remove(getPath(src));
+      if (fAttr == null) return false;
+      pathToFileAttribs.put(getPath(dst), fAttr);
+      return true;
+    }
+  }
+
+  public boolean deleteRaw(Path f) throws IOException {
+    synchronized (this) {
+      FileAttributes fAttr = pathToFileAttribs.remove(getPath(f));
+      if (fAttr != null) {
+        fAttr.data = null;
+        totalUsed -= fAttr.size;
+        return true;
+      }
+      return false;
+    }
+  }
+
+  public boolean exists(Path f) throws IOException {
+    return pathToFileAttribs.containsKey(getPath(f));
+  }
+  
+  /**
+   * Directory operations are not supported
+   */
+  public boolean isDirectory(Path f) throws IOException {
+    return false;
+  }
+
+  public long getLength(Path f) throws IOException {
+    return pathToFileAttribs.get(getPath(f)).size;
+  }
+  
+  /**
+   * Directory operations are not supported
+   */
+  public Path[] listPathsRaw(Path f) throws IOException {
+    return null;
+  }
+  public void setWorkingDirectory(Path new_dir) {}
+  public Path getWorkingDirectory() {
+    return staticWorkingDir;
+  }
+  public boolean mkdirs(Path f) throws IOException {
+    return false;
+  }
+  
+  /** lock operations are not supported */
+  public void lock(Path f, boolean shared) throws IOException {}
+  public void release(Path f) throws IOException {}
+  
+  /** copy/move operations are not supported */
+  public void copyFromLocalFile(Path src, Path dst) throws IOException {}
+  public void moveFromLocalFile(Path src, Path dst) throws IOException {}
+  public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
+  throws IOException {}
+
+  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+      throws IOException {
+    return fsOutputFile;
+  }
+
+  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+      throws IOException {
+  }
+
+  public void reportChecksumFailure(Path p, FSInputStream in,
+      long inPos,
+      FSInputStream sums, long sumsPos) {
+  }
+
+  public long getBlockSize(Path f) throws IOException {
+    return getDefaultBlockSize();
+  }
+
+  public long getDefaultBlockSize() {
+    return 32 * 1024; //some random large number. can be anything actually
+  }
+
+  public short getDefaultReplication() {
+    return 1;
+  }
+  
+  /** Some APIs exclusively for InMemoryFileSystem */
+  
+  /** Register a path with its size. This will also register a checksum for 
+   * the file that the user is trying to create. This is required since none
+   * of the FileSystem APIs accept the size of the file as argument. But since
+   * it is required for us to know a priori the size of the file we are going to
+   * create, the user must call this method for each file he wants to create
+   * and reserve memory for that file. We either succeed in reserving memory
+   * for both the main file and the checksum file and return true, or return 
+   * false.
+   */
+  public boolean reserveSpaceWithCheckSum(Path f, int size) {
+    //get the size of the checksum file (we know it is going to be 'int'
+    //since this is an inmem fs with file sizes that will fit in 4 bytes)
+    int checksumSize = getChecksumFileLength(size);
+    synchronized (this) {
+      if (!canFitInMemory(size + checksumSize)) return false;
+      FileAttributes fileAttr;
+      FileAttributes checksumAttr;
+      try {
+        fileAttr = new FileAttributes(size);
+        checksumAttr = new FileAttributes(checksumSize);
+      } catch (OutOfMemoryError o) {
+        return false;
+      }
+      totalUsed += size + checksumSize;
+      tempFileAttribs.put(getPath(f), fileAttr);
+      tempFileAttribs.put(getPath(FileSystem.getChecksumFile(f)),checksumAttr); 
+      return true;
+    }
+  }
+  
+  public int getChecksumFileLength(int size) {
+    return (int)super.getChecksumFileLength(size, bytesPerSum);
+  }
+  
+  /** This API getFiles could have been implemented over listPathsRaw
+   * but it is an overhead to maintain directory structures for this impl of
+   * the in-memory fs.
+   */
+  public Path[] getFiles(PathFilter filter) {
+    synchronized (this) {
+      List <String> closedFilesList = new ArrayList();
+      Set paths = pathToFileAttribs.keySet();
+      if (paths == null || paths.isEmpty()) return new Path[0];
+      Iterator iter = paths.iterator();
+      while (iter.hasNext()) {
+        String f = (String)iter.next();
+        if (filter.accept(new Path(f)))
+          closedFilesList.add(f);
+      }
+      String [] names = 
+        closedFilesList.toArray(new String[closedFilesList.size()]);
+      Path [] results = new Path[names.length];
+      for (int i = 0; i < names.length; i++) {
+        results[i] = new Path(names[i]);
+      }
+      return results;
+    }
+  }
+  
+  public int getFSSize() {
+    return fsSize;
+  }
+  
+  public float getPercentUsed() {
+    return (float)totalUsed/fsSize;
+  }
+ 
+  private boolean canFitInMemory(int size) {
+    if (size + totalUsed < fsSize)
+      return true;
+    return false;
+  }
+  
+  private String getPath(Path f) {
+    return f.toUri().getPath();
+  }
+  
+  private static class FileAttributes {
+    private byte[] data;
+    private int size;
+    
+    public FileAttributes(int size) {
+      this.size = size;
+      this.data = new byte[size];
+    }
+  }
+}

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Fri Jan 19 10:17:44 2007
@@ -21,6 +21,8 @@
 import java.io.*;
 import java.util.*;
 import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.rmi.server.UID;
 import java.security.MessageDigest;
 import org.apache.lucene.util.PriorityQueue;
@@ -1893,7 +1895,8 @@
     }
 
     /**
-     * Merges the contents of files passed in Path[]
+     * Merges the contents of files passed in Path[] using a max factor value
+     * that is already set
      * @param inNames the array of path names
      * @param deleteInputs true if the input files should be deleted when 
      * unnecessary
@@ -1902,6 +1905,22 @@
      */
     public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs) 
     throws IOException {
+      return merge(inNames, deleteInputs, 
+                  (inNames.length < factor) ? inNames.length : factor);
+    }
+
+    /**
+     * Merges the contents of files passed in Path[]
+     * @param inNames the array of path names
+     * @param deleteInputs true if the input files should be deleted when 
+     * unnecessary
+     * @param factor the factor that will be used as the maximum merge fan-in
+     * @return RawKeyValueIteratorMergeQueue
+     * @throws IOException
+     */
+    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
+                                     int factor) 
+    throws IOException {
       //get the segments from inNames
       ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
       for (int i = 0; i < inNames.length; i++) {
@@ -1911,7 +1930,7 @@
         s.doSync();
         a.add(s);
       }
-      factor = (inNames.length < factor) ? inNames.length : factor;
+      this.factor = factor;
       MergeQueue mQueue = new MergeQueue(a);
       return mQueue.merge();
     }
@@ -1948,26 +1967,48 @@
     /**
      * Clones the attributes (like compression of the input file and creates a 
      * corresponding Writer
-     * @param fileSys the FileSystem object
+     * @param ignoredFileSys the (ignored) FileSystem object
      * @param inputFile the path of the input file whose attributes should be 
      * cloned 
      * @param outputFile the path of the output file 
      * @param prog the Progressable to report status during the file write
      * @return Writer
      * @throws IOException
+     * @deprecated call {@link #cloneFileAttributes(Path,Path,Progressable)} instead
      */
-    public Writer cloneFileAttributes(FileSystem fileSys, Path inputFile, 
-                  Path outputFile, Progressable prog) throws IOException {
-      Reader reader = new Reader(fileSys, inputFile, 4096, conf);
+    public Writer cloneFileAttributes(FileSystem ignoredFileSys,
+                  Path inputFile, Path outputFile, Progressable prog) 
+    throws IOException {
+      return cloneFileAttributes(inputFile, outputFile, prog);
+    }
+
+    /**
+     * Clones the attributes (like compression) of the input file and creates a 
+     * corresponding Writer
+     * @param inputFile the path of the input file whose attributes should be 
+     * cloned
+     * @param outputFile the path of the output file 
+     * @param prog the Progressable to report status during the file write
+     * @return Writer
+     * @throws IOException
+     */
+    public Writer cloneFileAttributes(Path inputFile, Path outputFile, 
+                  Progressable prog) throws IOException {
+      FileSystem srcFileSys = inputFile.getFileSystem(conf);
+      Reader reader = new Reader(srcFileSys, inputFile, 4096, conf);
       boolean compress = reader.isCompressed();
       boolean blockCompress = reader.isBlockCompressed();
       CompressionCodec codec = reader.getCompressionCodec();
       reader.close();
+      
+      FileSystem dstFileSys = outputFile.getFileSystem(conf);
       FSDataOutputStream out;
       if (prog != null)
-        out = fs.create(outputFile, true, memory/(factor+1), prog);
+        out = dstFileSys.create(outputFile, true, 
+            conf.getInt("io.file.buffer.size", 4096), prog);
       else
-        out = fs.create(outputFile, true, memory/(factor+1));
+        out = dstFileSys.create(outputFile, true, 
+            conf.getInt("io.file.buffer.size", 4096));
       Writer writer = createWriter(conf, out, keyClass, valClass, compress, 
                           blockCompress, codec);
       return writer;
@@ -2158,7 +2199,8 @@
             Path outputFile = conf.getLocalPath("mapred.local.dir", 
                                   (outFile.suffix("." + passNo)).toString());
             Writer writer = cloneFileAttributes(fs, 
-                            mStream[0].segmentPathName, outputFile, null);
+                            fs.makeQualified(mStream[0].segmentPathName), 
+                            fs.makeQualified(outputFile), null);
             writer.sync = null; //disable sync for temp files
             writeFile(this, writer);
             writer.close();
@@ -2276,7 +2318,7 @@
       public boolean nextRawKey() throws IOException {
         if (in == null) {
         Reader reader = new Reader(fs, segmentPathName, 
-                memory/(factor+1), segmentOffset, 
+            conf.getInt("io.file.buffer.size", 4096), segmentOffset, 
                 segmentLength, conf);
         
         //sometimes we ignore syncs especially for temp merge files

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java Fri Jan 19 10:17:44 2007
@@ -29,6 +29,19 @@
     public static final long HEARTBEAT_INTERVAL = 10 * 1000;
     public static final long TASKTRACKER_EXPIRY_INTERVAL = 10 * 60 * 1000;
 
+    //for the inmemory filesystem (to do in-memory merge)
+    /**
+     * Constant denoting when a merge of in memory files will be triggered 
+     */
+    public static final float MAX_INMEM_FILESYS_USE = 0.5f;
+    /**
+     * Constant denoting the max size (in terms of the fraction of the total 
+     * size of the filesys) of a map output file that we will try
+     * to keep in mem. Ideally, this should be a factor of MAX_INMEM_FILESYS_USE
+     */
+    public static final float MAX_INMEM_FILESIZE_FRACTION =
+      MAX_INMEM_FILESYS_USE/2;
+    
     //
     // Result codes
     //

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java Fri Jan 19 10:17:44 2007
@@ -22,6 +22,8 @@
 
 import java.io.*;
 import java.net.*;
+
+import org.apache.hadoop.fs.InMemoryFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.*;
@@ -29,7 +31,7 @@
 
 /** The location of a map output file, as passed to a reduce task via the
  * {@link InterTrackerProtocol}. */ 
-class MapOutputLocation implements Writable {
+class MapOutputLocation implements Writable, MRConstants {
 
     static {                                      // register a ctor
       WritableFactories.setFactory
@@ -162,4 +164,96 @@
     }
     return totalBytes;
   }
+  
+  /**
+   * Get the map output into a local file (either in the inmemory fs or on the 
+   * local fs) from the remote server.
+   * We use the file system so that we generate checksum files on the data.
+   * @param inMemFileSys the inmemory filesystem to write the file to
+   * @param localFileSys the local filesystem to write the file to
+   * @param localFilename the filename to write the data into
+   * @param reduce the reduce id to get for
+   * @param timeout number of ms for connection and read timeout
+   * @return the path of the file that got created, or null if the fetch failed
+   * @throws IOException when something goes wrong
+   */
+  public Path getFile(InMemoryFileSystem inMemFileSys,
+                      FileSystem localFileSys,
+                      Path localFilename, 
+                      int reduce,
+                      int timeout) throws IOException, InterruptedException {
+    boolean good = false;
+    long totalBytes = 0;
+    FileSystem fileSys = localFileSys;
+    Thread currentThread = Thread.currentThread();
+    URL path = new URL(toString() + "&reduce=" + reduce);
+    try {
+      URLConnection connection = path.openConnection();
+      if (timeout > 0) {
+        connection.setConnectTimeout(timeout);
+        connection.setReadTimeout(timeout);
+      }
+      InputStream input = connection.getInputStream();
+      OutputStream output = null;
+      
+      //We will put a file in memory if it meets certain criteria:
+      //1. The size of the file should be less than 25% of the total inmem fs
+      //2. There is space available in the inmem fs
+      
+      int length = connection.getContentLength();
+      int inMemFSSize = inMemFileSys.getFSSize();
+      int checksumLength = inMemFileSys.getChecksumFileLength(length);
+        
+      boolean createInMem = 
+        (((float)(length + checksumLength) / inMemFSSize <= 
+        MAX_INMEM_FILESIZE_FRACTION) && 
+        inMemFileSys.reserveSpaceWithCheckSum(localFilename, length));
+      
+      if (createInMem)
+        fileSys = inMemFileSys;
+      else
+        fileSys = localFileSys;
+
+      output = fileSys.create(localFilename);
+      try {  
+        try {
+          byte[] buffer = new byte[64 * 1024];
+          if (currentThread.isInterrupted()) {
+            throw new InterruptedException();
+          }
+          int len = input.read(buffer);
+          while (len > 0) {
+            totalBytes += len;
+            output.write(buffer, 0 ,len);
+            if (currentThread.isInterrupted()) {
+              throw new InterruptedException();
+            }
+            len = input.read(buffer);
+          }
+        } finally {
+          output.close();
+        }
+      } finally {
+        input.close();
+      }
+      good = ((int) totalBytes) == connection.getContentLength();
+      if (!good) {
+        throw new IOException("Incomplete map output received for " + path +
+                              " (" + totalBytes + " instead of " + 
+                              connection.getContentLength() + ")");
+      }
+    } finally {
+      if (!good) {
+        try {
+          fileSys.delete(localFilename);
+          totalBytes = 0;
+        } catch (Throwable th) {
+          // IGNORED because we are cleaning up
+        }
+        return null;
+      }
+    }
+    return fileSys.makeQualified(localFilename);
+  }
+
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Fri Jan 19 10:17:44 2007
@@ -233,11 +233,18 @@
     
 
     // open a file to collect map output
-    Path[] mapFiles = new Path[numMaps];
+    // since we don't know how many map outputs got merged in memory, we have
+    // to check whether a given map output exists, and if it does, add it in
+    // the list of files to merge, otherwise not.
+    List <Path> mapFilesList = new ArrayList();
     for(int i=0; i < numMaps; i++) {
-      mapFiles[i] = mapOutputFile.getInputFile(i, getTaskId());
+      Path f = mapOutputFile.getInputFile(i, getTaskId());
+      if (lfs.exists(f))
+        mapFilesList.add(f);
     }
-
+    Path[] mapFiles = new Path[mapFilesList.size()];
+    mapFiles = mapFilesList.toArray(mapFiles);
+    
     // spawn a thread to give sort progress heartbeats
     Thread sortProgress = new Thread() {
         public void run() {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Fri Jan 19 10:17:44 2007
@@ -18,16 +18,22 @@
 package org.apache.hadoop.mapred;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.InMemoryFileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
 import org.apache.hadoop.util.*;
 
 import java.io.*;
 import java.util.*;
+import java.net.*;
 import java.text.DecimalFormat;
 import org.apache.hadoop.util.Progressable;
 
 /** Runs a reduce task. */
-class ReduceTaskRunner extends TaskRunner {
+class ReduceTaskRunner extends TaskRunner implements MRConstants {
   /** Number of ms before timing out a copy */
   private static final int STALLED_COPY_TIMEOUT = 3 * 60 * 1000;
   
@@ -80,11 +86,31 @@
   private long lastPollTime;
   
   /**
+   * A reference to the in memory file system for writing the map outputs to.
+   */
+  private InMemoryFileSystem inMemFileSys;
+  
+  /**
    * A reference to the local file system for writing the map outputs to.
    */
   private FileSystem localFileSys;
 
   /**
+   * An instance of the sorter used for doing merge
+   */
+  private SequenceFile.Sorter sorter;
+  
+  /**
+   * A reference to the throwable object (if merge throws an exception)
+   */
+  private volatile Throwable mergeThrowable;
+  
+  /** 
+   * A flag to indicate that merge is in progress
+   */
+  private volatile boolean mergeInProgress = false;
+
+  /**
    * The threads for fetching the files.
    */
   private MapOutputCopier[] copiers = null;
@@ -120,9 +146,28 @@
     public MapOutputLocation getLocation() { return loc; }
   }
 
-  private class PingTimer implements Progressable {
+  private class PingTimer extends Thread implements Progressable {
     Task task = getTask();
     TaskTracker tracker = getTracker();
+
+    public void run() {
+      LOG.info(task.getTaskId() + " Started thread: " + getName());
+      while (true) {
+        try {
+          progress();
+          Thread.sleep(Task.PROGRESS_INTERVAL);
+        }
+        catch (InterruptedException i) {
+          return;
+        }
+        catch (Throwable e) {
+          LOG.info(task.getTaskId() + " Thread Exception in " +
+                   "reporting sort progress\n" +
+                   StringUtils.stringifyException(e));
+          continue;
+        }
+      }
+    }
     
     public void progress() {
       task.reportProgress(tracker);
@@ -134,7 +179,6 @@
   /** Copies map outputs as they become available */
   private class MapOutputCopier extends Thread {
 
-    private PingTimer pingTimer = new PingTimer();
     private MapOutputLocation currentLocation = null;
     private int id = nextMapOutputCopierId++;
     
@@ -196,7 +240,7 @@
 
           try {
             start(loc);
-            size = copyOutput(loc, pingTimer);
+            size = copyOutput(loc);
           } catch (IOException e) {
             LOG.warn(reduceTask.getTaskId() + " copy failed: " +
                         loc.getMapTaskId() + " from " + loc.getHost());
@@ -215,13 +259,11 @@
 
     /** Copies a a map output from a remote host, using raw RPC. 
      * @param currentLocation the map output location to be copied
-     * @param pingee a status object to ping as we make progress
-     * @return the size of the copied file
+     * @return the size, in bytes, of the copied file
      * @throws IOException if there is an error copying the file
      * @throws InterruptedException if the copier should give up
      */
-    private long copyOutput(MapOutputLocation loc, 
-                            Progressable pingee
+    private long copyOutput(MapOutputLocation loc
                             ) throws IOException, InterruptedException {
 
       String reduceId = reduceTask.getTaskId();
@@ -233,21 +275,42 @@
       // a working filename that will be unique to this attempt
       Path tmpFilename = new Path(finalFilename + "-" + id);
       // this copies the map output file
-      long bytes = loc.getFile(localFileSys, tmpFilename,
-                               reduceTask.getPartition(), pingee,
+      tmpFilename = loc.getFile(inMemFileSys, localFileSys, tmpFilename,
+                               reduceTask.getPartition(),
                                STALLED_COPY_TIMEOUT);
+      if (tmpFilename == null)
+        throw new IOException("File " + finalFilename + "-" + id + 
+                              " not created");
+      long bytes = -1;
       // lock the ReduceTaskRunner while we do the rename
       synchronized (ReduceTaskRunner.this) {
-        // if we can't rename the file, something is broken
-        if (!(new File(tmpFilename.toString()).
-                 renameTo(new File(finalFilename.toString())))) {
-          localFileSys.delete(tmpFilename);
+        // if we can't rename the file, something is broken (and IOException
+        // will be thrown). This file could have been created in the inmemory
+        // fs or the localfs. So need to get the filesystem owning the path. 
+        FileSystem fs = tmpFilename.getFileSystem(conf);
+        if (!fs.rename(tmpFilename, finalFilename)) {
+          fs.delete(tmpFilename);
           throw new IOException("failure to rename map output " + tmpFilename);
         }
+        bytes = fs.getLength(finalFilename);
+        LOG.info(reduceId + " done copying " + loc.getMapTaskId() +
+                 " output from " + loc.getHost() + ".");
+        //Create a thread to do merges. Synchronize access/update to 
+        //mergeInProgress
+        if (!mergeInProgress && inMemFileSys.getPercentUsed() >= 
+                                                       MAX_INMEM_FILESYS_USE) {
+          LOG.info(reduceId + " InMemoryFileSystem " + 
+                   inMemFileSys.getUri().toString() +
+                   " is " + inMemFileSys.getPercentUsed() + 
+                   " full. Triggering merge");
+          InMemFSMergeThread m = new InMemFSMergeThread(inMemFileSys,
+                                     (LocalFileSystem)localFileSys, sorter);
+          m.setName("Thread for merging in memory files");
+          m.setDaemon(true);
+          mergeInProgress = true;
+          m.start();
+        }
       }
-      LOG.info(reduceTask.getTaskId() + " done copying " + loc.getMapTaskId() +
-               " output from " + loc.getHost() + ".");
-      
       return bytes;
     }
 
@@ -258,7 +321,6 @@
     super(task, tracker, conf);
     this.mapOutputFile = new MapOutputFile();
     this.mapOutputFile.setConf(conf);
-    localFileSys = FileSystem.getLocal(conf);
 
     this.reduceTask = (ReduceTask)getTask();
     this.scheduledCopies = new ArrayList(100);
@@ -266,6 +328,18 @@
     this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
     this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
 
+    //we want to distinguish inmem fs instances for different reduces. Hence,
+    //append a unique string in the uri for the inmem fs name
+    URI uri = URI.create("ramfs://mapoutput" + reduceTask.hashCode());
+    inMemFileSys = (InMemoryFileSystem)FileSystem.get(uri, conf);
+    LOG.info(reduceTask.getTaskId() + " Created an InMemoryFileSystem, uri: " +
+             uri);
+    localFileSys = FileSystem.getLocal(conf);
+    //create an instance of the sorter
+    sorter =
+      new SequenceFile.Sorter(inMemFileSys, conf.getOutputKeyComparator(), 
+          conf.getMapOutputValueClass(), conf);
+    
     // hosts -> next contact time
     this.penaltyBox = new Hashtable();
     
@@ -311,9 +385,14 @@
     // start the clock for bandwidth measurement
     long startTime = System.currentTimeMillis();
     long currentTime = startTime;
-    
+    PingTimer pingTimer = new PingTimer();
+    pingTimer.setName("Map output copy reporter for task " + 
+                      reduceTask.getTaskId());
+    pingTimer.setDaemon(true);
+    pingTimer.start();
+    try {
     // loop until we get all required outputs or are killed
-    while (!killed && numCopied < numOutputs) {
+    while (!killed && numCopied < numOutputs && mergeThrowable == null) {
 
       LOG.info(reduceTask.getTaskId() + " Need " + (numOutputs-numCopied) +
                " map output(s)");
@@ -382,12 +461,11 @@
       // new, just wait for a bit
       try {
         if (numInFlight == 0 && numScheduled == 0) {
-          getTask().reportProgress(getTracker());
           Thread.sleep(5000);
         }
       } catch (InterruptedException e) { } // IGNORE
 
-      while (!killed && numInFlight > 0) {
+      while (!killed && numInFlight > 0 && mergeThrowable == null) {
         LOG.debug(reduceTask.getTaskId() + " numInFlight = " + numInFlight);
         CopyResult cr = getCopyResult();
         
@@ -404,7 +482,6 @@
             copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs + 
                                 " at " +
                                 mbpsFormat.format(transferRate) +  " MB/s)");          
-            getTask().reportProgress(getTracker());
           } else {
             // this copy failed, put it back onto neededOutputs
             neededOutputs.add(new Integer(cr.getMapId()));
@@ -454,7 +531,64 @@
       }
     }
     
-    return numCopied == numOutputs && !killed;
+    if (mergeThrowable != null) {
+      //set the task state to FAILED
+      TaskTracker tracker = ReduceTaskRunner.this.getTracker();
+      TaskTracker.TaskInProgress tip = 
+        tracker.runningTasks.get(reduceTask.getTaskId());
+      tip.runstate = TaskStatus.State.FAILED;
+      try {
+        tip.cleanup();
+      } catch (Throwable ie2) {
+        // Ignore it, we are just trying to cleanup.
+      }
+      inMemFileSys.close();
+    }
+    
+    //Do a merge of in-memory files (if there are any)
+    if (!killed && mergeThrowable == null) {
+      try {
+        //wait for an ongoing merge (if it is in flight) to complete
+        while (mergeInProgress) {
+          Thread.sleep(200);
+        }
+        LOG.info(reduceTask.getTaskId() + 
+                 " Copying of all map outputs complete. " + 
+                 "Initiating the last merge on the remaining files in " + 
+                 inMemFileSys.getUri());
+        //initiate merge
+        Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
+        if (inMemClosedFiles.length == 0) {
+          LOG.info(reduceTask.getTaskId() + "Nothing to merge from " + 
+              inMemFileSys.getUri());
+          return numCopied == numOutputs;
+        }
+        RawKeyValueIterator rIter = sorter.merge(inMemClosedFiles, true, 
+                                                 inMemClosedFiles.length);
+        //Give the output file the same name as the first file in the
+        //current list of in-memory files. That name is guaranteed to be
+        //absent from the disk at this point, so we don't overwrite a
+        //previously created spill.
+        SequenceFile.Writer writer = sorter.cloneFileAttributes(
+            inMemFileSys.makeQualified(inMemClosedFiles[0]), 
+            localFileSys.makeQualified(inMemClosedFiles[0]), null);
+        sorter.writeFile(rIter, writer);
+        writer.close();
+        LOG.info(reduceTask.getTaskId() +
+                 " Merge of the " +inMemClosedFiles.length +
+                 " files in InMemoryFileSystem complete." +
+                 " Local file is " + inMemClosedFiles[0]);
+      } catch (Throwable t) {
+        LOG.warn("Merge of the inmemory files threw an exception: " + 
+            StringUtils.stringifyException(t));
+        inMemFileSys.close();
+        return false;
+      }
+    }
+    return mergeThrowable == null && numCopied == numOutputs && !killed;
+    } finally {
+      pingTimer.interrupt();
+    }
   }
   
   
@@ -529,4 +663,60 @@
     }
   }
 
+  private class InMemFSMergeThread extends Thread {
+    private InMemoryFileSystem inMemFileSys;
+    private LocalFileSystem localFileSys;
+    private SequenceFile.Sorter sorter;
+    
+    public InMemFSMergeThread(InMemoryFileSystem inMemFileSys, 
+        LocalFileSystem localFileSys, SequenceFile.Sorter sorter) {
+      this.inMemFileSys = inMemFileSys;
+      this.localFileSys = localFileSys;
+      this.sorter = sorter;
+    }
+    public void run() {
+      LOG.info(reduceTask.getTaskId() + " Thread started: " + getName());
+      try {
+        //initiate merge
+        Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
+        //Note that the above Path[] could be of length 0 if all copies are 
+        //in flight. So we make sure that we have some 'closed' map
+        //output files to merge to get the benefit of in-memory merge
+        if (inMemClosedFiles.length >= 
+          (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
+          RawKeyValueIterator rIter = sorter.merge(inMemClosedFiles, true, 
+              inMemClosedFiles.length);
+          //Give the output file the same name as the first file in the
+          //current list of in-memory files. That name is guaranteed to be
+          //absent from the disk at this point, so we don't overwrite a
+          //previously created spill.
+          SequenceFile.Writer writer = sorter.cloneFileAttributes(
+              inMemFileSys.makeQualified(inMemClosedFiles[0]), 
+              localFileSys.makeQualified(inMemClosedFiles[0]), null);
+          sorter.writeFile(rIter, writer);
+          writer.close();
+          LOG.info(reduceTask.getTaskId() + 
+                   " Merge of the " +inMemClosedFiles.length +
+                   " files in InMemoryFileSystem complete." +
+                   " Local file is " + inMemClosedFiles[0]);
+        }
+        else {
+          LOG.info(reduceTask.getTaskId() + " Nothing to merge from " + 
+              inMemFileSys.getUri());
+        }
+      } catch (Throwable t) {
+        LOG.warn("Merge of the inmemory files threw an exception: " + 
+            StringUtils.stringifyException(t));
+        ReduceTaskRunner.this.mergeThrowable = t;
+      }
+      finally {
+        mergeInProgress = false;
+      }
+    }
+  }
+  final private static PathFilter MAP_OUTPUT_FILTER = new PathFilter() {
+    public boolean accept(Path file) {
+      return file.toString().endsWith(".out");
+    }     
+  };
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Jan 19 10:17:44 2007
@@ -885,7 +885,7 @@
     class TaskInProgress {
         Task task;
         float progress;
-        TaskStatus.State runstate;
+        volatile TaskStatus.State runstate;
         long lastProgressReport;
         StringBuffer diagnosticInfo = new StringBuffer();
         TaskRunner runner;