You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/11/18 10:35:35 UTC

svn commit: r718533 - in /hadoop/core/trunk: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: ddas
Date: Tue Nov 18 01:35:32 2008
New Revision: 718533

URL: http://svn.apache.org/viewvc?rev=718533&view=rev
Log:
HADOOP-2774. Provide a counter for the number of first level spill files in the map task and a counter for counting the number of spilled records in both map and reduce tasks. Contributed by Ravi Gummadi.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Nov 18 01:35:32 2008
@@ -111,6 +111,10 @@
     HADOOP-4612. Removes RunJar's dependency on JobClient.
     (Sharad Agarwal via ddas)
 
+    HADOOP-2774. Provide a counter for the number of first level spill files in
+    the map task and a counter for counting the number of spilled records in both
+    map and reduce tasks.
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java Tue Nov 18 01:35:32 2008
@@ -65,7 +65,10 @@
     
     long decompressedBytesWritten = 0;
     long compressedBytesWritten = 0;
-    
+
+    // Count records when written to disk.
+    private static long numRecordsWritten = 0;
+
     IFileOutputStream checksumOut;
 
     Class<K> keyClass;
@@ -178,6 +181,7 @@
       decompressedBytesWritten += keyLength + valueLength + 
                                   WritableUtils.getVIntSize(keyLength) + 
                                   WritableUtils.getVIntSize(valueLength);
+      numRecordsWritten++;
     }
     
     public void append(DataInputBuffer key, DataInputBuffer value)
@@ -203,8 +207,13 @@
       decompressedBytesWritten += keyLength + valueLength + 
                       WritableUtils.getVIntSize(keyLength) + 
                       WritableUtils.getVIntSize(valueLength);
-}
+      numRecordsWritten++;
+    }
     
+    public static long getNumRecordsWritten() {
+      return numRecordsWritten;
+    }
+
     public long getRawLength() {
       return decompressedBytesWritten;
     }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java Tue Nov 18 01:35:32 2008
@@ -30,11 +30,14 @@
   final long startOffset;
   final long rawLength;
   final long partLength;
+  final long numRecords;
   
-  public IndexRecord(long startOffset, long rawLength, long partLength) {
+  public IndexRecord(long startOffset, long rawLength, long partLength,
+                     long numRecords) {
     this.startOffset = startOffset;
     this.rawLength = rawLength;
     this.partLength = partLength;
+    this.numRecords = numRecords;
   }
 
   public static IndexRecord[] readIndexFile(Path indexFileName, 
@@ -60,8 +63,9 @@
         long startOffset = wrapper.readLong();
         long rawLength = wrapper.readLong();
         long partLength = wrapper.readLong();
+        long numRecords = wrapper.readLong();
         indexRecordArray[i] = 
-          new IndexRecord(startOffset, rawLength, partLength);
+          new IndexRecord(startOffset, rawLength, partLength, numRecords);
       }
     }
     finally {

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java Tue Nov 18 01:35:32 2008
@@ -47,5 +47,10 @@
    */
   public static final String RAW_MAP_OUTPUT_LENGTH = "Raw-Map-Output-Length";
 
+  /**
+   * The custom http header used for the number of map output records.
+   */
+  public static final String MAP_OUTPUT_NUM_RECORDS = "Map-Output-Num-Records";
+  
   public static final String WORKDIR = "work";
 }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Tue Nov 18 01:35:32 2008
@@ -24,6 +24,7 @@
 import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_RECORDS;
 import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_BYTES;
 import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_RECORDS;
+import static org.apache.hadoop.mapred.Task.Counter.MAP_FIRST_LEVEL_SPILLS;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -67,7 +68,7 @@
   /**
    * The size of each record in the index file for the map-outputs.
    */
-  public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
+  public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 32;
   
 
   private BytesWritable split = new BytesWritable();
@@ -447,6 +448,7 @@
     private final Counters.Counter mapOutputRecordCounter;
     private final Counters.Counter combineInputCounter;
     private final Counters.Counter combineOutputCounter;
+    private final Counters.Counter firstLevelSpillsCounter;
     
     private ArrayList<IndexRecord[]> indexCacheList;
     private int totalIndexCacheMemory;
@@ -509,6 +511,7 @@
       mapOutputRecordCounter = counters.findCounter(MAP_OUTPUT_RECORDS);
       combineInputCounter = counters.findCounter(COMBINE_INPUT_RECORDS);
       combineOutputCounter = counters.findCounter(COMBINE_OUTPUT_RECORDS);
+      firstLevelSpillsCounter = counters.findCounter(MAP_FIRST_LEVEL_SPILLS);
       // compression
       if (job.getCompressMapOutput()) {
         Class<? extends CompressionCodec> codecClass =
@@ -948,7 +951,10 @@
           IFile.Writer<K, V> writer = null;
           try {
             long segmentStart = out.getPos();
+            long numRecordsInThisPartition = 0;
             writer = new Writer<K, V>(job, out, keyClass, valClass, codec);
+            long prevCnt = writer.getNumRecordsWritten();
+            
             if (null == combinerClass) {
               // spill directly
               DataInputBuffer key = new DataInputBuffer();
@@ -979,17 +985,19 @@
                 combineAndSpill(kvIter, combineInputCounter);
               }
             }
-
+            numRecordsInThisPartition = Writer.getNumRecordsWritten() - prevCnt;
             // close the writer
             writer.close();
             
             if (indexChecksumOut != null) {
-              // write the index as <offset, raw-length, compressed-length> 
-              writeIndexRecord(indexChecksumOut, segmentStart, writer);
+              // write the index as 
+              // <offset, raw-length, compressed-length, numRecords>
+              writeIndexRecord(indexChecksumOut, segmentStart, writer,
+                               numRecordsInThisPartition);
             }
             else {
               irArray[i] = new IndexRecord(segmentStart, writer.getRawLength(),
-                  writer.getCompressedLength());    
+                  writer.getCompressedLength(), numRecordsInThisPartition);
             }
             writer = null;
           } finally {
@@ -1058,12 +1066,13 @@
             }
             writer.close();
 
+            long numRecords = (i == partition) ? 1:0;
             if (indexChecksumOut != null) {
-              writeIndexRecord(indexChecksumOut,segmentStart,writer);
+              writeIndexRecord(indexChecksumOut,segmentStart,writer,numRecords);
             }
             else {
               irArray[i] = new IndexRecord(segmentStart, writer.getRawLength(),
-                  writer.getCompressedLength());    
+                writer.getCompressedLength(), numRecords);
             }
             writer = null;
           } catch (IOException e) {
@@ -1172,6 +1181,8 @@
       long finalIndexFileSize = 0;
       Path [] filename = new Path[numSpills];
       
+      firstLevelSpillsCounter.increment(numSpills);
+
       for(int i = 0; i < numSpills; i++) {
         filename[i] = mapOutputFile.getSpillFile(getTaskID(), i);
         finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
@@ -1188,7 +1199,8 @@
           writeSingleSpillIndexToFile(getTaskID(),
               new Path(filename[0].getParent(),"file.out.index"));
         }
-    	  return;
+        spilledRecordsCounter.increment(Writer.getNumRecordsWritten());
+        return;
       }
       //make correction in the length to include the sequence file header
       //lengths for each partition
@@ -1220,7 +1232,7 @@
           Writer<K, V> writer = new Writer<K, V>(job, finalOut, 
                                                  keyClass, valClass, codec);
           writer.close();
-          writeIndexRecord(finalIndexChecksumOut, segmentStart, writer);
+          writeIndexRecord(finalIndexChecksumOut, segmentStart, writer, 0);
         }
         finalOut.close();
         finalIndexChecksumOut.close();
@@ -1256,7 +1268,6 @@
             }
             indexRecord = null;
           }
-          
           //merge
           @SuppressWarnings("unchecked")
           RawKeyValueIterator kvIter = 
@@ -1270,19 +1281,37 @@
           long segmentStart = finalOut.getPos();
           Writer<K, V> writer = 
               new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
+          long numRecordsInThisPartition;
+          long prevCnt = Writer.getNumRecordsWritten();
           if (null == combinerClass || numSpills < minSpillsForCombine) {
             Merger.writeFile(kvIter, writer, reporter);
           } else {
             combineCollector.setWriter(writer);
             combineAndSpill(kvIter, combineInputCounter);
           }
+          numRecordsInThisPartition = Writer.getNumRecordsWritten() - prevCnt;
 
           //close
           writer.close();
           
           //write index record
-          writeIndexRecord(finalIndexChecksumOut,segmentStart, writer);
+          writeIndexRecord(finalIndexChecksumOut,segmentStart, writer,
+            numRecordsInThisPartition);
         }
+
+        // In Map Phase, Spills to disk are done at 3 places:
+        // (1) First Level Spills in sortAndSpill() - either
+        //      (a) without combiner
+        //   or (b) with combiner
+        // (2) Outputs of Intermediate(multi-level) merges in Merger.merge
+        // (3) Output of final level merge - See above if-else
+        //      (a) Merger.writeFile
+        //   or (b) combineAndSpill
+        // In all the cases, IFile.Writer.append() takes care of counting
+        // the records written to disk
+
+        spilledRecordsCounter.increment(Writer.getNumRecordsWritten());
+
         finalOut.close();
         finalIndexChecksumOut.close();
         finalIndexOut.close();
@@ -1295,7 +1324,7 @@
 
     private void writeIndexRecord(IFileOutputStream indexOut, 
                                   long start, 
-                                  Writer<K, V> writer) 
+                                  Writer<K, V> writer, long numRecords) 
     throws IOException {
       //when we write the offset/decompressed-length/compressed-length to  
       //the final index file, we write longs for both compressed and 
@@ -1311,8 +1340,9 @@
       wrapper.writeLong(writer.getRawLength());
       long segmentLength = writer.getCompressedLength();
       wrapper.writeLong(segmentLength);
+      wrapper.writeLong(numRecords);
       LOG.info("Index: (" + start + ", " + writer.getRawLength() + ", " + 
-               segmentLength + ")");
+               segmentLength + "," + numRecords + ")");
     }
     
     /**
@@ -1366,6 +1396,7 @@
         wrapper.writeLong(irArray[i].startOffset);
         wrapper.writeLong(irArray[i].rawLength);
         wrapper.writeLong(irArray[i].partLength);
+        wrapper.writeLong(irArray[i].numRecords);
       }
       
       wrapper.close();

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Tue Nov 18 01:35:32 2008
@@ -436,6 +436,17 @@
         values.informReduceProgress();
       }
 
+      // In Reduce Phase, Spills to disk are
+      // (1) mapoutput directly written to reduceNode's disk in shuffleToDisk()
+      //       spilledRecordsCounter is updated after shuffleToDisk()
+      // (2) All other spills to disk are either through
+      //        (a) Merger.writeFile()
+      //     or (b) combineAndSpill()
+      //     In cases 2(a) & 2(b), IFile.Writer.append() takes care of
+      //     counting the records written to disk
+
+      spilledRecordsCounter.increment(Writer.getNumRecordsWritten());
+
       //Clean up: repeated in catch block below
       reducer.close();
       out.close(reporter);
@@ -1234,6 +1245,8 @@
           Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));  
         long compressedLength = 
           Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
+        long numRecords = 
+          Long.parseLong(connection.getHeaderField(MAP_OUTPUT_NUM_RECORDS));
 
         // Check if this map-output can be saved in-memory
         boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength); 
@@ -1255,6 +1268,7 @@
 
           mapOutput = shuffleToDisk(mapOutputLoc, input, filename, 
               compressedLength);
+          spilledRecordsCounter.increment(numRecords);
         }
             
         return mapOutput;

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Tue Nov 18 01:35:32 2008
@@ -62,13 +62,15 @@
     MAP_SKIPPED_RECORDS,
     MAP_INPUT_BYTES, 
     MAP_OUTPUT_BYTES,
+    MAP_FIRST_LEVEL_SPILLS,
     COMBINE_INPUT_RECORDS,
     COMBINE_OUTPUT_RECORDS,
     REDUCE_INPUT_GROUPS,
     REDUCE_INPUT_RECORDS,
     REDUCE_OUTPUT_RECORDS,
     REDUCE_SKIPPED_GROUPS,
-    REDUCE_SKIPPED_RECORDS
+    REDUCE_SKIPPED_RECORDS,
+    SPILLED_RECORDS
   }
   
   /**
@@ -131,6 +133,7 @@
   protected JobContext jobContext;
   protected TaskAttemptContext taskContext;
   private volatile boolean commitPending = false;
+  protected final Counters.Counter spilledRecordsCounter;
 
   ////////////////////////////////////////////
   // Constructors
@@ -139,6 +142,7 @@
   public Task() {
     taskStatus = TaskStatus.createTaskStatus(isMapTask());
     taskId = new TaskAttemptID();
+    spilledRecordsCounter = counters.findCounter(Counter.SPILLED_RECORDS);
   }
 
   public Task(String jobFile, TaskAttemptID taskId, int partition) {
@@ -155,6 +159,7 @@
                                                     TaskStatus.Phase.SHUFFLE, 
                                                   counters);
     this.mapOutputFile.setJobId(taskId.getJobID());
+    spilledRecordsCounter = counters.findCounter(Counter.SPILLED_RECORDS);
   }
 
   ////////////////////////////////////////////

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue Nov 18 01:35:32 2008
@@ -2760,6 +2760,7 @@
         final long startOffset = info.startOffset;
         final long rawPartLength = info.rawLength;
         final long partLength = info.partLength;
+        final long numRecords = info.numRecords;
 
         //set the custom "Raw-Map-Output-Length" http header to 
         //the raw (decompressed) length
@@ -2770,6 +2771,11 @@
         response.setHeader(MAP_OUTPUT_LENGTH, 
                            Long.toString(partLength));
 
+        //set the custom "Map-Output-Num-Records" http header to 
+        //the actual number of records being transferred
+        response.setHeader(MAP_OUTPUT_NUM_RECORDS, 
+                           Long.toString(numRecords));
+
         //use the same buffersize as used for reading the data from disk
         response.setBufferSize(MAX_BYTES_TO_READ);
         

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties Tue Nov 18 01:35:32 2008
@@ -14,5 +14,6 @@
 REDUCE_OUTPUT_RECORDS.name=    Reduce output records
 REDUCE_SKIPPED_RECORDS.name=   Reduce skipped records
 REDUCE_SKIPPED_GROUPS.name=    Reduce skipped groups
-
+SPILLED_RECORDS.name=          Spilled Records
+MAP_FIRST_LEVEL_SPILLS.name=   Map First Level Spills
 

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java Tue Nov 18 01:35:32 2008
@@ -43,7 +43,7 @@
     fs.delete(p, true);
     conf.setInt("mapred.tasktracker.indexcache.mb", 1);
     final int partsPerMap = 1000;
-    final int bytesPerFile = partsPerMap * 24;
+    final int bytesPerFile = partsPerMap * 32;
     IndexCache cache = new IndexCache(conf);
 
     // fill cache
@@ -105,6 +105,7 @@
     assertEquals(fill, rec.startOffset);
     assertEquals(fill, rec.rawLength);
     assertEquals(fill, rec.partLength);
+    assertEquals(fill, rec.numRecords);
   }
 
   private static void writeFile(FileSystem fs, Path f, long fill, int parts)
@@ -116,6 +117,7 @@
       dout.writeLong(fill);
       dout.writeLong(fill);
       dout.writeLong(fill);
+      dout.writeLong(fill);
     }
     dout.close();
   }