Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/11/25 23:31:33 UTC

svn commit: r720627 - in /hadoop/core/trunk: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: cdouglas
Date: Tue Nov 25 14:31:33 2008
New Revision: 720627

URL: http://svn.apache.org/viewvc?rev=720627&view=rev
Log:
HADOOP-2774. Add counters tracking records spilled to disk in MapTask and
ReduceTask. Contributed by Ravi Gummadi.
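
The mechanism the hunks below thread through IFile.Writer, IFile.Reader and
Merger is simple: each stream keeps a private record tally and folds it into
the shared counter only when the stream is closed, so passing null for the
counter disables the accounting for that stream. A minimal standalone sketch
of that pattern follows; the Counter class here is a simplified stand-in for
Hadoop's Counters.Counter, not the real API.

    import java.io.IOException;

    // Simplified stand-in for org.apache.hadoop.mapred.Counters.Counter
    class Counter {
      private long value = 0;
      public synchronized void increment(long amount) { value += amount; }
      public synchronized long getCounter() { return value; }
    }

    // Sketch of the counting pattern used by the patched IFile.Writer
    class CountingWriter {
      private long numRecordsWritten = 0;            // per-stream tally
      private final Counter writtenRecordsCounter;   // shared counter, may be null

      CountingWriter(Counter writesCounter) {
        this.writtenRecordsCounter = writesCounter;
      }

      public void append(String key, String value) throws IOException {
        // ... serialize the record to the spill file ...
        ++numRecordsWritten;
      }

      public void close() throws IOException {
        // ... flush and close the underlying stream ...
        if (writtenRecordsCounter != null) {
          writtenRecordsCounter.increment(numRecordsWritten);
        }
      }
    }

In the patch itself the counter is Task.Counter.SPILLED_RECORDS (the new
Task.spilledRecordsCounter field): MapTask attaches it to the writers that
spill and merge records to disk, ReduceTask attaches it to the readers used by
its on-disk merges, and null is passed wherever records should not be counted,
such as the empty per-partition placeholder files.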

Added:
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=720627&r1=720626&r2=720627&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Nov 25 14:31:33 2008
@@ -141,6 +141,9 @@
     HADOOP-4339. Remove redundant calls from FileSystem/FsShell when
     generating/processing ContentSummary. (David Phillips via cdouglas)
 
+    HADOOP-2774. Add counters tracking records spilled to disk in MapTask and
+    ReduceTask. (Ravi Gummadi via cdouglas)
+
   OPTIMIZATIONS
 
     HADOOP-3293. Fixes FileInputFormat to do provide locations for splits

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java?rev=720627&r1=720626&r2=720627&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java Tue Nov 25 14:31:33 2008
@@ -65,7 +65,11 @@
     
     long decompressedBytesWritten = 0;
     long compressedBytesWritten = 0;
-    
+
+    // Count records written to disk
+    private long numRecordsWritten = 0;
+    private final Counters.Counter writtenRecordsCounter;
+
     IFileOutputStream checksumOut;
 
     Class<K> keyClass;
@@ -77,14 +81,18 @@
 
     public Writer(Configuration conf, FileSystem fs, Path file, 
                   Class<K> keyClass, Class<V> valueClass,
-                  CompressionCodec codec) throws IOException {
-      this(conf, fs.create(file), keyClass, valueClass, codec);
+                  CompressionCodec codec,
+                  Counters.Counter writesCounter) throws IOException {
+      this(conf, fs.create(file), keyClass, valueClass, codec,
+           writesCounter);
       ownOutputStream = true;
     }
     
     public Writer(Configuration conf, FSDataOutputStream out, 
         Class<K> keyClass, Class<V> valueClass,
-        CompressionCodec codec) throws IOException {
+        CompressionCodec codec, Counters.Counter writesCounter)
+        throws IOException {
+      this.writtenRecordsCounter = writesCounter;
       this.checksumOut = new IFileOutputStream(out);
       this.rawOut = out;
       this.start = this.rawOut.getPos();
@@ -107,7 +115,7 @@
       this.valueSerializer = serializationFactory.getSerializer(valueClass);
       this.valueSerializer.open(buffer);
     }
-    
+
     public void close() throws IOException {
 
       // Close the serializers
@@ -140,6 +148,9 @@
         rawOut.close();
       }
       out = null;
+      if(writtenRecordsCounter != null) {
+        writtenRecordsCounter.increment(numRecordsWritten);
+      }
     }
 
     public void append(K key, V value) throws IOException {
@@ -178,6 +189,7 @@
       decompressedBytesWritten += keyLength + valueLength + 
                                   WritableUtils.getVIntSize(keyLength) + 
                                   WritableUtils.getVIntSize(valueLength);
+      ++numRecordsWritten;
     }
     
     public void append(DataInputBuffer key, DataInputBuffer value)
@@ -203,7 +215,8 @@
       decompressedBytesWritten += keyLength + valueLength + 
                       WritableUtils.getVIntSize(keyLength) + 
                       WritableUtils.getVIntSize(valueLength);
-}
+      ++numRecordsWritten;
+    }
     
     public long getRawLength() {
       return decompressedBytesWritten;
@@ -221,6 +234,10 @@
     private static final int DEFAULT_BUFFER_SIZE = 128*1024;
     private static final int MAX_VINT_SIZE = 9;
 
+    // Count records read from disk
+    private long numRecordsRead = 0;
+    private final Counters.Counter readRecordsCounter;
+
     final InputStream in;        // Possibly decompressed stream that we read
     Decompressor decompressor;
     long bytesRead = 0;
@@ -242,14 +259,15 @@
      * @param file Path of the file to be opened. This file should have
      *             checksum bytes for the data at the end of the file.
      * @param codec codec
+     * @param readsCounter Counter for records read from disk
      * @throws IOException
      */
-    
     public Reader(Configuration conf, FileSystem fs, Path file,
-                  CompressionCodec codec) throws IOException {
+                  CompressionCodec codec,
+                  Counters.Counter readsCounter) throws IOException {
       this(conf, fs.open(file), 
            fs.getFileStatus(file).getLen(),
-           codec);
+           codec, readsCounter);
     }
 
     /**
@@ -260,10 +278,13 @@
      * @param length Length of the data in the stream, including the checksum
      *               bytes.
      * @param codec codec
+     * @param readsCounter Counter for records read from disk
      * @throws IOException
      */
     public Reader(Configuration conf, FSDataInputStream in, long length, 
-                  CompressionCodec codec) throws IOException {
+                  CompressionCodec codec,
+                  Counters.Counter readsCounter) throws IOException {
+      readRecordsCounter = readsCounter;
       checksumIn = new IFileInputStream(in,length);
       if (codec != null) {
         decompressor = CodecPool.getDecompressor(codec);
@@ -400,7 +421,8 @@
       bytesRead += recordLength;
 
       ++recNo;
-      
+      ++numRecordsRead;
+
       return true;
     }
 
@@ -418,6 +440,9 @@
       // Release the buffer
       dataIn = null;
       buffer = null;
+      if(readRecordsCounter != null) {
+        readRecordsCounter.increment(numRecordsRead);
+      }
     }
   }    
   
@@ -431,7 +456,7 @@
     public InMemoryReader(RamManager ramManager, TaskAttemptID taskAttemptId,
                           byte[] data, int start, int length)
                           throws IOException {
-      super(null, null, length - start, null);
+      super(null, null, length - start, null, null);
       this.ramManager = ramManager;
       this.taskAttemptId = taskAttemptId;
       

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=720627&r1=720626&r2=720627&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Tue Nov 25 14:31:33 2008
@@ -948,7 +948,8 @@
           IFile.Writer<K, V> writer = null;
           try {
             long segmentStart = out.getPos();
-            writer = new Writer<K, V>(job, out, keyClass, valClass, codec);
+            writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
+                                      spilledRecordsCounter);
             if (null == combinerClass) {
               // spill directly
               DataInputBuffer key = new DataInputBuffer();
@@ -1047,7 +1048,8 @@
           try {
             long segmentStart = out.getPos();
             // Create a new codec, don't care!
-            writer = new IFile.Writer<K, V>(job, out, keyClass, valClass, codec);
+            writer = new IFile.Writer<K, V>(job, out, keyClass, valClass, codec,
+                                            spilledRecordsCounter);
 
             if (i == partition) {
               final long recordStart = out.getPos();
@@ -1188,7 +1190,7 @@
           writeSingleSpillIndexToFile(getTaskID(),
               new Path(filename[0].getParent(),"file.out.index"));
         }
-    	  return;
+        return;
       }
       //make correction in the length to include the sequence file header
       //lengths for each partition
@@ -1217,8 +1219,8 @@
         //create dummy files
         for (int i = 0; i < partitions; i++) {
           long segmentStart = finalOut.getPos();
-          Writer<K, V> writer = new Writer<K, V>(job, finalOut, 
-                                                 keyClass, valClass, codec);
+          Writer<K, V> writer = new Writer<K, V>(job, finalOut, keyClass,
+                                                 valClass, codec, null);
           writer.close();
           writeIndexRecord(finalIndexChecksumOut, segmentStart, writer);
         }
@@ -1245,8 +1247,8 @@
             in.seek(segmentOffset);
 
             Segment<K, V> s = 
-              new Segment<K, V>(new Reader<K, V>(job, in, segmentLength, codec),
-                                true);
+              new Segment<K, V>(new Reader<K, V>(job, in, segmentLength,
+                                                 codec, null), true);
             segmentList.add(i, s);
             
             if (LOG.isDebugEnabled()) {
@@ -1264,12 +1266,14 @@
                          keyClass, valClass,
                          segmentList, job.getInt("io.sort.factor", 100), 
                          new Path(getTaskID().toString()), 
-                         job.getOutputKeyComparator(), reporter);
+                         job.getOutputKeyComparator(), reporter,
+                         null, spilledRecordsCounter);
 
           //write merged output to disk
           long segmentStart = finalOut.getPos();
           Writer<K, V> writer = 
-              new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
+              new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
+                               spilledRecordsCounter);
           if (null == combinerClass || numSpills < minSpillsForCombine) {
             Merger.writeFile(kvIter, writer, reporter);
           } else {

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java?rev=720627&r1=720626&r2=720627&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java Tue Nov 25 14:31:33 2008
@@ -54,12 +54,15 @@
                             CompressionCodec codec,
                             Path[] inputs, boolean deleteInputs, 
                             int mergeFactor, Path tmpDir,
-                            RawComparator<K> comparator, Progressable reporter)
+                            RawComparator<K> comparator, Progressable reporter,
+                            Counters.Counter readsCounter,
+                            Counters.Counter writesCounter)
   throws IOException {
     return 
       new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator, 
                            reporter).merge(keyClass, valueClass,
-                                           mergeFactor, tmpDir);
+                                           mergeFactor, tmpDir,
+                                           readsCounter, writesCounter);
   }
   
   public static <K extends Object, V extends Object>
@@ -67,10 +70,12 @@
                             Class<K> keyClass, Class<V> valueClass, 
                             List<Segment<K, V>> segments, 
                             int mergeFactor, Path tmpDir,
-                            RawComparator<K> comparator, Progressable reporter)
+                            RawComparator<K> comparator, Progressable reporter,
+                            Counters.Counter readsCounter,
+                            Counters.Counter writesCounter)
       throws IOException {
     return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
-                 comparator, reporter, false);
+                 comparator, reporter, false, readsCounter, writesCounter);
   }
 
   public static <K extends Object, V extends Object>
@@ -79,11 +84,14 @@
                             List<Segment<K, V>> segments,
                             int mergeFactor, Path tmpDir,
                             RawComparator<K> comparator, Progressable reporter,
-                            boolean sortSegments)
+                            boolean sortSegments,
+                            Counters.Counter readsCounter,
+                            Counters.Counter writesCounter)
       throws IOException {
     return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
                            sortSegments).merge(keyClass, valueClass,
-                                               mergeFactor, tmpDir);
+                                               mergeFactor, tmpDir,
+                                               readsCounter, writesCounter);
   }
 
   static <K extends Object, V extends Object>
@@ -92,12 +100,15 @@
                             List<Segment<K, V>> segments,
                             int mergeFactor, int inMemSegments, Path tmpDir,
                             RawComparator<K> comparator, Progressable reporter,
-                            boolean sortSegments)
+                            boolean sortSegments,
+                            Counters.Counter readsCounter,
+                            Counters.Counter writesCounter)
       throws IOException {
     return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
                            sortSegments).merge(keyClass, valueClass,
                                                mergeFactor, inMemSegments,
-                                               tmpDir);
+                                               tmpDir,
+                                               readsCounter, writesCounter);
   }
 
   public static <K extends Object, V extends Object>
@@ -144,9 +155,9 @@
       this.segmentLength = reader.getLength();
     }
 
-    private void init() throws IOException {
+    private void init(Counters.Counter readsCounter) throws IOException {
       if (reader == null) {
-        reader = new Reader<K, V>(conf, fs, file, codec);
+        reader = new Reader<K, V>(conf, fs, file, codec, readsCounter);
       }
     }
     
@@ -309,13 +320,18 @@
     }
     
     public RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
-                                     int factor, Path tmpDir) 
+                                     int factor, Path tmpDir,
+                                     Counters.Counter readsCounter,
+                                     Counters.Counter writesCounter)
         throws IOException {
-      return merge(keyClass, valueClass, factor, 0, tmpDir);
+      return merge(keyClass, valueClass, factor, 0, tmpDir,
+                   readsCounter, writesCounter);
     }
 
     RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
-                                     int factor, int inMem, Path tmpDir)
+                                     int factor, int inMem, Path tmpDir,
+                                     Counters.Counter readsCounter,
+                                     Counters.Counter writesCounter)
         throws IOException {
       LOG.info("Merging " + segments.size() + " sorted segments");
       
@@ -344,7 +360,7 @@
           for (Segment<K, V> segment : mStream) {
             // Initialize the segment at the last possible moment;
             // this helps in ensuring we don't use buffers until we need them
-            segment.init();
+            segment.init(readsCounter);
             long startPos = segment.getPosition();
             boolean hasNext = segment.next();
             long endPos = segment.getPosition();
@@ -417,7 +433,8 @@
                                               approxOutputSize, conf);
 
           Writer<K, V> writer = 
-            new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec);
+            new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec,
+                             writesCounter);
           writeFile(this, writer, reporter);
           writer.close();
           

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=720627&r1=720626&r2=720627&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Tue Nov 25 14:31:33 2008
@@ -382,7 +382,7 @@
           job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
           !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
           new Path(getTaskID().toString()), job.getOutputKeyComparator(),
-          reporter)
+          reporter, spilledRecordsCounter, null)
       : reduceCopier.createKVIterator(job, rfs, reporter);
         
     // free up the data structures
@@ -2080,9 +2080,9 @@
                             reduceTask.getTaskID(), inMemToDiskBytes);
           final RawKeyValueIterator rIter = Merger.merge(job, fs,
               keyClass, valueClass, memDiskSegments, numMemDiskSegments,
-              tmpDir, comparator, reporter);
+              tmpDir, comparator, reporter, spilledRecordsCounter, null);
           final Writer writer = new Writer(job, fs, outputPath,
-              keyClass, valueClass, codec);
+              keyClass, valueClass, codec, null);
           try {
             Merger.writeFile(rIter, writer, reporter);
             addToMapOutputFilesOnDisk(fs.getFileStatus(outputPath));
@@ -2139,7 +2139,7 @@
         RawKeyValueIterator diskMerge = Merger.merge(
             job, fs, keyClass, valueClass, diskSegments,
             ioSortFactor, numInMemSegments, tmpDir, comparator,
-            reporter, false);
+            reporter, false, spilledRecordsCounter, null);
         diskSegments.clear();
         if (0 == finalSegments.size()) {
           return diskMerge;
@@ -2149,7 +2149,7 @@
       }
       return Merger.merge(job, fs, keyClass, valueClass,
                    finalSegments, finalSegments.size(), tmpDir,
-                   comparator, reporter);
+                   comparator, reporter, spilledRecordsCounter, null);
     }
 
     class RawKVIteratorReader extends IFile.Reader<K,V> {
@@ -2158,7 +2158,7 @@
 
       public RawKVIteratorReader(RawKeyValueIterator kvIter, long size)
           throws IOException {
-        super(null, null, size, null);
+        super(null, null, size, null, spilledRecordsCounter);
         this.kvIter = kvIter;
       }
 
@@ -2383,7 +2383,7 @@
               new Writer(conf,rfs, outputPath, 
                          conf.getMapOutputKeyClass(), 
                          conf.getMapOutputValueClass(),
-                         codec);
+                         codec, null);
             RawKeyValueIterator iter  = null;
             Path tmpDir = new Path(reduceTask.getTaskID().toString());
             final Reporter reporter = getReporter(umbilical);
@@ -2393,7 +2393,8 @@
                                   conf.getMapOutputValueClass(),
                                   codec, mapFiles.toArray(new Path[mapFiles.size()]), 
                                   true, ioSortFactor, tmpDir, 
-                                  conf.getOutputKeyComparator(), reporter);
+                                  conf.getOutputKeyComparator(), reporter,
+                                  spilledRecordsCounter, null);
               
               Merger.writeFile(iter, writer, reporter);
               writer.close();
@@ -2477,7 +2478,7 @@
           new Writer(conf, rfs, outputPath,
                      conf.getMapOutputKeyClass(),
                      conf.getMapOutputValueClass(),
-                     codec);
+                     codec, null);
 
         RawKeyValueIterator rIter = null;
         final Reporter reporter = getReporter(umbilical);
@@ -2490,7 +2491,8 @@
                                (Class<V>)conf.getMapOutputValueClass(),
                                inMemorySegments, inMemorySegments.size(),
                                new Path(reduceTask.getTaskID().toString()),
-                               conf.getOutputKeyComparator(), reporter);
+                               conf.getOutputKeyComparator(), reporter,
+                               spilledRecordsCounter, null);
           
           if (null == combinerClass) {
             Merger.writeFile(rIter, writer, reporter);

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=720627&r1=720626&r2=720627&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Tue Nov 25 14:31:33 2008
@@ -68,7 +68,8 @@
     REDUCE_INPUT_RECORDS,
     REDUCE_OUTPUT_RECORDS,
     REDUCE_SKIPPED_GROUPS,
-    REDUCE_SKIPPED_RECORDS
+    REDUCE_SKIPPED_RECORDS,
+    SPILLED_RECORDS
   }
   
   /**
@@ -131,6 +132,7 @@
   protected JobContext jobContext;
   protected TaskAttemptContext taskContext;
   private volatile boolean commitPending = false;
+  protected final Counters.Counter spilledRecordsCounter;
 
   ////////////////////////////////////////////
   // Constructors
@@ -139,6 +141,7 @@
   public Task() {
     taskStatus = TaskStatus.createTaskStatus(isMapTask());
     taskId = new TaskAttemptID();
+    spilledRecordsCounter = counters.findCounter(Counter.SPILLED_RECORDS);
   }
 
   public Task(String jobFile, TaskAttemptID taskId, int partition) {
@@ -155,6 +158,7 @@
                                                     TaskStatus.Phase.SHUFFLE, 
                                                   counters);
     this.mapOutputFile.setJobId(taskId.getJobID());
+    spilledRecordsCounter = counters.findCounter(Counter.SPILLED_RECORDS);
   }
 
   ////////////////////////////////////////////

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties?rev=720627&r1=720626&r2=720627&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties Tue Nov 25 14:31:33 2008
@@ -14,5 +14,5 @@
 REDUCE_OUTPUT_RECORDS.name=    Reduce output records
 REDUCE_SKIPPED_RECORDS.name=   Reduce skipped records
 REDUCE_SKIPPED_GROUPS.name=    Reduce skipped groups
-
+SPILLED_RECORDS.name=          Spilled Records
 

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java?rev=720627&r1=720626&r2=720627&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java Tue Nov 25 14:31:33 2008
@@ -80,7 +80,8 @@
     FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
     Path path = new Path(tmpDir, "data.in");
     IFile.Writer<Text, Text> writer = 
-      new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class, codec);
+      new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class,
+                                   codec, null);
     for(Pair p: vals) {
       writer.append(new Text(p.key), new Text(p.value));
     }
@@ -90,7 +91,7 @@
     RawKeyValueIterator rawItr = 
       Merger.merge(conf, rfs, Text.class, Text.class, codec, new Path[]{path}, 
                    false, conf.getInt("io.sort.factor", 100), tmpDir, 
-                   new Text.Comparator(), new NullProgress());
+                   new Text.Comparator(), new NullProgress(), null, null);
     @SuppressWarnings("unchecked") // WritableComparators are not generic
     ReduceTask.ValuesIterator valItr = 
       new ReduceTask.ValuesIterator<Text,Text>(rawItr,

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java?rev=720627&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java Tue Nov 25 14:31:33 2008
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.Writer;
+import java.io.BufferedWriter;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.WordCount;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * This is a wordcount application that tests the count of records
+ * spilled to disk. It generates simple text input files, then runs the
+ * wordcount map/reduce application on (1) 3 input files (with 3 maps and
+ * 1 reduce) and verifies the counters, and (2) 4 input files (with 4 maps
+ * and 1 reduce) and verifies the counters. The wordcount application reads
+ * the text input files, breaks each line into words and counts them. The
+ * output is a locally sorted list of words and how often each occurred.
+ *
+ */
+public class TestSpilledRecordsCounter extends TestCase {
+
+  private void validateCounters(Counters counter, long spillRecCnt) {
+      // Check if the number of spilled records is the same as expected
+      assertEquals(counter.findCounter(Task.Counter.SPILLED_RECORDS).
+                     getCounter(), spillRecCnt);
+  }
+
+  private void createWordsFile(File inpFile) throws Exception {
+    Writer out = new BufferedWriter(new FileWriter(inpFile));
+    try {
+      // 500*4 unique words --- repeated 5 times => 5*2K words
+      int REPLICAS=5, NUMLINES=500, NUMWORDSPERLINE=4;
+
+      for (int i = 0; i < REPLICAS; i++) {
+        for (int j = 1; j <= NUMLINES*NUMWORDSPERLINE; j+=NUMWORDSPERLINE) {
+          out.write("word" + j + " word" + (j+1) + " word" + (j+2) + " word" + (j+3) + '\n');
+        }
+      }
+    } finally {
+      out.close();
+    }
+  }
+
+
+  /**
+   * The main driver for the word count map/reduce program.
+   * Invoke this method to submit the map/reduce job.
+   * @throws IOException When there are communication problems with the
+   *                     job tracker.
+   */
+  public void testSpillCounter() throws Exception {
+    JobConf conf = new JobConf(TestSpilledRecordsCounter.class);
+    conf.setJobName("wordcountSpilledRecordsCounter");
+
+    // the keys are words (strings)
+    conf.setOutputKeyClass(Text.class);
+    // the values are counts (ints)
+    conf.setOutputValueClass(IntWritable.class);
+
+    conf.setMapperClass(WordCount.MapClass.class);
+    conf.setCombinerClass(WordCount.Reduce.class);
+    conf.setReducerClass(WordCount.Reduce.class);
+
+    conf.setNumMapTasks(3);
+    conf.setNumReduceTasks(1);
+    conf.setInt("io.sort.mb", 1);
+    conf.setInt("io.sort.factor", 2);
+    conf.set("io.sort.record.percent", "0.05");
+    conf.set("io.sort.spill.percent", "0.80");
+
+
+    String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
+                                      File.separator + "tmp"))
+                               .toString().replace(' ', '+');
+    conf.set("test.build.data", TEST_ROOT_DIR);
+    String IN_DIR = TEST_ROOT_DIR + File.separator +
+                      "spilledRecords.countertest" +  File.separator +
+                      "genins" + File.separator;
+    String OUT_DIR = TEST_ROOT_DIR + File.separator +
+                      "spilledRecords.countertest" + File.separator;
+
+    FileSystem fs = FileSystem.get(conf);
+    Path testdir = new Path(TEST_ROOT_DIR, "spilledRecords.countertest");
+    try {
+      if (fs.exists(testdir)) {
+        fs.delete(testdir, true);
+      }
+      if (!fs.mkdirs(testdir)) {
+        throw new IOException("Mkdirs failed to create " + testdir.toString());
+      }
+
+      Path wordsIns = new Path(testdir, "genins");
+      if (!fs.mkdirs(wordsIns)) {
+        throw new IOException("Mkdirs failed to create " + wordsIns.toString());
+      }
+
+      //create 3 input files each with 5*2k words
+      File inpFile = new File(IN_DIR + "input5_2k_1");
+      createWordsFile(inpFile);
+      inpFile = new File(IN_DIR + "input5_2k_2");
+      createWordsFile(inpFile);
+      inpFile = new File(IN_DIR + "input5_2k_3");
+      createWordsFile(inpFile);
+
+      FileInputFormat.setInputPaths(conf, IN_DIR);
+      Path outputPath1=new Path(OUT_DIR, "output5_2k_3");
+      FileOutputFormat.setOutputPath(conf, outputPath1);
+
+      RunningJob myJob = JobClient.runJob(conf);
+      Counters c1 = myJob.getCounters();
+      // 3 maps & in each map, 4 first level spills --- So total 12.
+      // spilled records count:
+      // Each Map: 1st level:2k+2k+2k+2k=8k;2ndlevel=4k+4k=8k;
+      //           3rd level=2k(4k from 1st level & 4k from 2nd level & combineAndSpill)
+      //           So total 8k+8k+2k=18k
+      // For 3 Maps, total = 3*18k=54k
+      // Reduce: each of the 3 map o/p's(2k each) will be spilled in shuffleToDisk()
+      //         So 3*2k=6k in 1st level; 2nd level:4k(2k+2k);
+      //         3rd level directly given to reduce(4k+2k --- combineAndSpill => 2k.
+      //         So 0 records spilled to disk in 3rd level)
+      //         So total of 6k+4k=10k
+      // Total job counter will be 54k+10k = 64k
+      validateCounters(c1, 64000);
+
+      //create a 4th input file with 5*2k words and test with 4 maps
+      inpFile = new File(IN_DIR + "input5_2k_4");
+      createWordsFile(inpFile);
+      conf.setNumMapTasks(4);
+      Path outputPath2=new Path(OUT_DIR, "output5_2k_4");
+      FileOutputFormat.setOutputPath(conf, outputPath2);
+
+      myJob = JobClient.runJob(conf);
+      c1 = myJob.getCounters();
+      // 4 maps & in each map, 4 first level spills --- So total 16.
+      // spilled records count:
+      // Each Map: 1st level:2k+2k+2k+2k=8k;2ndlevel=4k+4k=8k;
+      //           3rd level=2k(4k from 1st level & 4k from 2nd level & combineAndSpill)
+      //           So total 8k+8k+2k=18k
+      // For 4 Maps, total = 4*18k=72k
+      // Reduce: each of the 4 map o/p's(2k each) will be spilled in shuffleToDisk()
+      //         So 4*2k=8k in 1st level; 2nd level:4k+4k=8k;
+      //         3rd level directly given to reduce(4k+4k --- combineAndSpill => 2k.
+      //         So 0 records spilled to disk in 3rd level)
+      //         So total of 8k+8k=16k
+      // Total job counter will be 72k+16k = 88k
+      validateCounters(c1, 88000);
+    } finally {
+      //clean up the input and output files
+      if (fs.exists(testdir)) {
+        fs.delete(testdir, true);
+      }
+    }
+  }
+}