Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/12/02 07:49:08 UTC

svn commit: r722389 - in /hadoop/core/trunk: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: cdouglas
Date: Mon Dec  1 22:49:07 2008
New Revision: 722389

URL: http://svn.apache.org/viewvc?rev=722389&view=rev
Log:
HADOOP-4649. Improve abstraction for spill indices.

Added:
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SpillRecord.java
Removed:
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexCache.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=722389&r1=722388&r2=722389&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Dec  1 22:49:07 2008
@@ -147,6 +147,8 @@
     HADOOP-4513. Initialize jobs asynchronously in the capacity scheduler.
     (Sreekanth Ramakrishnan via yhemanth)
 
+    HADOOP-4649. Improve abstraction for spill indices. (cdouglas)
+
   OPTIMIZATIONS
 
     HADOOP-3293. Fixes FileInputFormat to do provide locations for splits

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java?rev=722389&r1=722388&r2=722389&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java Mon Dec  1 22:49:07 2008
@@ -40,7 +40,7 @@
 import org.apache.hadoop.io.serializer.Serializer;
 
 /**
- * <code>IFile</code> is the simple <key-len, key, value-len, value> format
+ * <code>IFile</code> is the simple <key-len, value-len, key, value> format
  * for the intermediate map-outputs in Map-Reduce.
  * 
  * There is a <code>Writer</code> to write out map-outputs in this format and 
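
The corrected comment now lists the fields in write order: both lengths first,
then the key bytes, then the value bytes. A minimal sketch of one record in
that layout (illustrative only; the class and method names are invented, and
the vint length encoding via WritableUtils is an assumption about the Writer,
not code from this patch):

    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.WritableUtils;

    class IFileRecordSketch {
      // One record in <key-len, value-len, key, value> order.
      static void appendRecord(DataOutputStream out, byte[] key, byte[] value)
          throws IOException {
        WritableUtils.writeVInt(out, key.length);    // key-len
        WritableUtils.writeVInt(out, value.length);  // value-len
        out.write(key, 0, key.length);               // key bytes
        out.write(value, 0, value.length);           // value bytes
      }
    }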

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexCache.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexCache.java?rev=722389&r1=722388&r2=722389&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexCache.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexCache.java Mon Dec  1 22:49:07 2008
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.mapred;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -66,7 +65,7 @@
       info = readIndexFileToCache(fileName, mapId);
     } else {
       synchronized (info) {
-        while (null == info.indexRecordArray) {
+        while (null == info.mapSpillRecord) {
           try {
             info.wait();
           } catch (InterruptedException e) {
@@ -77,14 +76,13 @@
       LOG.debug("IndexCache HIT: MapId " + mapId + " found");
     }
 
-    if (info.indexRecordArray.length == 0 ||
-        info.indexRecordArray.length < reduce) {
-      System.out.println("I am failing here");
+    if (info.mapSpillRecord.size() == 0 ||
+        info.mapSpillRecord.size() < reduce) {
       throw new IOException("Invalid request " +
         " Map Id = " + mapId + " Reducer = " + reduce +
-        " Index Info Length = " + info.indexRecordArray.length);
+        " Index Info Length = " + info.mapSpillRecord.size());
     }
-    return info.indexRecordArray[reduce];
+    return info.mapSpillRecord.getIndex(reduce);
   }
 
   private IndexInformation readIndexFileToCache(Path indexFileName,
@@ -93,7 +91,7 @@
     IndexInformation newInd = new IndexInformation();
     if ((info = cache.putIfAbsent(mapId, newInd)) != null) {
       synchronized (info) {
-        while (null == info.indexRecordArray) {
+        while (null == info.mapSpillRecord) {
           try {
             info.wait();
           } catch (InterruptedException e) {
@@ -105,16 +103,16 @@
       return info;
     }
     LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ;
-    IndexRecord[] tmp = null;
+    SpillRecord tmp = null;
     try { 
-      tmp = IndexRecord.readIndexFile(indexFileName, conf);
+      tmp = new SpillRecord(indexFileName, conf);
     } catch (Throwable e) { 
-      tmp = new IndexRecord[0];
+      tmp = new SpillRecord(0);
       cache.remove(mapId);
-      throw new IOException("Error Reading IndexFile",e);
+      throw new IOException("Error Reading IndexFile", e);
     } finally { 
       synchronized (newInd) { 
-        newInd.indexRecordArray = tmp;
+        newInd.mapSpillRecord = tmp;
         newInd.notifyAll();
       } 
     } 
@@ -157,11 +155,12 @@
   }
 
   private static class IndexInformation {
-    IndexRecord[] indexRecordArray = null;
+    SpillRecord mapSpillRecord;
 
     int getSize() {
-      return ((indexRecordArray == null) ? 
-          0 : indexRecordArray.length * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
+      return mapSpillRecord == null
+        ? 0
+        : mapSpillRecord.size() * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
     }
   }
 }
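
With this change the cached per-map index is a single SpillRecord, and
per-reducer lookups go through getIndex(). A rough sketch of the hit-path
check above as a standalone method (the class and method names are invented;
the real logic lives in IndexCache.getIndexInformation and also waits for
concurrent loads to finish):

    import java.io.IOException;

    // Assumes the org.apache.hadoop.mapred package, since SpillRecord and
    // IndexRecord are package-private.
    class IndexLookupSketch {
      static IndexRecord lookup(SpillRecord cached, String mapId, int reduce)
          throws IOException {
        if (cached.size() == 0 || cached.size() < reduce) {
          throw new IOException("Invalid request " +
              " Map Id = " + mapId + " Reducer = " + reduce +
              " Index Info Length = " + cached.size());
        }
        return cached.getIndex(reduce);  // <startOffset, rawLength, partLength>
      }
    }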

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=722389&r1=722388&r2=722389&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Mon Dec  1 22:49:07 2008
@@ -448,10 +448,10 @@
     private final Counters.Counter combineInputCounter;
     private final Counters.Counter combineOutputCounter;
     
-    private ArrayList<IndexRecord[]> indexCacheList;
+    private ArrayList<SpillRecord> indexCacheList;
     private int totalIndexCacheMemory;
-    private static final int INDEX_CACHE_MEMORY_LIMIT = 1024*1024;
-    
+    private static final int INDEX_CACHE_MEMORY_LIMIT = 1024 * 1024;
+
     @SuppressWarnings("unchecked")
     public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
                            Reporter reporter) throws IOException {
@@ -463,7 +463,7 @@
        
       rfs = ((LocalFileSystem)localFs).getRaw();
 
-      indexCacheList = new ArrayList<IndexRecord[]>();
+      indexCacheList = new ArrayList<SpillRecord>();
       
       //sanity checks
       final float spillper = job.getFloat("io.sort.spill.percent",(float)0.8);
@@ -912,37 +912,19 @@
           : (bufvoid - bufend) + bufstart) +
                   partitions * APPROX_HEADER_LENGTH;
       FSDataOutputStream out = null;
-      FSDataOutputStream indexOut = null;
-      IFileOutputStream indexChecksumOut = null;
-      IndexRecord[] irArray = null;
       try {
         // create spill file
-        Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
-                                      numSpills, size);
+        final SpillRecord spillRec = new SpillRecord(partitions);
+        final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
+            numSpills, size);
         out = rfs.create(filename);
-        // All records (reducers) of a given spill go to 
-        // the same destination (memory or file).
-        if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
-          // create spill index file
-          Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
-              getTaskID(), numSpills,
-              partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
 
-          indexOut = rfs.create(indexFilename);
-          indexChecksumOut = new IFileOutputStream(indexOut);
-        }
-        else {
-          irArray = new IndexRecord[partitions];
-          indexCacheList.add(numSpills,irArray);
-          totalIndexCacheMemory += partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
-        }
-          
-        
         final int endPosition = (kvend > kvstart)
           ? kvend
           : kvoffsets.length + kvend;
         sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
         int spindex = kvstart;
+        IndexRecord rec = new IndexRecord();
         InMemValBytes value = new InMemValBytes();
         for (int i = 0; i < partitions; ++i) {
           IFile.Writer<K, V> writer = null;
@@ -983,28 +965,34 @@
 
             // close the writer
             writer.close();
-            
-            if (indexChecksumOut != null) {
-              // write the index as <offset, raw-length, compressed-length> 
-              writeIndexRecord(indexChecksumOut, segmentStart, writer);
-            }
-            else {
-              irArray[i] = new IndexRecord(segmentStart, writer.getRawLength(),
-                  writer.getCompressedLength());    
-            }
+
+            // record offsets
+            rec.startOffset = segmentStart;
+            rec.rawLength = writer.getRawLength();
+            rec.partLength = writer.getCompressedLength();
+            spillRec.putIndex(rec, i);
+
             writer = null;
           } finally {
             if (null != writer) writer.close();
           }
         }
+
+        if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
+          // create spill index file
+          Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
+              getTaskID(), numSpills,
+              partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+          spillRec.writeToFile(indexFilename, job);
+        } else {
+          indexCacheList.add(spillRec);
+          totalIndexCacheMemory +=
+            spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
+        }
         LOG.info("Finished spill " + numSpills);
         ++numSpills;
       } finally {
         if (out != null) out.close();
-        if (indexChecksumOut != null) {
-          indexChecksumOut.close();
-        }
-        if (indexOut != null) indexOut.close();
       }
     }
 
@@ -1017,38 +1005,22 @@
         throws IOException {
       long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
       FSDataOutputStream out = null;
-      FSDataOutputStream indexOut = null;
-      IFileOutputStream indexChecksumOut = null;
-      IndexRecord[] irArray = null;
       final int partition = partitioner.getPartition(key, value, partitions);
       try {
         // create spill file
-        Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
-                                      numSpills, size);
+        final SpillRecord spillRec = new SpillRecord(partitions);
+        final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
+            numSpills, size);
         out = rfs.create(filename);
         
-        if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
-          // create spill index
-          Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
-              getTaskID(), numSpills,
-              partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
-
-          indexOut = rfs.create(indexFilename);
-          indexChecksumOut = new IFileOutputStream(indexOut);
-        }
-        else {
-          irArray = new IndexRecord[partitions];
-          indexCacheList.add(numSpills,irArray);
-          totalIndexCacheMemory += partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
-        }
- 
         // we don't run the combiner for a single record
+        IndexRecord rec = new IndexRecord();
         for (int i = 0; i < partitions; ++i) {
           IFile.Writer<K, V> writer = null;
           try {
             long segmentStart = out.getPos();
             // Create a new codec, don't care!
-            writer = new IFile.Writer<K, V>(job, out, keyClass, valClass, codec,
+            writer = new IFile.Writer<K,V>(job, out, keyClass, valClass, codec,
                                             spilledRecordsCounter);
 
             if (i == partition) {
@@ -1060,24 +1032,32 @@
             }
             writer.close();
 
-            if (indexChecksumOut != null) {
-              writeIndexRecord(indexChecksumOut,segmentStart,writer);
-            }
-            else {
-              irArray[i] = new IndexRecord(segmentStart, writer.getRawLength(),
-                  writer.getCompressedLength());    
-            }
+            // record offsets
+            rec.startOffset = segmentStart;
+            rec.rawLength = writer.getRawLength();
+            rec.partLength = writer.getCompressedLength();
+            spillRec.putIndex(rec, i);
+
             writer = null;
           } catch (IOException e) {
             if (null != writer) writer.close();
             throw e;
           }
         }
+        if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
+          // create spill index file
+          Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
+              getTaskID(), numSpills,
+              partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+          spillRec.writeToFile(indexFilename, job);
+        } else {
+          indexCacheList.add(spillRec);
+          totalIndexCacheMemory +=
+            spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
+        }
         ++numSpills;
       } finally {
         if (out != null) out.close();
-        if (indexChecksumOut != null) indexChecksumOut.close();
-        if (indexOut != null) indexOut.close();
       }
     }
 
@@ -1172,102 +1152,99 @@
       // get the approximate size of the final output/index files
       long finalOutFileSize = 0;
       long finalIndexFileSize = 0;
-      Path [] filename = new Path[numSpills];
-      
+      final Path[] filename = new Path[numSpills];
+      final TaskAttemptID mapId = getTaskID();
+
       for(int i = 0; i < numSpills; i++) {
-        filename[i] = mapOutputFile.getSpillFile(getTaskID(), i);
+        filename[i] = mapOutputFile.getSpillFile(mapId, i);
         finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
       }
-      
       if (numSpills == 1) { //the spill is the final output
         rfs.rename(filename[0],
             new Path(filename[0].getParent(), "file.out"));
         if (indexCacheList.size() == 0) {
-          rfs.rename(mapOutputFile.getSpillIndexFile(getTaskID(), 0),
-              new Path(filename[0].getParent(),"file.out.index"));
-        } 
-        else { 
-          writeSingleSpillIndexToFile(getTaskID(),
+          rfs.rename(mapOutputFile.getSpillIndexFile(mapId, 0),
               new Path(filename[0].getParent(),"file.out.index"));
+        } else {
+          indexCacheList.get(0).writeToFile(
+                new Path(filename[0].getParent(),"file.out.index"), job);
         }
         return;
       }
+
+      // read in paged indices
+      for (int i = indexCacheList.size(); i < numSpills; ++i) {
+        Path indexFileName = mapOutputFile.getSpillIndexFile(mapId, i);
+        indexCacheList.add(new SpillRecord(indexFileName, job));
+      }
+
       //make correction in the length to include the sequence file header
       //lengths for each partition
       finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
-      
       finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
-      
-      Path finalOutputFile = mapOutputFile.getOutputFileForWrite(getTaskID(), 
+      Path finalOutputFile = mapOutputFile.getOutputFileForWrite(mapId,
                              finalOutFileSize);
       Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(
-                            getTaskID(), finalIndexFileSize);
-      
-      //The output stream for the final single output file
-
-      FSDataOutputStream finalOut = rfs.create(finalOutputFile, true,
-                                               4096);
-
-      //The final index file output stream
-      FSDataOutputStream finalIndexOut = rfs.create(finalIndexFile, true,
-                                                        4096);
+                            mapId, finalIndexFileSize);
 
-      IFileOutputStream finalIndexChecksumOut = 
-        new IFileOutputStream(finalIndexOut);
+      //The output stream for the final single output file
+      FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
 
       if (numSpills == 0) {
         //create dummy files
-        for (int i = 0; i < partitions; i++) {
-          long segmentStart = finalOut.getPos();
-          Writer<K, V> writer = new Writer<K, V>(job, finalOut, keyClass,
-                                                 valClass, codec, null);
-          writer.close();
-          writeIndexRecord(finalIndexChecksumOut, segmentStart, writer);
+        IndexRecord rec = new IndexRecord();
+        SpillRecord sr = new SpillRecord(partitions);
+        try {
+          for (int i = 0; i < partitions; i++) {
+            long segmentStart = finalOut.getPos();
+            Writer<K, V> writer =
+              new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null);
+            writer.close();
+            rec.startOffset = segmentStart;
+            rec.rawLength = writer.getRawLength();
+            rec.partLength = writer.getCompressedLength();
+            sr.putIndex(rec, i);
+          }
+          sr.writeToFile(finalIndexFile, job);
+        } finally {
+          finalOut.close();
         }
-        finalOut.close();
-        finalIndexChecksumOut.close();
-        finalIndexOut.close();
         return;
       }
       {
-        for (int parts = 0; parts < partitions; parts++){
+        IndexRecord rec = new IndexRecord();
+        final SpillRecord spillRec = new SpillRecord(partitions);
+        for (int parts = 0; parts < partitions; parts++) {
           //create the segments to be merged
-          List<Segment<K, V>> segmentList =
+          List<Segment<K,V>> segmentList =
             new ArrayList<Segment<K, V>>(numSpills);
-          TaskAttemptID mapId = getTaskID();
           for(int i = 0; i < numSpills; i++) {
-            final IndexRecord indexRecord =
-              getIndexInformation(mapId, i, parts);
-
-            long segmentOffset = indexRecord.startOffset;
-            long segmentLength = indexRecord.partLength;
+            IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
 
-            Segment<K, V> s =
-              new Segment<K, V>(job, rfs, filename[i], segmentOffset,
-                                segmentLength, codec, true);
+            Segment<K,V> s =
+              new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,
+                               indexRecord.partLength, codec, true);
             segmentList.add(i, s);
-            
+
             if (LOG.isDebugEnabled()) {
-              long rawSegmentLength = indexRecord.rawLength;
               LOG.debug("MapId=" + mapId + " Reducer=" + parts +
-                  "Spill =" + i + "(" + segmentOffset + ","+ 
-                        rawSegmentLength + ", " + segmentLength + ")");
+                  "Spill =" + i + "(" + indexRecord.startOffset + "," +
+                  indexRecord.rawLength + ", " + indexRecord.partLength + ")");
             }
           }
-          
+
           //merge
           @SuppressWarnings("unchecked")
-          RawKeyValueIterator kvIter = 
-            Merger.merge(job, rfs,
+          RawKeyValueIterator kvIter = Merger.merge(job, rfs,
                          keyClass, valClass,
-                         segmentList, job.getInt("io.sort.factor", 100), 
-                         new Path(getTaskID().toString()), 
+                         segmentList, job.getInt("io.sort.factor", 100),
+                         new Path(mapId.toString()),
                          job.getOutputKeyComparator(), reporter,
                          null, spilledRecordsCounter);
 
           //write merged output to disk
           long segmentStart = finalOut.getPos();
-          Writer<K, V> writer = 
+          Writer<K, V> writer =
               new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
                                spilledRecordsCounter);
           if (null == combinerClass || numSpills < minSpillsForCombine) {
@@ -1279,99 +1256,21 @@
 
           //close
           writer.close();
-          
-          //write index record
-          writeIndexRecord(finalIndexChecksumOut,segmentStart, writer);
+
+          // record offsets
+          rec.startOffset = segmentStart;
+          rec.rawLength = writer.getRawLength();
+          rec.partLength = writer.getCompressedLength();
+          spillRec.putIndex(rec, parts);
         }
+        spillRec.writeToFile(finalIndexFile, job);
         finalOut.close();
-        finalIndexChecksumOut.close();
-        finalIndexOut.close();
-        //cleanup
         for(int i = 0; i < numSpills; i++) {
           rfs.delete(filename[i],true);
         }
       }
     }
 
-    private void writeIndexRecord(IFileOutputStream indexOut, 
-                                  long start, 
-                                  Writer<K, V> writer) 
-    throws IOException {
-      //when we write the offset/decompressed-length/compressed-length to  
-      //the final index file, we write longs for both compressed and 
-      //decompressed lengths. This helps us to reliably seek directly to 
-      //the offset/length for a partition when we start serving the 
-      //byte-ranges to the reduces. We probably waste some space in the 
-      //file by doing this as opposed to writing VLong but it helps us later on.
-      // index record: <offset, raw-length, compressed-length> 
-      //StringBuffer sb = new StringBuffer();
-      
-      DataOutputStream wrapper = new DataOutputStream(indexOut);
-      wrapper.writeLong(start);
-      wrapper.writeLong(writer.getRawLength());
-      long segmentLength = writer.getCompressedLength();
-      wrapper.writeLong(segmentLength);
-      LOG.info("Index: (" + start + ", " + writer.getRawLength() + ", " + 
-               segmentLength + ")");
-    }
-    
-    /**
-     * This function returns the index information for the given mapId, Spill
-     * number and reducer combination.  Index Information is obtained 
-     * transparently from either the indexMap or the underlying indexFile
-     * @param mapId
-     * @param spillNum
-     * @param reducer
-     * @return
-     * @throws IOException
-     */
-    private IndexRecord getIndexInformation( TaskAttemptID mapId,
-                                             int spillNum,
-                                             int reducer) 
-      throws IOException {
-      IndexRecord[] irArray = null;
-      
-      if (indexCacheList.size() > spillNum) {
-        irArray = indexCacheList.get(spillNum);
-      }
-      else {
-        Path indexFileName = mapOutputFile.getSpillIndexFile(mapId, spillNum);
-        irArray = IndexRecord.readIndexFile(indexFileName, job);
-        indexCacheList.add(spillNum,irArray);
-        rfs.delete(indexFileName,false);
-      }
-      return irArray[reducer];
-    }
-    
-    /**
-     * This function writes index information from the indexMap to the 
-     * index file that could be used by mergeParts
-     * @param mapId
-     * @param finalName
-     * @throws IOException
-     */
-    private void writeSingleSpillIndexToFile(TaskAttemptID mapId,
-                                             Path finalName) 
-    throws IOException {
-    
-      IndexRecord[] irArray = null;
-            
-      irArray = indexCacheList.get(0);
-      
-      FSDataOutputStream indexOut = rfs.create(finalName);
-      IFileOutputStream indexChecksumOut = new IFileOutputStream (indexOut);
-      DataOutputStream wrapper = new DataOutputStream(indexChecksumOut);
-      
-      for (int i = 0; i < irArray.length; i++) {
-        wrapper.writeLong(irArray[i].startOffset);
-        wrapper.writeLong(irArray[i].rawLength);
-        wrapper.writeLong(irArray[i].partLength);
-      }
-      
-      wrapper.close();
-      indexOut.close();
-    }
-    
   } // MapOutputBuffer
   
   /**
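
Both spill paths now share the same tail: fill a SpillRecord while writing the
partitions, then either page it out to a spill index file or keep it in
memory, depending on how much index data is already cached. A condensed sketch
of that decision with invented names and the fields passed in as parameters
(the real code is inline in sortAndSpill and spillSingleRecord, and mergeParts
reads any paged indices back before the final merge):

    import java.io.IOException;
    import java.util.ArrayList;
    import org.apache.hadoop.fs.Path;

    // Assumes the org.apache.hadoop.mapred package; SpillRecord, IndexRecord
    // and MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH are package-level names.
    class SpillIndexSketch {
      static final int INDEX_CACHE_MEMORY_LIMIT = 1024 * 1024;

      static int pageOrCache(SpillRecord spillRec,
          ArrayList<SpillRecord> indexCacheList, int totalIndexCacheMemory,
          Path indexFilename, JobConf job) throws IOException {
        if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
          // Too much index data cached already: page this spill's index to
          // disk so mergeParts can read it back later.
          spillRec.writeToFile(indexFilename, job);
        } else {
          // Keep it in memory and charge its size against the 1 MB limit.
          indexCacheList.add(spillRec);
          totalIndexCacheMemory +=
              spillRec.size() * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
        }
        return totalIndexCacheMemory;
      }
    }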

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SpillRecord.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SpillRecord.java?rev=722389&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SpillRecord.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SpillRecord.java Mon Dec  1 22:49:07 2008
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.Checksum;
+
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+
+import static org.apache.hadoop.mapred.MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
+
+class SpillRecord {
+
+  /** Backing store */
+  private final ByteBuffer buf;
+  /** View of backing storage as longs */
+  private final LongBuffer entries;
+
+  public SpillRecord(int numPartitions) {
+    buf = ByteBuffer.allocate(
+        numPartitions * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
+    entries = buf.asLongBuffer();
+  }
+
+  public SpillRecord(Path indexFileName, JobConf job) throws IOException {
+    this(indexFileName, job, new CRC32());
+  }
+
+  public SpillRecord(Path indexFileName, JobConf job, Checksum crc)
+      throws IOException {
+
+    final FileSystem rfs = FileSystem.getLocal(job).getRaw();
+    final FSDataInputStream in = rfs.open(indexFileName);
+    try {
+      final long length = rfs.getFileStatus(indexFileName).getLen();
+      final int partitions = (int) length / MAP_OUTPUT_INDEX_RECORD_LENGTH;
+      final int size = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
+
+      buf = ByteBuffer.allocate(size);
+      if (crc != null) {
+        crc.reset();
+        CheckedInputStream chk = new CheckedInputStream(in, crc);
+        IOUtils.readFully(chk, buf.array(), 0, size);
+        if (chk.getChecksum().getValue() != in.readLong()) {
+          throw new ChecksumException("Checksum error reading spill index: " +
+                                indexFileName, -1);
+        }
+      } else {
+        IOUtils.readFully(in, buf.array(), 0, size);
+      }
+      entries = buf.asLongBuffer();
+    } finally {
+      in.close();
+    }
+  }
+
+  /**
+   * Return number of IndexRecord entries in this spill.
+   */
+  public int size() {
+    return entries.capacity() / (MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8);
+  }
+
+  /**
+   * Get spill offsets for given partition.
+   */
+  public IndexRecord getIndex(int partition) {
+    final int pos = partition * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8;
+    return new IndexRecord(entries.get(pos), entries.get(pos + 1),
+                           entries.get(pos + 2));
+  }
+
+  /**
+   * Set spill offsets for given partition.
+   */
+  public void putIndex(IndexRecord rec, int partition) {
+    final int pos = partition * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8;
+    entries.put(pos, rec.startOffset);
+    entries.put(pos + 1, rec.rawLength);
+    entries.put(pos + 2, rec.partLength);
+  }
+
+  /**
+   * Write this spill record to the location provided.
+   */
+  public void writeToFile(Path loc, JobConf job)
+      throws IOException {
+    writeToFile(loc, job, new CRC32());
+  }
+
+  public void writeToFile(Path loc, JobConf job, Checksum crc)
+      throws IOException {
+    final FileSystem rfs = FileSystem.getLocal(job).getRaw();
+    CheckedOutputStream chk = null;
+    final FSDataOutputStream out = rfs.create(loc);
+    try {
+      if (crc != null) {
+        crc.reset();
+        chk = new CheckedOutputStream(out, crc);
+        chk.write(buf.array());
+        out.writeLong(chk.getChecksum().getValue());
+      } else {
+        out.write(buf.array());
+      }
+    } finally {
+      if (chk != null) {
+        chk.close();
+      } else {
+        out.close();
+      }
+    }
+  }
+
+}
+
+class IndexRecord {
+  long startOffset;
+  long rawLength;
+  long partLength;
+
+  public IndexRecord() { }
+
+  public IndexRecord(long startOffset, long rawLength, long partLength) {
+    this.startOffset = startOffset;
+    this.rawLength = rawLength;
+    this.partLength = partLength;
+  }
+}
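
For reference, a small usage sketch of the new class: fill one IndexRecord per
partition, write the whole record with its trailing CRC32, then read it back
and look up a partition. This assumes code in the org.apache.hadoop.mapred
package (SpillRecord and IndexRecord are package-private) with JobConf and
org.apache.hadoop.fs.Path imported; the path and sizes are placeholders.

    JobConf job = new JobConf();
    Path loc = new Path("/tmp/spill0.out.index");        // placeholder path

    SpillRecord toWrite = new SpillRecord(4);            // 4 partitions
    toWrite.putIndex(new IndexRecord(0L, 128L, 96L), 0); // offset, raw, part
    toWrite.writeToFile(loc, job);                       // 3 longs/partition + crc

    SpillRecord readBack = new SpillRecord(loc, job);    // verifies the checksum
    IndexRecord rec = readBack.getIndex(0);              // rec.startOffset == 0L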

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java?rev=722389&r1=722388&r2=722389&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java Mon Dec  1 22:49:07 2008
@@ -21,7 +21,10 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Random;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedOutputStream;
 
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -101,6 +104,41 @@
     checkRecord(rec, totalsize);
   }
 
+  public void testBadIndex() throws Exception {
+    final int parts = 30;
+    JobConf conf = new JobConf();
+    FileSystem fs = FileSystem.getLocal(conf).getRaw();
+    Path p = new Path(System.getProperty("test.build.data", "/tmp"),
+        "cache").makeQualified(fs);
+    fs.delete(p, true);
+    conf.setInt("mapred.tasktracker.indexcache.mb", 1);
+    IndexCache cache = new IndexCache(conf);
+
+    Path f = new Path(p, "badindex");
+    FSDataOutputStream out = fs.create(f, false);
+    CheckedOutputStream iout = new CheckedOutputStream(out, new CRC32());
+    DataOutputStream dout = new DataOutputStream(iout);
+    for (int i = 0; i < parts; ++i) {
+      for (int j = 0; j < MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8; ++j) {
+        if (0 == (i % 3)) {
+          dout.writeLong(i);
+        } else {
+          out.writeLong(i);
+        }
+      }
+    }
+    out.writeLong(iout.getChecksum().getValue());
+    dout.close();
+    try {
+      cache.getIndexInformation("badindex", 7, f);
+      fail("Did not detect bad checksum");
+    } catch (IOException e) {
+      if (!(e.getCause() instanceof ChecksumException)) {
+        throw e;
+      }
+    }
+  }
+
   private static void checkRecord(IndexRecord rec, long fill) {
     assertEquals(fill, rec.startOffset);
     assertEquals(fill, rec.rawLength);
@@ -110,13 +148,14 @@
   private static void writeFile(FileSystem fs, Path f, long fill, int parts)
       throws IOException {
     FSDataOutputStream out = fs.create(f, false);
-    IFileOutputStream iout = new IFileOutputStream(out);
+    CheckedOutputStream iout = new CheckedOutputStream(out, new CRC32());
     DataOutputStream dout = new DataOutputStream(iout);
     for (int i = 0; i < parts; ++i) {
-      dout.writeLong(fill);
-      dout.writeLong(fill);
-      dout.writeLong(fill);
+      for (int j = 0; j < MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8; ++j) {
+        dout.writeLong(fill);
+      }
     }
+    out.writeLong(iout.getChecksum().getValue());
     dout.close();
   }
 }