Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/11/19 00:43:32 UTC

svn commit: r718800 - in /hadoop/core/trunk: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: cdouglas
Date: Tue Nov 18 15:43:32 2008
New Revision: 718800

URL: http://svn.apache.org/viewvc?rev=718800&view=rev
Log:
Revert HADOOP-2774: counters for first-level spill files in the map task and for spilled records in both map and reduce tasks.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=718800&r1=718799&r2=718800&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Nov 18 15:43:32 2008
@@ -111,10 +111,6 @@
     HADOOP-4612. Removes RunJar's dependency on JobClient.
     (Sharad Agarwal via ddas)
 
-    HADOOP-2774. Provide a counter for the number of first level spill files in
-    the map task and a counter for counting the number of spilled records in both
-    map and reduce tasks.
-
     HADOOP-4185. Adds setVerifyChecksum() method to FileSystem.
     (Sharad Agarwal via ddas)
 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java?rev=718800&r1=718799&r2=718800&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java Tue Nov 18 15:43:32 2008
@@ -65,10 +65,7 @@
     
     long decompressedBytesWritten = 0;
     long compressedBytesWritten = 0;
-
-    // Count records when written to disk.
-    private static long numRecordsWritten = 0;
-
+    
     IFileOutputStream checksumOut;
 
     Class<K> keyClass;
@@ -181,7 +178,6 @@
       decompressedBytesWritten += keyLength + valueLength + 
                                   WritableUtils.getVIntSize(keyLength) + 
                                   WritableUtils.getVIntSize(valueLength);
-      numRecordsWritten++;
     }
     
     public void append(DataInputBuffer key, DataInputBuffer value)
@@ -207,13 +203,8 @@
       decompressedBytesWritten += keyLength + valueLength + 
                       WritableUtils.getVIntSize(keyLength) + 
                       WritableUtils.getVIntSize(valueLength);
-      numRecordsWritten++;
-    }
+    }
     
-    public static long getNumRecordsWritten() {
-      return numRecordsWritten;
-    }
-
     public long getRawLength() {
       return decompressedBytesWritten;
     }
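
A side note on the hunk above: the counter being removed was declared static, so every Writer in the same JVM bumped one shared numRecordsWritten, and MapTask (further down) samples it before and after a spill to recover a per-partition delta. A per-instance variant of the same idea, purely as an illustrative sketch and not anything this commit or the project prescribes:

    // Sketch only: per-instance record counting for an IFile-style writer.
    class CountingWriter {
      private long numRecordsWritten = 0;   // one count per writer, not static

      void append(byte[] key, byte[] value) throws java.io.IOException {
        // ... serialize key and value as IFile.Writer.append does ...
        numRecordsWritten++;                // safe with many writers per JVM
      }

      long getNumRecordsWritten() {         // instance method, no shared state
        return numRecordsWritten;
      }
    }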

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java?rev=718800&r1=718799&r2=718800&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java Tue Nov 18 15:43:32 2008
@@ -30,14 +30,11 @@
   final long startOffset;
   final long rawLength;
   final long partLength;
-  final long numRecords;
   
-  public IndexRecord(long startOffset, long rawLength, long partLength,
-                     long numRecords) {
+  public IndexRecord(long startOffset, long rawLength, long partLength) {
     this.startOffset = startOffset;
     this.rawLength = rawLength;
     this.partLength = partLength;
-    this.numRecords = numRecords;
   }
 
   public static IndexRecord[] readIndexFile(Path indexFileName, 
@@ -63,9 +60,8 @@
         long startOffset = wrapper.readLong();
         long rawLength = wrapper.readLong();
         long partLength = wrapper.readLong();
-        long numRecords = wrapper.readLong();
         indexRecordArray[i] = 
-          new IndexRecord(startOffset, rawLength, partLength, numRecords);
+          new IndexRecord(startOffset, rawLength, partLength);
       }
     }
     finally {
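
After the revert an index record is again three longs (startOffset, rawLength, partLength), so each record occupies 3 x 8 = 24 bytes on disk, which is exactly why MAP_OUTPUT_INDEX_RECORD_LENGTH drops from 32 to 24 in the MapTask.java hunk below. A self-contained round-trip sketch (class name and values are illustrative, not Hadoop API):

    import java.io.*;

    public class IndexRecordRoundTrip {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream dout = new DataOutputStream(bytes);
        dout.writeLong(0L);     // startOffset: where this partition begins
        dout.writeLong(4096L);  // rawLength: decompressed segment length
        dout.writeLong(1024L);  // partLength: segment length as stored on disk
        dout.close();
        System.out.println(bytes.size());   // prints 24: one index record

        DataInputStream din = new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray()));
        System.out.println("startOffset=" + din.readLong()
            + " rawLength=" + din.readLong()
            + " partLength=" + din.readLong());
        din.close();
      }
    }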

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java?rev=718800&r1=718799&r2=718800&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java Tue Nov 18 15:43:32 2008
@@ -47,10 +47,5 @@
    */
   public static final String RAW_MAP_OUTPUT_LENGTH = "Raw-Map-Output-Length";
 
-  /**
-   * The custom http header used for the number of map output records.
-   */
-  public static final String MAP_OUTPUT_NUM_RECORDS = "Map-Output-Num-Records";
-  
   public static final String WORKDIR = "work";
 }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=718800&r1=718799&r2=718800&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Tue Nov 18 15:43:32 2008
@@ -24,7 +24,6 @@
 import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_RECORDS;
 import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_BYTES;
 import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_RECORDS;
-import static org.apache.hadoop.mapred.Task.Counter.MAP_FIRST_LEVEL_SPILLS;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -68,7 +67,7 @@
   /**
    * The size of each record in the index file for the map-outputs.
    */
-  public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 32;
+  public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
   
 
   private BytesWritable split = new BytesWritable();
@@ -448,7 +447,6 @@
     private final Counters.Counter mapOutputRecordCounter;
     private final Counters.Counter combineInputCounter;
     private final Counters.Counter combineOutputCounter;
-    private final Counters.Counter firstLevelSpillsCounter;
     
     private ArrayList<IndexRecord[]> indexCacheList;
     private int totalIndexCacheMemory;
@@ -511,7 +509,6 @@
       mapOutputRecordCounter = counters.findCounter(MAP_OUTPUT_RECORDS);
       combineInputCounter = counters.findCounter(COMBINE_INPUT_RECORDS);
       combineOutputCounter = counters.findCounter(COMBINE_OUTPUT_RECORDS);
-      firstLevelSpillsCounter = counters.findCounter(MAP_FIRST_LEVEL_SPILLS);
       // compression
       if (job.getCompressMapOutput()) {
         Class<? extends CompressionCodec> codecClass =
@@ -951,10 +948,7 @@
           IFile.Writer<K, V> writer = null;
           try {
             long segmentStart = out.getPos();
-            long numRecordsInThisPartition = 0;
             writer = new Writer<K, V>(job, out, keyClass, valClass, codec);
-            long prevCnt = writer.getNumRecordsWritten();
-            
             if (null == combinerClass) {
               // spill directly
               DataInputBuffer key = new DataInputBuffer();
@@ -985,19 +979,17 @@
                 combineAndSpill(kvIter, combineInputCounter);
               }
             }
-            numRecordsInThisPartition = Writer.getNumRecordsWritten() - prevCnt;
+
             // close the writer
             writer.close();
             
             if (indexChecksumOut != null) {
-              // write the index as 
-              // <offset, raw-length, compressed-length, numRecords>
-              writeIndexRecord(indexChecksumOut, segmentStart, writer,
-                               numRecordsInThisPartition);
+              // write the index as <offset, raw-length, compressed-length> 
+              writeIndexRecord(indexChecksumOut, segmentStart, writer);
             }
             else {
               irArray[i] = new IndexRecord(segmentStart, writer.getRawLength(),
-                  writer.getCompressedLength(), numRecordsInThisPartition);
+                  writer.getCompressedLength());    
             }
             writer = null;
           } finally {
@@ -1066,13 +1058,12 @@
             }
             writer.close();
 
-            long numRecords = (i == partition) ? 1:0;
             if (indexChecksumOut != null) {
-              writeIndexRecord(indexChecksumOut,segmentStart,writer,numRecords);
+              writeIndexRecord(indexChecksumOut,segmentStart,writer);
             }
             else {
               irArray[i] = new IndexRecord(segmentStart, writer.getRawLength(),
-                writer.getCompressedLength(), numRecords);
+                  writer.getCompressedLength());    
             }
             writer = null;
           } catch (IOException e) {
@@ -1181,8 +1172,6 @@
       long finalIndexFileSize = 0;
       Path [] filename = new Path[numSpills];
       
-      firstLevelSpillsCounter.increment(numSpills);
-
       for(int i = 0; i < numSpills; i++) {
         filename[i] = mapOutputFile.getSpillFile(getTaskID(), i);
         finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
@@ -1199,8 +1188,7 @@
           writeSingleSpillIndexToFile(getTaskID(),
               new Path(filename[0].getParent(),"file.out.index"));
         }
-        spilledRecordsCounter.increment(Writer.getNumRecordsWritten());
-        return;
+        return;
       }
       //make correction in the length to include the sequence file header
       //lengths for each partition
@@ -1232,7 +1220,7 @@
           Writer<K, V> writer = new Writer<K, V>(job, finalOut, 
                                                  keyClass, valClass, codec);
           writer.close();
-          writeIndexRecord(finalIndexChecksumOut, segmentStart, writer, 0);
+          writeIndexRecord(finalIndexChecksumOut, segmentStart, writer);
         }
         finalOut.close();
         finalIndexChecksumOut.close();
@@ -1268,6 +1256,7 @@
             }
             indexRecord = null;
           }
+          
           //merge
           @SuppressWarnings("unchecked")
           RawKeyValueIterator kvIter = 
@@ -1281,37 +1270,19 @@
           long segmentStart = finalOut.getPos();
           Writer<K, V> writer = 
               new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
-          long numRecordsInThisPartition;
-          long prevCnt = Writer.getNumRecordsWritten();
           if (null == combinerClass || numSpills < minSpillsForCombine) {
             Merger.writeFile(kvIter, writer, reporter);
           } else {
             combineCollector.setWriter(writer);
             combineAndSpill(kvIter, combineInputCounter);
           }
-          numRecordsInThisPartition = Writer.getNumRecordsWritten() - prevCnt;
 
           //close
           writer.close();
           
           //write index record
-          writeIndexRecord(finalIndexChecksumOut,segmentStart, writer,
-            numRecordsInThisPartition);
+          writeIndexRecord(finalIndexChecksumOut,segmentStart, writer);
         }
-
-        // In Map Phase, Spills to disk are done at 3 places:
-        // (1) First Level Spills in sortAndSpill() - either
-        //      (a) without combiner
-        //   or (b) with combiner
-        // (2) Outputs of Intermediate(multi-level) merges in Merger.merge
-        // (3) Output of final level merge - See above if-else
-        //      (a) Merger.writeFile
-        //   or (b) combineAndSpill
-        // In all the cases, IFile.Writer.append() takes care of counting
-        // the records written to disk
-
-        spilledRecordsCounter.increment(Writer.getNumRecordsWritten());
-
         finalOut.close();
         finalIndexChecksumOut.close();
         finalIndexOut.close();
@@ -1324,7 +1295,7 @@
 
     private void writeIndexRecord(IFileOutputStream indexOut, 
                                   long start, 
-                                  Writer<K, V> writer, long numRecords) 
+                                  Writer<K, V> writer) 
     throws IOException {
       //when we write the offset/decompressed-length/compressed-length to  
       //the final index file, we write longs for both compressed and 
@@ -1340,9 +1311,8 @@
       wrapper.writeLong(writer.getRawLength());
       long segmentLength = writer.getCompressedLength();
       wrapper.writeLong(segmentLength);
-      wrapper.writeLong(numRecords);
       LOG.info("Index: (" + start + ", " + writer.getRawLength() + ", " + 
-               segmentLength + "," + numRecords + ")");
+               segmentLength + ")");
     }
     
     /**
@@ -1396,7 +1366,6 @@
         wrapper.writeLong(irArray[i].startOffset);
         wrapper.writeLong(irArray[i].rawLength);
         wrapper.writeLong(irArray[i].partLength);
-        wrapper.writeLong(irArray[i].numRecords);
       }
       
       wrapper.close();

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=718800&r1=718799&r2=718800&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Tue Nov 18 15:43:32 2008
@@ -436,17 +436,6 @@
         values.informReduceProgress();
       }
 
-      // In Reduce Phase, Spills to disk are
-      // (1) mapoutput directly written to reduceNode's disk in shuffleToDisk()
-      //       spilledRecordsCounter is updated after shuffleToDisk()
-      // (2) All other spills to disk are either through
-      //        (a) Merger.writeFile()
-      //     or (b) combineAndSpill()
-      //     In cases 2(a) & 2(b), IFile.Writer.append() takes care of
-      //     counting the records written to disk
-
-      spilledRecordsCounter.increment(Writer.getNumRecordsWritten());
-
       //Clean up: repeated in catch block below
       reducer.close();
       out.close(reporter);
@@ -1245,8 +1234,6 @@
           Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));  
         long compressedLength = 
           Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
-        long numRecords = 
-          Long.parseLong(connection.getHeaderField(MAP_OUTPUT_NUM_RECORDS));
 
         // Check if this map-output can be saved in-memory
         boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength); 
@@ -1268,7 +1255,6 @@
 
           mapOutput = shuffleToDisk(mapOutputLoc, input, filename, 
               compressedLength);
-          spilledRecordsCounter.increment(numRecords);
         }
             
         return mapOutput;
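
With the Map-Output-Num-Records header gone, the copier is back to reading only the two length headers. A stripped-down sketch of that read path, assuming (as in the hunk above) an open java.net.URLConnection named connection plus the MRConstants header-name constants:

    // Only two custom headers survive the revert.
    long decompressedLength =
        Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));
    long compressedLength =
        Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));

    // Memory-vs-disk decision as in the context lines above; with no
    // record-count header there is no per-segment count left to add to
    // a spilled-records counter after shuffleToDisk().
    if (ramManager.canFitInMemory(decompressedLength)) {
      // ... shuffle the segment into memory ...
    } else {
      mapOutput = shuffleToDisk(mapOutputLoc, input, filename,
                                compressedLength);
    }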

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=718800&r1=718799&r2=718800&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Tue Nov 18 15:43:32 2008
@@ -62,15 +62,13 @@
     MAP_SKIPPED_RECORDS,
     MAP_INPUT_BYTES, 
     MAP_OUTPUT_BYTES,
-    MAP_FIRST_LEVEL_SPILLS,
     COMBINE_INPUT_RECORDS,
     COMBINE_OUTPUT_RECORDS,
     REDUCE_INPUT_GROUPS,
     REDUCE_INPUT_RECORDS,
     REDUCE_OUTPUT_RECORDS,
     REDUCE_SKIPPED_GROUPS,
-    REDUCE_SKIPPED_RECORDS,
-    SPILLED_RECORDS
+    REDUCE_SKIPPED_RECORDS
   }
   
   /**
@@ -133,7 +131,6 @@
   protected JobContext jobContext;
   protected TaskAttemptContext taskContext;
   private volatile boolean commitPending = false;
-  protected final Counters.Counter spilledRecordsCounter;
 
   ////////////////////////////////////////////
   // Constructors
@@ -142,7 +139,6 @@
   public Task() {
     taskStatus = TaskStatus.createTaskStatus(isMapTask());
     taskId = new TaskAttemptID();
-    spilledRecordsCounter = counters.findCounter(Counter.SPILLED_RECORDS);
   }
 
   public Task(String jobFile, TaskAttemptID taskId, int partition) {
@@ -159,7 +155,6 @@
                                                     TaskStatus.Phase.SHUFFLE, 
                                                   counters);
     this.mapOutputFile.setJobId(taskId.getJobID());
-    spilledRecordsCounter = counters.findCounter(Counter.SPILLED_RECORDS);
   }
 
   ////////////////////////////////////////////
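
For context on the enum above: task counters are looked up by enum constant and incremented, which is what the removed spilledRecordsCounter field did with Counter.SPILLED_RECORDS in both constructors. The surviving pattern, as a short sketch against a task's Counters instance (here named counters):

    // The findCounter/increment pattern used throughout Task and its
    // subclasses; MAP_OUTPUT_RECORDS is one of the counters kept above.
    Counters.Counter mapOutputRecords =
        counters.findCounter(Counter.MAP_OUTPUT_RECORDS);
    mapOutputRecords.increment(1);   // one more map output record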

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=718800&r1=718799&r2=718800&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue Nov 18 15:43:32 2008
@@ -2760,7 +2760,6 @@
         final long startOffset = info.startOffset;
         final long rawPartLength = info.rawLength;
         final long partLength = info.partLength;
-        final long numRecords = info.numRecords;
 
         //set the custom "Raw-Map-Output-Length" http header to 
         //the raw (decompressed) length
@@ -2771,11 +2770,6 @@
         response.setHeader(MAP_OUTPUT_LENGTH, 
                            Long.toString(partLength));
 
-        //set the custom "Map-Output-Num-Records" http header to 
-        //the actual number of records being transferred
-        response.setHeader(MAP_OUTPUT_NUM_RECORDS, 
-                           Long.toString(numRecords));
-
         //use the same buffersize as used for reading the data from disk
         response.setBufferSize(MAX_BYTES_TO_READ);
         

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties?rev=718800&r1=718799&r2=718800&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties Tue Nov 18 15:43:32 2008
@@ -14,6 +14,5 @@
 REDUCE_OUTPUT_RECORDS.name=    Reduce output records
 REDUCE_SKIPPED_RECORDS.name=   Reduce skipped records
 REDUCE_SKIPPED_GROUPS.name=    Reduce skipped groups
-SPILLED_RECORDS.name=          Spilled Records
-MAP_FIRST_LEVEL_SPILLS.name=   Map First Level Spills
+
 

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java?rev=718800&r1=718799&r2=718800&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java Tue Nov 18 15:43:32 2008
@@ -43,7 +43,7 @@
     fs.delete(p, true);
     conf.setInt("mapred.tasktracker.indexcache.mb", 1);
     final int partsPerMap = 1000;
-    final int bytesPerFile = partsPerMap * 32;
+    final int bytesPerFile = partsPerMap * 24;
     IndexCache cache = new IndexCache(conf);
 
     // fill cache
@@ -105,7 +105,6 @@
     assertEquals(fill, rec.startOffset);
     assertEquals(fill, rec.rawLength);
     assertEquals(fill, rec.partLength);
-    assertEquals(fill, rec.numRecords);
   }
 
   private static void writeFile(FileSystem fs, Path f, long fill, int parts)
@@ -117,7 +116,6 @@
       dout.writeLong(fill);
       dout.writeLong(fill);
       dout.writeLong(fill);
-      dout.writeLong(fill);
     }
     dout.close();
   }
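
A quick check on the test's sizing, assuming IndexCache charges each file at its raw index size: with the record format back to 24 bytes, every map's index file is a fixed, known fraction of the 1 MB limit set through mapred.tasktracker.indexcache.mb.

    // Worked arithmetic behind bytesPerFile (sketch, not part of the test).
    final int partsPerMap = 1000;                        // partitions per map
    final int recordLength = 24;                         // 3 longs x 8 bytes
    final int bytesPerFile = partsPerMap * recordLength; // 24000 bytes
    final int cacheLimit = 1 << 20;                      // 1 MB, from the conf
    System.out.println(cacheLimit / bytesPerFile);       // 43 files fit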