Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/09/19 12:53:00 UTC

svn commit: r697045 - in /hadoop/core/trunk: ./ conf/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: ddas
Date: Fri Sep 19 03:52:59 2008
New Revision: 697045

URL: http://svn.apache.org/viewvc?rev=697045&view=rev
Log:
HADOOP-3638. Caches the iFile index files in memory to reduce seeks. Contributed by Jothi Padmanabhan.

Added:
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexCache.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/conf/hadoop-default.xml
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=697045&r1=697044&r2=697045&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Sep 19 03:52:59 2008
@@ -403,6 +403,9 @@
     HADOOP-3514. Inline the CRCs in intermediate files as opposed to reading
     it from a different .crc file. (Jothi Padmanabhan via ddas)
 
+    HADOOP-3638. Caches the iFile index files in memory to reduce seeks.
+    (Jothi Padmanabhan via ddas)
+
   BUG FIXES
 
     HADOOP-3563.  Refactor the distributed upgrade code so that it is 

Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=697045&r1=697044&r2=697045&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Fri Sep 19 03:52:59 2008
@@ -1551,4 +1551,12 @@
   </description>
 </property>
 
+<property>
+  <name>mapred.tasktracker.indexcache.mb</name>
+  <value>10</value>
+  <description>The maximum memory, in megabytes, that a task tracker allows
+    for the index cache used when serving map outputs to reducers.
+  </description>
+</property>
+
 </configuration>
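
For illustration, a minimal sketch of overriding this limit programmatically; in a real
deployment the value would normally go into the site configuration file, and the 32 MB
figure below is an arbitrary example, not part of this change.

    import org.apache.hadoop.mapred.JobConf;

    public class IndexCacheConfigSketch {
      public static void main(String[] args) {
        // Sketch only: raise the per-tracker index cache limit from the 10 MB default to 32 MB.
        JobConf conf = new JobConf();
        conf.setInt("mapred.tasktracker.indexcache.mb", 32);
        System.out.println(conf.getInt("mapred.tasktracker.indexcache.mb", 10));
      }
    }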

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexCache.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexCache.java?rev=697045&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexCache.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexCache.java Fri Sep 19 03:52:59 2008
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+
+class IndexCache {
+
+  private final JobConf conf;
+  private final int totalMemoryAllowed;
+  private AtomicInteger totalMemoryUsed = new AtomicInteger();
+  private static final Log LOG = LogFactory.getLog(IndexCache.class);
+
+  private final ConcurrentHashMap<String,IndexInformation> cache =
+    new ConcurrentHashMap<String,IndexInformation>();
+  
+  private final LinkedBlockingQueue<String> queue = 
+    new LinkedBlockingQueue<String>();
+
+  public IndexCache(JobConf conf) {
+    this.conf = conf;
+    totalMemoryAllowed =
+      conf.getInt("mapred.tasktracker.indexcache.mb", 10) * 1024 * 1024;
+    LOG.info("IndexCache created with max memory = " + totalMemoryAllowed);
+  }
+
+  /**
+   * Gets the index information for the given map and reducer, reading the
+   * index file into the cache if it is not already present.
+   * @param mapId the map task whose index is requested
+   * @param reduce the reducer (partition) whose record is requested
+   * @param fileName the file to read the index information from if it is not
+   *                 already present in the cache
+   * @return the index record for this map/reducer pair
+   * @throws IOException
+   */
+  public IndexRecord getIndexInformation(String mapId, int reduce,
+      Path fileName) throws IOException {
+
+    IndexInformation info = cache.get(mapId);
+
+    if (info == null) {
+      info = readIndexFileToCache(fileName, mapId);
+    } else {
+      synchronized (info) {
+        while (null == info.indexRecordArray) {
+          try {
+            info.wait();
+          } catch (InterruptedException e) {
+            throw new IOException("Interrupted waiting for construction", e);
+          }
+        }
+      }
+      LOG.debug("IndexCache HIT: MapId " + mapId + " found");
+    }
+
+    if (info.indexRecordArray.length == 0 ||
+        info.indexRecordArray.length <= reduce) {
+      throw new IOException("Invalid request " +
+        " Map Id = " + mapId + " Reducer = " + reduce +
+        " Index Info Length = " + info.indexRecordArray.length);
+    }
+    return info.indexRecordArray[reduce];
+  }
+
+  private IndexInformation readIndexFileToCache(Path indexFileName,
+      String mapId) throws IOException {
+    IndexInformation info;
+    IndexInformation newInd = new IndexInformation();
+    if ((info = cache.putIfAbsent(mapId, newInd)) != null) {
+      synchronized (info) {
+        while (null == info.indexRecordArray) {
+          try {
+            info.wait();
+          } catch (InterruptedException e) {
+            throw new IOException("Interrupted waiting for construction", e);
+          }
+        }
+      }
+      LOG.debug("IndexCache HIT: MapId " + mapId + " found");
+      return info;
+    }
+    LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ;
+    IndexRecord[] tmp = null;
+    try { 
+      tmp = IndexRecord.readIndexFile(indexFileName, conf);
+    } catch (Throwable e) { 
+      tmp = new IndexRecord[0];
+      cache.remove(mapId);
+      throw new IOException("Error Reading IndexFile",e);
+    } finally { 
+      synchronized (newInd) { 
+        newInd.indexRecordArray = tmp;
+        newInd.notifyAll();
+      } 
+    } 
+    queue.add(mapId);
+    
+    if (totalMemoryUsed.addAndGet(newInd.getSize()) > totalMemoryAllowed) {
+      freeIndexInformation();
+    }
+    return newInd;
+  }
+
+  /**
+   * This method removes the map from the cache. It should be called when
+   * a map output on this tracker is discarded.
+   * @param mapId The taskID of this map.
+   */
+  public void removeMap(String mapId) {
+    IndexInformation info = cache.remove(mapId);
+    if (info != null) {
+      totalMemoryUsed.addAndGet(-info.getSize());
+      if (!queue.remove(mapId)) {
+        LOG.warn("Map ID" + mapId + " not found in queue!!");
+      }
+    } else {
+      LOG.info("Map ID " + mapId + " not found in cache");
+    }
+  }
+
+  /**
+   * Bring memory usage below totalMemoryAllowed.
+   */
+  private synchronized void freeIndexInformation() {
+    while (totalMemoryUsed.get() > totalMemoryAllowed) {
+      String s = queue.remove();
+      IndexInformation info = cache.remove(s);
+      if (info != null) {
+        totalMemoryUsed.addAndGet(-info.getSize());
+      }
+    }
+  }
+
+  private static class IndexInformation {
+    IndexRecord[] indexRecordArray = null;
+
+    int getSize() {
+      return ((indexRecordArray == null) ? 
+          0 : indexRecordArray.length * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
+    }
+  }
+}
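
As a quick orientation for readers of the new class, a minimal, hypothetical usage
sketch follows (the map attempt id and index path are made up; the real caller added
in this patch is the TaskTracker's shuffle servlet):

    package org.apache.hadoop.mapred;

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;

    // Sketch only: look up one reducer's record, then drop the map's cached entry.
    class IndexCacheUsageSketch {
      static IndexRecord lookup(JobConf conf) throws IOException {
        IndexCache cache = new IndexCache(conf);
        Path indexFile = new Path(
            "/local/taskTracker/attempt_200809190352_0001_m_000000_0/output/file.out.index");
        // The first call for a map id reads and caches the whole index file;
        // later calls for the same map id are served from memory.
        IndexRecord rec = cache.getIndexInformation(
            "attempt_200809190352_0001_m_000000_0", 3, indexFile);
        // When the map output is discarded, release its cached index records too.
        cache.removeMap("attempt_200809190352_0001_m_000000_0");
        return rec;
      }
    }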

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java?rev=697045&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java Fri Sep 19 03:52:59 2008
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+class IndexRecord {
+  final long startOffset;
+  final long rawLength;
+  final long partLength;
+  
+  public IndexRecord(long startOffset, long rawLength, long partLength) {
+    this.startOffset = startOffset;
+    this.rawLength = rawLength;
+    this.partLength = partLength;
+  }
+
+  public static IndexRecord[] readIndexFile(Path indexFileName, 
+                                            JobConf job) 
+  throws IOException {
+
+    FileSystem  localFs = FileSystem.getLocal(job);
+    FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
+
+    FSDataInputStream indexInputStream = rfs.open(indexFileName);
+    long length = rfs.getFileStatus(indexFileName).getLen();
+    IFileInputStream checksumIn = 
+      new IFileInputStream(indexInputStream,length);
+
+    int numEntries = (int) length/MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
+
+    IndexRecord[] indexRecordArray = new IndexRecord[numEntries];
+    
+    DataInputStream wrapper = new DataInputStream(checksumIn);
+
+    try {
+      for (int i= 0; i < numEntries; i++) {
+        long startOffset = wrapper.readLong();
+        long rawLength = wrapper.readLong();
+        long partLength = wrapper.readLong();
+        indexRecordArray[i] = 
+          new IndexRecord(startOffset, rawLength, partLength);
+      }
+    }
+    finally {
+      // This internally calls checksumIn.close as well
+      wrapper.close();
+    }
+    return indexRecordArray;
+  }
+}
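
The on-disk layout readIndexFile expects is one fixed-size record per partition: three
8-byte longs (offset, raw length, part length), i.e. MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH
(24 bytes, matching the bytesPerFile arithmetic in the new test), written through an
IFileOutputStream so the inline checksum from HADOOP-3514 is included. A minimal writer
sketch with made-up offsets and a hypothetical helper class:

    package org.apache.hadoop.mapred;

    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Sketch only: write a one-partition index file in the layout readIndexFile expects.
    class IndexFileLayoutSketch {
      static void write(JobConf conf, Path indexFile) throws IOException {
        FileSystem rfs = FileSystem.getLocal(conf).getRaw();
        DataOutputStream out =
            new DataOutputStream(new IFileOutputStream(rfs.create(indexFile)));
        out.writeLong(0L);     // startOffset of partition 0 within file.out
        out.writeLong(1024L);  // rawLength: uncompressed bytes of the partition
        out.writeLong(512L);   // partLength: bytes actually stored in file.out
        out.close();           // closing the chain lets IFileOutputStream finish the checksummed stream
      }
    }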

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=697045&r1=697044&r2=697045&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Fri Sep 19 03:52:59 2008
@@ -442,7 +442,11 @@
     private final Counters.Counter mapOutputRecordCounter;
     private final Counters.Counter combineInputCounter;
     private final Counters.Counter combineOutputCounter;
-
+    
+    private ArrayList<IndexRecord[]> indexCacheList;
+    private int totalIndexCacheMemory;
+    private static final int INDEX_CACHE_MEMORY_LIMIT = 1024*1024;
+    
     @SuppressWarnings("unchecked")
     public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
                            Reporter reporter) throws IOException {
@@ -454,6 +458,8 @@
        
       rfs = ((LocalFileSystem)localFs).getRaw();
 
+      indexCacheList = new ArrayList<IndexRecord[]>();
+      
       //sanity checks
       final float spillper = job.getFloat("io.sort.spill.percent",(float)0.8);
       final float recper = job.getFloat("io.sort.record.percent",(float)0.05);
@@ -902,16 +908,31 @@
                   partitions * APPROX_HEADER_LENGTH;
       FSDataOutputStream out = null;
       FSDataOutputStream indexOut = null;
+      IFileOutputStream indexChecksumOut = null;
+      IndexRecord[] irArray = null;
       try {
         // create spill file
         Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
                                       numSpills, size);
         out = rfs.create(filename);
-        // create spill index
-        Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
-                             getTaskID(), numSpills,
-                             partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
-        indexOut = localFs.create(indexFilename);
+        // All records (reducers) of a given spill go to 
+        // the same destination (memory or file).
+        if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
+          // create spill index file
+          Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
+              getTaskID(), numSpills,
+              partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+
+          indexOut = rfs.create(indexFilename);
+          indexChecksumOut = new IFileOutputStream(indexOut);
+        }
+        else {
+          irArray = new IndexRecord[partitions];
+          indexCacheList.add(numSpills,irArray);
+          totalIndexCacheMemory += partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
+        }
+          
+        
         final int endPosition = (kvend > kvstart)
           ? kvend
           : kvoffsets.length + kvend;
@@ -957,8 +978,14 @@
             // close the writer
             writer.close();
             
-            // write the index as <offset, raw-length, compressed-length> 
-            writeIndexRecord(indexOut, out, segmentStart, writer);
+            if (indexChecksumOut != null) {
+              // write the index as <offset, raw-length, compressed-length> 
+              writeIndexRecord(indexChecksumOut, segmentStart, writer);
+            }
+            else {
+              irArray[i] = new IndexRecord(segmentStart, writer.getRawLength(),
+                  writer.getCompressedLength());    
+            }
             writer = null;
           } finally {
             if (null != writer) writer.close();
@@ -968,6 +995,9 @@
         ++numSpills;
       } finally {
         if (out != null) out.close();
+        if (indexChecksumOut != null) {
+          indexChecksumOut.close();
+        }
         if (indexOut != null) indexOut.close();
       }
     }
@@ -982,17 +1012,30 @@
       long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
       FSDataOutputStream out = null;
       FSDataOutputStream indexOut = null;
+      IFileOutputStream indexChecksumOut = null;
+      IndexRecord[] irArray = null;
       final int partition = partitioner.getPartition(key, value, partitions);
       try {
         // create spill file
         Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
                                       numSpills, size);
         out = rfs.create(filename);
-        // create spill index
-        Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
-                             getTaskID(), numSpills,
-                             partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
-        indexOut = localFs.create(indexFilename);
+        
+        if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
+          // create spill index
+          Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
+              getTaskID(), numSpills,
+              partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+
+          indexOut = rfs.create(indexFilename);
+          indexChecksumOut = new IFileOutputStream(indexOut);
+        }
+        else {
+          irArray = new IndexRecord[partitions];
+          indexCacheList.add(numSpills,irArray);
+          totalIndexCacheMemory += partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
+        }
+ 
         // we don't run the combiner for a single record
         for (int i = 0; i < partitions; ++i) {
           IFile.Writer<K, V> writer = null;
@@ -1010,8 +1053,14 @@
             }
             writer.close();
 
-            // index record
-            writeIndexRecord(indexOut, out, segmentStart, writer);
+            if (indexChecksumOut != null) {
+              writeIndexRecord(indexChecksumOut,segmentStart,writer);
+            }
+            else {
+              irArray[i] = new IndexRecord(segmentStart, writer.getRawLength(),
+                  writer.getCompressedLength());    
+            }
+            writer = null;
           } catch (IOException e) {
             if (null != writer) writer.close();
             throw e;
@@ -1020,6 +1069,7 @@
         ++numSpills;
       } finally {
         if (out != null) out.close();
+        if (indexChecksumOut != null) indexChecksumOut.close();
         if (indexOut != null) indexOut.close();
       }
     }
@@ -1116,20 +1166,23 @@
       long finalOutFileSize = 0;
       long finalIndexFileSize = 0;
       Path [] filename = new Path[numSpills];
-      Path [] indexFileName = new Path[numSpills];
-      FileSystem localFs = FileSystem.getLocal(job);
       
       for(int i = 0; i < numSpills; i++) {
         filename[i] = mapOutputFile.getSpillFile(getTaskID(), i);
-        indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskID(), i);
         finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
       }
       
       if (numSpills == 1) { //the spill is the final output
-    	  rfs.rename(filename[0],
-    			  new Path(filename[0].getParent(), "file.out"));
-    	  localFs.rename(indexFileName[0],
-    			  new Path(indexFileName[0].getParent(),"file.out.index"));
+        rfs.rename(filename[0],
+            new Path(filename[0].getParent(), "file.out"));
+        if (indexCacheList.size() == 0) {
+          rfs.rename(mapOutputFile.getSpillIndexFile(getTaskID(), 0),
+              new Path(filename[0].getParent(),"file.out.index"));
+        } 
+        else { 
+          writeSingleSpillIndexToFile(getTaskID(),
+              new Path(filename[0].getParent(),"file.out.index"));
+        }
     	  return;
       }
       //make correction in the length to include the sequence file header
@@ -1149,8 +1202,12 @@
                                                4096);
 
       //The final index file output stream
-      FSDataOutputStream finalIndexOut = localFs.create(finalIndexFile, true,
+      FSDataOutputStream finalIndexOut = rfs.create(finalIndexFile, true,
                                                         4096);
+
+      IFileOutputStream finalIndexChecksumOut = 
+        new IFileOutputStream(finalIndexOut);
+
       if (numSpills == 0) {
         //create dummy files
         for (int i = 0; i < partitions; i++) {
@@ -1158,9 +1215,10 @@
           Writer<K, V> writer = new Writer<K, V>(job, finalOut, 
                                                  keyClass, valClass, codec);
           writer.close();
-          writeIndexRecord(finalIndexOut, finalOut, segmentStart, writer);
+          writeIndexRecord(finalIndexChecksumOut, segmentStart, writer);
         }
         finalOut.close();
+        finalIndexChecksumOut.close();
         finalIndexOut.close();
         return;
       }
@@ -1169,13 +1227,15 @@
           //create the segments to be merged
           List<Segment<K, V>> segmentList =
             new ArrayList<Segment<K, V>>(numSpills);
+          TaskAttemptID mapId = getTaskID();
           for(int i = 0; i < numSpills; i++) {
-            FSDataInputStream indexIn = localFs.open(indexFileName[i]);
-            indexIn.seek(parts * MAP_OUTPUT_INDEX_RECORD_LENGTH);
-            long segmentOffset = indexIn.readLong();
-            long rawSegmentLength = indexIn.readLong();
-            long segmentLength = indexIn.readLong();
-            indexIn.close();
+            IndexRecord indexRecord = 
+              getIndexInformation(mapId, i, parts);
+
+            long segmentOffset = indexRecord.startOffset;
+            long rawSegmentLength = indexRecord.rawLength;
+            long segmentLength = indexRecord.partLength;
+
             FSDataInputStream in = rfs.open(filename[i]);
             in.seek(segmentOffset);
 
@@ -1185,9 +1245,11 @@
             segmentList.add(i, s);
             
             if (LOG.isDebugEnabled()) {
-              LOG.debug("Index: (" + indexFileName[i] + ", " + segmentOffset + 
+              LOG.debug("MapId=" + mapId + " Reducer=" + parts +
+                  "Spill =" + i + "(" + segmentOffset + ","+ 
                         rawSegmentLength + ", " + segmentLength + ")");
             }
+            indexRecord = null;
           }
           
           //merge
@@ -1214,20 +1276,20 @@
           writer.close();
           
           //write index record
-          writeIndexRecord(finalIndexOut, finalOut, segmentStart, writer);
+          writeIndexRecord(finalIndexChecksumOut,segmentStart, writer);
         }
         finalOut.close();
+        finalIndexChecksumOut.close();
         finalIndexOut.close();
         //cleanup
         for(int i = 0; i < numSpills; i++) {
           rfs.delete(filename[i],true);
-          localFs.delete(indexFileName[i], true);
         }
       }
     }
 
-    private void writeIndexRecord(FSDataOutputStream indexOut, 
-                                  FSDataOutputStream out, long start, 
+    private void writeIndexRecord(IFileOutputStream indexOut, 
+                                  long start, 
                                   Writer<K, V> writer) 
     throws IOException {
       //when we write the offset/decompressed-length/compressed-length to  
@@ -1238,14 +1300,73 @@
       //file by doing this as opposed to writing VLong but it helps us later on.
       // index record: <offset, raw-length, compressed-length> 
       //StringBuffer sb = new StringBuffer();
-      indexOut.writeLong(start);
-      indexOut.writeLong(writer.getRawLength());
+      
+      DataOutputStream wrapper = new DataOutputStream(indexOut);
+      wrapper.writeLong(start);
+      wrapper.writeLong(writer.getRawLength());
       long segmentLength = writer.getCompressedLength();
-      indexOut.writeLong(segmentLength);
+      wrapper.writeLong(segmentLength);
       LOG.info("Index: (" + start + ", " + writer.getRawLength() + ", " + 
                segmentLength + ")");
     }
     
+    /**
+     * Returns the index information for the given map, spill number and
+     * reducer combination. The information is obtained transparently from
+     * either the in-memory index cache list or the underlying spill index file.
+     * @param mapId the map task attempt
+     * @param spillNum the spill whose index is requested
+     * @param reducer the reducer (partition) whose record is requested
+     * @return the index record for this spill/reducer pair
+     * @throws IOException
+     */
+    private IndexRecord getIndexInformation( TaskAttemptID mapId,
+                                             int spillNum,
+                                             int reducer) 
+      throws IOException {
+      IndexRecord[] irArray = null;
+      
+      if (indexCacheList.size() > spillNum) {
+        irArray = indexCacheList.get(spillNum);
+      }
+      else {
+        Path indexFileName = mapOutputFile.getSpillIndexFile(mapId, spillNum);
+        irArray = IndexRecord.readIndexFile(indexFileName, job);
+        indexCacheList.add(spillNum,irArray);
+        rfs.delete(indexFileName,false);
+      }
+      return irArray[reducer];
+    }
+    
+    /**
+     * Writes the index information for the single spill from the in-memory
+     * index cache list to the final index file.
+     * @param mapId the map task attempt whose index is written
+     * @param finalName the final index file to write
+     * @throws IOException
+     */
+    private void writeSingleSpillIndexToFile(TaskAttemptID mapId,
+                                             Path finalName) 
+    throws IOException {
+    
+      IndexRecord[] irArray = null;
+            
+      irArray = indexCacheList.get(0);
+      
+      FSDataOutputStream indexOut = rfs.create(finalName);
+      IFileOutputStream indexChecksumOut = new IFileOutputStream (indexOut);
+      DataOutputStream wrapper = new DataOutputStream(indexChecksumOut);
+      
+      for (int i = 0; i < irArray.length; i++) {
+        wrapper.writeLong(irArray[i].startOffset);
+        wrapper.writeLong(irArray[i].rawLength);
+        wrapper.writeLong(irArray[i].partLength);
+      }
+      
+      wrapper.close();
+      indexOut.close();
+    }
+    
   } // MapOutputBuffer
   
   /**

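To put the new map-side constants in perspective: each spill adds partitions *
MAP_OUTPUT_INDEX_RECORD_LENGTH bytes to the in-task cache, so with a hypothetical 500
reducers that is 500 * 24 = 12,000 bytes per spill, and the 1 MB INDEX_CACHE_MEMORY_LIMIT
(1024*1024 bytes) keeps the index records of roughly the first 87 spills in memory before
later spills fall back to writing checksummed index files on disk.
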
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=697045&r1=697044&r2=697045&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Sep 19 03:52:59 2008
@@ -207,6 +207,8 @@
    * Number of maptask completion events locations to poll for at one time
    */  
   private int probe_sample_size = 500;
+
+  private IndexCache indexCache;
     
   /*
    * A list of commitTaskActions for whom commit response has been received 
@@ -475,6 +477,7 @@
                                     (maxCurrentMapTasks + 
                                         maxCurrentReduceTasks);
     }
+    this.indexCache = new IndexCache(this.fConf);
     // start the taskMemoryManager thread only if enabled
     setTaskMemoryManagerEnabledFlag();
     if (isTaskMemoryManagerEnabled()) {
@@ -1347,6 +1350,10 @@
         // Add this tips of this job to queue of tasks to be purged 
         for (TaskInProgress tip : rjob.tasks) {
           tip.jobHasFinished(false);
+          Task t = tip.getTask();
+          if (t.isMapTask()) {
+            indexCache.removeMap(tip.getTask().getTaskID().toString());
+          }
         }
         // Delete the job directory for this  
         // task if the job is done/failed
@@ -1381,6 +1388,9 @@
       // removing the job if it's the last task
       removeTaskFromJob(tip.getTask().getJobID(), tip);
       tip.jobHasFinished(wasFailure);
+      if (tip.getTask().isMapTask()) {
+        indexCache.removeMap(tip.getTask().getTaskID().toString());
+      }
     }
   }
 
@@ -2748,7 +2758,6 @@
       // true iff IOException was caused by attempt to access input
       boolean isInputException = true;
       OutputStream outStream = null;
-      FSDataInputStream indexIn = null;
       FSDataInputStream mapOutputIn = null;
  
       IFileInputStream checksumInputStream = null;
@@ -2756,6 +2765,9 @@
       long totalRead = 0;
       ShuffleServerMetrics shuffleMetrics = (ShuffleServerMetrics)
                                       context.getAttribute("shuffleServerMetrics");
+      TaskTracker tracker = 
+        (TaskTracker) context.getAttribute("task.tracker");
+
       try {
         shuffleMetrics.serverHandlerBusy();
         outStream = response.getOutputStream();
@@ -2781,20 +2793,13 @@
          * Read the index file to get the information about where
          * the map-output for the given reducer is available. 
          */
-        //open index file
-        indexIn = fileSys.open(indexFileName);
-
-        //seek to the correct offset for the given reduce
-        indexIn.seek(reduce * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
+        IndexRecord info = 
+          tracker.indexCache.getIndexInformation(mapId, reduce, indexFileName);
           
-        //read the offset and length of the partition data
-        final long startOffset = indexIn.readLong();
-        final long rawPartLength = indexIn.readLong();
-        final long partLength = indexIn.readLong();
+        final long startOffset = info.startOffset;
+        final long rawPartLength = info.rawLength;
+        final long partLength = info.partLength;
 
-        indexIn.close();
-        indexIn = null;
-          
         //set the custom "Raw-Map-Output-Length" http header to 
         //the raw (decompressed) length
         response.setHeader(RAW_MAP_OUTPUT_LENGTH, Long.toString(rawPartLength));
@@ -2859,8 +2864,6 @@
                  rawPartLength + " from " + startOffset + " with (" + 
                  firstKeyLength + ", " + firstValueLength + ")");
       } catch (IOException ie) {
-        TaskTracker tracker = 
-          (TaskTracker) context.getAttribute("task.tracker");
         Log log = (Log) context.getAttribute("log");
         String errorMsg = ("getMapOutput(" + mapId + "," + reduceId + 
                            ") failed :\n"+
@@ -2873,10 +2876,6 @@
         shuffleMetrics.failedOutput();
         throw ie;
       } finally {
-        if (indexIn != null) {
-          indexIn.close();
-        }
-
         if (checksumInputStream != null) {
           checksumInputStream.close();
         }

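On the serving side, the net effect is that the shuffle servlet now obtains the reducer's
offsets from the shared IndexCache instead of opening and seeking the .index file on every
request. A simplified, hypothetical sketch of that read path (error handling, HTTP headers
and shuffle metrics omitted):

    package org.apache.hadoop.mapred;

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Sketch only: fetch one reducer's partition bytes using the cached index record.
    class ShuffleServeSketch {
      static byte[] readPartition(IndexCache indexCache, FileSystem rfs, String mapId,
          int reduce, Path indexFileName, Path mapOutputFileName) throws IOException {
        IndexRecord info = indexCache.getIndexInformation(mapId, reduce, indexFileName);
        FSDataInputStream mapOutputIn = rfs.open(mapOutputFileName);
        try {
          mapOutputIn.seek(info.startOffset);               // partition start within file.out
          byte[] buf = new byte[(int) info.partLength];     // bytes as stored (possibly compressed)
          mapOutputIn.readFully(buf);
          return buf;
        } finally {
          mapOutputIn.close();
        }
      }
    }
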
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java?rev=697045&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java Fri Sep 19 03:52:59 2008
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import junit.framework.TestCase;
+
+public class TestIndexCache extends TestCase {
+
+  public void testLRCPolicy() throws Exception {
+    Random r = new Random();
+    long seed = r.nextLong();
+    r.setSeed(seed);
+    System.out.println("seed: " + seed);
+    JobConf conf = new JobConf();
+    FileSystem fs = FileSystem.getLocal(conf).getRaw();
+    Path p = new Path(System.getProperty("test.build.data", "/tmp"),
+        "cache").makeQualified(fs);
+    fs.delete(p, true);
+    conf.setInt("mapred.tasktracker.indexcache.mb", 1);
+    final int partsPerMap = 1000;
+    final int bytesPerFile = partsPerMap * 24;
+    IndexCache cache = new IndexCache(conf);
+
+    // fill cache
+    int totalsize = bytesPerFile;
+    for (; totalsize < 1024 * 1024; totalsize += bytesPerFile) {
+      Path f = new Path(p, Integer.toString(totalsize, 36));
+      writeFile(fs, f, totalsize, partsPerMap);
+      IndexRecord rec = cache.getIndexInformation(
+          Integer.toString(totalsize, 36), r.nextInt(partsPerMap), f);
+      checkRecord(rec, totalsize);
+    }
+
+    // delete the files; the cache should still serve all cached entries
+    for (FileStatus stat : fs.listStatus(p)) {
+      fs.delete(stat.getPath(),true);
+    }
+    for (int i = bytesPerFile; i < 1024 * 1024; i += bytesPerFile) {
+      Path f = new Path(p, Integer.toString(i, 36));
+      IndexRecord rec = cache.getIndexInformation(Integer.toString(i, 36),
+          r.nextInt(partsPerMap), f);
+      checkRecord(rec, i);
+    }
+
+    // push oldest (bytesPerFile) out of cache
+    Path f = new Path(p, Integer.toString(totalsize, 36));
+    writeFile(fs, f, totalsize, partsPerMap);
+    cache.getIndexInformation(Integer.toString(totalsize, 36),
+        r.nextInt(partsPerMap), f);
+    fs.delete(f, false);
+
+    // the oldest entry should have been pushed out; reading it again must now fail
+    boolean fnf = false;
+    try {
+      cache.getIndexInformation(Integer.toString(bytesPerFile, 36),
+          r.nextInt(partsPerMap), new Path(p, Integer.toString(bytesPerFile, 36)));
+    } catch (IOException e) {
+      if (e.getCause() == null ||
+          !(e.getCause()  instanceof FileNotFoundException)) {
+        throw e;
+      }
+      else {
+        fnf = true;
+      }
+    }
+    if (!fnf)
+      fail("Failed to push out last entry");
+    // should find all the other entries
+    for (int i = bytesPerFile << 1; i < 1024 * 1024; i += bytesPerFile) {
+      IndexRecord rec = cache.getIndexInformation(Integer.toString(i, 36),
+          r.nextInt(partsPerMap), new Path(p, Integer.toString(i, 36)));
+      checkRecord(rec, i);
+    }
+    IndexRecord rec = cache.getIndexInformation(Integer.toString(totalsize, 36),
+        r.nextInt(partsPerMap), f);
+    checkRecord(rec, totalsize);
+  }
+
+  private static void checkRecord(IndexRecord rec, long fill) {
+    assertEquals(fill, rec.startOffset);
+    assertEquals(fill, rec.rawLength);
+    assertEquals(fill, rec.partLength);
+  }
+
+  private static void writeFile(FileSystem fs, Path f, long fill, int parts)
+      throws IOException {
+    FSDataOutputStream out = fs.create(f, false);
+    IFileOutputStream iout = new IFileOutputStream(out);
+    DataOutputStream dout = new DataOutputStream(iout);
+    for (int i = 0; i < parts; ++i) {
+      dout.writeLong(fill);
+      dout.writeLong(fill);
+      dout.writeLong(fill);
+    }
+    dout.close();
+  }
+}