You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2010/02/05 06:43:21 UTC

svn commit: r906820 [1/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ src/docs/src/documentation/content/xdocs/ src/java/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/map...

Author: cdouglas
Date: Fri Feb  5 05:43:19 2010
New Revision: 906820

URL: http://svn.apache.org/viewvc?rev=906820&view=rev
Log:
MAPREDUCE-64. Eliminate io.sort.record.percent from MapTask configuration.

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapCollection.java
      - copied, changed from r906813, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapCollection.java
Removed:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapCollection.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/MapSideDiskSpill.java
    hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
    hadoop/mapreduce/trunk/src/java/mapred-default.xml
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
    hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=906820&r1=906819&r2=906820&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Feb  5 05:43:19 2010
@@ -152,6 +152,9 @@
     MAPREDUCE-1367. LocalJobRunner should support parallel mapper execution.
     (Aaron Kimball via tomwhite)
 
+    MAPREDUCE-64. Eliminate io.sort.record.percent from MapTask configuration.
+    (cdouglas)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/MapSideDiskSpill.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/MapSideDiskSpill.java?rev=906820&r1=906819&r2=906820&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/MapSideDiskSpill.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/MapSideDiskSpill.java Fri Feb  5 05:43:19 2010
@@ -99,14 +99,12 @@
   public String getPrescription() {
     return 
     "* Use combiner to lower the map output size.\n" +
-      "* Increase map side sort buffer size (" + JobContext.IO_SORT_FACTOR + 
+      "* Increase map side sort buffer size (" + JobContext.IO_SORT_MB + 
       ":" + this._job.getJobConf().getInt(JobContext.IO_SORT_MB, 0) + ").\n" +
-      "* Increase index buffer size (" + JobContext.MAP_SORT_RECORD_PERCENT + 
-      ":" + this._job.getJobConf().getInt(JobContext.MAP_SORT_RECORD_PERCENT, 0)
-      + ") if number of Map Output Records are large. \n" +
+      ") if number of Map Output Records are large. \n" +
       "* Increase (" + JobContext.MAP_SORT_SPILL_PERCENT + ":" + 
       this._job.getJobConf().getInt(JobContext.MAP_SORT_SPILL_PERCENT, 0) + 
-      "), default 0.80 i.e. 80% of sort buffer size and index buffer size. \n";
+      "), default 0.80 i.e. 80% of sort buffer size. \n";
   }
 
   /* (non-Javadoc)

Modified: hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml?rev=906820&r1=906819&r2=906820&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml (original)
+++ hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml Fri Feb  5 05:43:19 2010
@@ -1334,17 +1334,16 @@
         <section>
           <title>Map Parameters</title>
 
-          <p>A record emitted from a map will be serialized into a buffer and
-          metadata will be stored into accounting buffers. As described in the
-          following options, when either the serialization buffer or the
-          metadata exceed a threshold, the contents of the buffers will be
-          sorted and written to disk in the background while the map continues
-          to output records. If either buffer fills completely while the spill
-          is in progress, the map thread will block. When the map is finished,
-          any remaining records are written to disk and all on-disk segments
-          are merged into a single file. Minimizing the number of spills to
-          disk can decrease map time, but a larger buffer also decreases the
-          memory available to the mapper.</p>
+          <p>A record emitted from a map and its metadata will be serialized
+          into a buffer. As described in the following options, when the record
+          data exceed a threshold, the contents of this buffer will be sorted
+          and written to disk in the background (a "spill") while the map
+          continues to output records. If the remainder of the buffer fills
+          during the spill, the map thread will block. When the map is
+          finished, any buffered records are written to disk and all on-disk
+          segments are merged into a single file. Minimizing the number of
+          spills to disk <em>can</em> decrease map time, but a larger buffer
+          also decreases the memory available to the mapper.</p>
 
           <table>
             <tr><th>Name</th><th>Type</th><th>Description</th></tr>
@@ -1352,26 +1351,10 @@
                 <td>The cumulative size of the serialization and accounting
                 buffers storing records emitted from the map, in megabytes.
                 </td></tr>
-            <tr><td>mapreduce.map.sort.record.percent</td><td>float</td>
-                <td>The ratio of serialization to accounting space can be
-                adjusted. Each serialized record requires 16 bytes of
-                accounting information in addition to its serialized size to
-                effect the sort. This percentage of space allocated from
-                <code>mapreduce.task.io.sort.mb</code> affects the 
-                probability of a spill to
-                disk being caused by either exhaustion of the serialization
-                buffer or the accounting space. Clearly, for a map outputting
-                small records, a higher value than the default will likely
-                decrease the number of spills to disk.</td></tr>
             <tr><td>mapreduce.map.sort.spill.percent</td><td>float</td>
                 <td>This is the threshold for the accounting and serialization
-                buffers. When this percentage of either buffer has filled,
-                their contents will be spilled to disk in the background. Let
-                <code>mapreduce.map.sort.record.percent</code> be <em>r</em>,
-                <code>mapreduce.task.io.sort.mb</code> be <em>x</em>, 
-                and this value be
-                <em>q</em>. The maximum number of records collected before the
-                collection thread will spill is <code>r * x * q * 2^16</code>.
+                buffer. When this percentage of <code>mapreduce.task.io.sort.mb</code>
+                has filled, its contents will be spilled to disk in the background.
                 Note that a higher value may decrease the number of- or even
                 eliminate- merges, but will also increase the probability of
                 the map task getting blocked. The lowest average map times are
@@ -1381,10 +1364,10 @@
 
           <p>Other notes</p>
           <ul>
-            <li>If either spill threshold is exceeded while a spill is in
-            progress, collection will continue until the spill is finished.
-            For example, if <code>io.sort.buffer.spill.percent</code> is set
-            to 0.33, and the remainder of the buffer is filled while the spill
+            <li>If the spill threshold is exceeded while a spill is in
+            progress, collection will continue until the spill is finished. For
+            example, if <code>mapreduce.map.sort.spill.percent</code> is set to
+            0.33, and the remainder of the buffer is filled while the spill
             runs, the next spill will include all the collected records, or
             0.66 of the buffer, and will not generate additional spills. In
             other words, the thresholds are defining triggers, not

Modified: hadoop/mapreduce/trunk/src/java/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/mapred-default.xml?rev=906820&r1=906819&r2=906820&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/mapred-default.xml (original)
+++ hadoop/mapreduce/trunk/src/java/mapred-default.xml Fri Feb  5 05:43:19 2010
@@ -76,22 +76,13 @@
 </property>
 
 <property>
-  <name>mapreduce.map.sort.record.percent</name>
-  <value>0.05</value>
-  <description>The percentage of mapreduce.task.io.sort.mb dedicated to 
-  tracking record boundaries. Let this value be r, 
-  mapreduce.task.io.sort.mb be x. The maximum number
-  of records collected before the collection thread must block is equal
-  to (r * x) / 4</description>
-</property>
-
-<property>
   <name>mapreduce.map.sort.spill.percent</name>
   <value>0.80</value>
-  <description>The soft limit in either the buffer or record collection
-  buffers. Once reached, a thread will begin to spill the contents to disk
-  in the background. Note that this does not imply any chunking of data to
-  the spill. A value less than 0.5 is not recommended.</description>
+  <description>The soft limit in the serialization buffer. Once reached, a
+  thread will begin to spill the contents to disk in the background. Note that
+  collection will not block if this threshold is exceeded while a spill is
+  already in progress, so spills may be larger than this threshold when it is
+  set to less than .5</description>
 </property>
 
 <property>

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=906820&r1=906819&r2=906820&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Fri Feb  5 05:43:19 2010
@@ -25,6 +25,8 @@
 import java.io.OutputStream;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.nio.ByteBuffer;
+import java.nio.IntBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.locks.Condition;
@@ -705,67 +707,72 @@
     
   }
 
-  class MapOutputBuffer<K extends Object, V extends Object>
+  private class MapOutputBuffer<K extends Object, V extends Object>
       implements MapOutputCollector<K, V>, IndexedSortable {
-    private final int partitions;
-    private final JobConf job;
-    private final TaskReporter reporter;
-    private final Class<K> keyClass;
-    private final Class<V> valClass;
-    private final RawComparator<K> comparator;
-    private final SerializationFactory serializationFactory;
-    private final Serializer<K> keySerializer;
-    private final Serializer<V> valSerializer;
-    private final CombinerRunner<K,V> combinerRunner;
-    private final CombineOutputCollector<K, V> combineCollector;
-    
+    final int partitions;
+    final JobConf job;
+    final TaskReporter reporter;
+    final Class<K> keyClass;
+    final Class<V> valClass;
+    final RawComparator<K> comparator;
+    final SerializationFactory serializationFactory;
+    final Serializer<K> keySerializer;
+    final Serializer<V> valSerializer;
+    final CombinerRunner<K,V> combinerRunner;
+    final CombineOutputCollector<K, V> combineCollector;
+
     // Compression for map-outputs
-    private CompressionCodec codec = null;
+    final CompressionCodec codec;
 
     // k/v accounting
-    private volatile int kvstart = 0;  // marks beginning of spill
-    private volatile int kvend = 0;    // marks beginning of collectable
-    private int kvindex = 0;           // marks end of collected
-    private final int[] kvoffsets;     // indices into kvindices
-    private final int[] kvindices;     // partition, k/v offsets into kvbuffer
-    private volatile int bufstart = 0; // marks beginning of spill
-    private volatile int bufend = 0;   // marks beginning of collectable
-    private volatile int bufvoid = 0;  // marks the point where we should stop
-                                       // reading at the end of the buffer
-    private int bufindex = 0;          // marks end of collected
-    private int bufmark = 0;           // marks end of record
-    private byte[] kvbuffer;           // main output buffer
-    private static final int PARTITION = 0; // partition offset in acct
-    private static final int KEYSTART = 1;  // key offset in acct
-    private static final int VALSTART = 2;  // val offset in acct
-    private static final int ACCTSIZE = 3;  // total #fields in acct
-    private static final int RECSIZE =
-                       (ACCTSIZE + 1) * 4;  // acct bytes per record
+    final IntBuffer kvmeta; // metadata overlay on backing store
+    int kvstart;            // marks origin of spill metadata
+    int kvend;              // marks end of spill metadata
+    int kvindex;            // marks end of fully serialized records
+
+    int equator;            // marks origin of meta/serialization
+    int bufstart;           // marks beginning of spill
+    int bufend;             // marks beginning of collectable
+    int bufmark;            // marks end of record
+    int bufindex;           // marks end of collected
+    int bufvoid;            // marks the point where we should stop
+                            // reading at the end of the buffer
+
+    byte[] kvbuffer;        // main output buffer
+    private final byte[] b0 = new byte[0];
+
+    private static final int INDEX = 0;            // index offset in acct
+    private static final int VALSTART = 1;         // val offset in acct
+    private static final int KEYSTART = 2;         // key offset in acct
+    private static final int PARTITION = 3;        // partition offset in acct
+    private static final int NMETA = 4;            // num meta ints
+    private static final int METASIZE = NMETA * 4; // size in bytes
 
     // spill accounting
-    private volatile int numSpills = 0;
-    private volatile Throwable sortSpillException = null;
-    private final int softRecordLimit;
-    private final int softBufferLimit;
-    private int recordRemaining;
-    private int bufferRemaining;
-    private final int minSpillsForCombine;
-    private final IndexedSorter sorter;
-    private final ReentrantLock spillLock = new ReentrantLock();
-    private final Condition spillDone = spillLock.newCondition();
-    private final Condition spillReady = spillLock.newCondition();
-    private final BlockingBuffer bb = new BlockingBuffer();
-    private volatile boolean spillThreadRunning = false;
-    private final SpillThread spillThread = new SpillThread();
-
-    private final FileSystem localFs;
-    private final FileSystem rfs;
-   
-    private final Counters.Counter mapOutputByteCounter;
-    private final Counters.Counter mapOutputRecordCounter;
-    private final Counters.Counter combineOutputCounter;
-    
-    private ArrayList<SpillRecord> indexCacheList;
+    final int maxRec;
+    final int softLimit;
+    boolean spillInProgress;;
+    int bufferRemaining;
+    volatile Throwable sortSpillException = null;
+
+    int numSpills = 0;
+    final int minSpillsForCombine;
+    final IndexedSorter sorter;
+    final ReentrantLock spillLock = new ReentrantLock();
+    final Condition spillDone = spillLock.newCondition();
+    final Condition spillReady = spillLock.newCondition();
+    final BlockingBuffer bb = new BlockingBuffer();
+    volatile boolean spillThreadRunning = false;
+    final SpillThread spillThread = new SpillThread();
+
+    final FileSystem rfs;
+
+    // Counters
+    final Counters.Counter mapOutputByteCounter;
+    final Counters.Counter mapOutputRecordCounter;
+
+    final ArrayList<SpillRecord> indexCacheList =
+      new ArrayList<SpillRecord>();
     private int totalIndexCacheMemory;
     private static final int INDEX_CACHE_MEMORY_LIMIT = 1024 * 1024;
 
@@ -775,44 +782,43 @@
                            ) throws IOException, ClassNotFoundException {
       this.job = job;
       this.reporter = reporter;
-      localFs = FileSystem.getLocal(job);
       partitions = job.getNumReduceTasks();
-       
-      rfs = ((LocalFileSystem)localFs).getRaw();
+      rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
 
-      indexCacheList = new ArrayList<SpillRecord>();
-      
       //sanity checks
-      final float spillper = job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT,(float)0.8);
-      final float recper = job.getFloat(JobContext.MAP_SORT_RECORD_PERCENT,(float)0.05);
+      final float spillper =
+        job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
       final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
-      if (spillper > (float)1.0 || spillper < (float)0.0) {
-        throw new IOException("Invalid \"mapreduce.map.sort.spill.percent\": " + spillper);
-      }
-      if (recper > (float)1.0 || recper < (float)0.01) {
-        throw new IOException("Invalid \"mapreduce.map.sort.record.percent\": " + recper);
+      if (spillper > (float)1.0 || spillper <= (float)0.0) {
+        throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
+            "\": " + spillper);
       }
       if ((sortmb & 0x7FF) != sortmb) {
-        throw new IOException("Invalid " + JobContext.IO_SORT_MB + ": " + sortmb);
+        throw new IOException(
+            "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
       }
       sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
             QuickSort.class, IndexedSorter.class), job);
-      LOG.info(JobContext.IO_SORT_MB + " = " + sortmb);
       // buffers and accounting
       int maxMemUsage = sortmb << 20;
-      int recordCapacity = (int)(maxMemUsage * recper);
-      recordCapacity -= recordCapacity % RECSIZE;
-      kvbuffer = new byte[maxMemUsage - recordCapacity];
+      maxMemUsage -= maxMemUsage % METASIZE;
+      kvbuffer = new byte[maxMemUsage];
       bufvoid = kvbuffer.length;
-      recordCapacity /= RECSIZE;
-      kvoffsets = new int[recordCapacity];
-      kvindices = new int[recordCapacity * ACCTSIZE];
-      softBufferLimit = (int)(kvbuffer.length * spillper);
-      softRecordLimit = (int)(kvoffsets.length * spillper);
-      recordRemaining = softRecordLimit;
-      bufferRemaining = softBufferLimit;
-      LOG.info("data buffer = " + softBufferLimit + "/" + kvbuffer.length);
-      LOG.info("record buffer = " + softRecordLimit + "/" + kvoffsets.length);
+      kvmeta = ByteBuffer.wrap(kvbuffer).asIntBuffer();
+      setEquator(0);
+      bufstart = bufend = bufindex = equator;
+      kvstart = kvend = kvindex;
+
+      maxRec = kvmeta.capacity() / NMETA;
+      softLimit = (int)(kvbuffer.length * spillper);
+      bufferRemaining = softLimit;
+      if (LOG.isInfoEnabled()) {
+        LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
+        LOG.info("soft limit at " + softLimit);
+        LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
+        LOG.info("kvstart = " + kvstart + "; length = " + maxRec);
+      }
+
       // k/v serialization
       comparator = job.getOutputKeyComparator();
       keyClass = (Class<K>)job.getMapOutputKeyClass();
@@ -822,27 +828,35 @@
       keySerializer.open(bb);
       valSerializer = serializationFactory.getSerializer(valClass);
       valSerializer.open(bb);
-      // counters
+
+      // output counters
       mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
-      mapOutputRecordCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
-      Counters.Counter combineInputCounter = 
-        reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
-      combineOutputCounter = reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
+      mapOutputRecordCounter =
+        reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
+
       // compression
       if (job.getCompressMapOutput()) {
         Class<? extends CompressionCodec> codecClass =
           job.getMapOutputCompressorClass(DefaultCodec.class);
         codec = ReflectionUtils.newInstance(codecClass, job);
+      } else {
+        codec = null;
       }
+
       // combiner
+      final Counters.Counter combineInputCounter =
+        reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
       combinerRunner = CombinerRunner.create(job, getTaskID(), 
                                              combineInputCounter,
                                              reporter, null);
       if (combinerRunner != null) {
+        final Counters.Counter combineOutputCounter =
+          reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
         combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter);
       } else {
         combineCollector = null;
       }
+      spillInProgress = false;
       minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
       spillThread.setDaemon(true);
       spillThread.setName("SpillThread");
@@ -853,18 +867,22 @@
           spillDone.await();
         }
       } catch (InterruptedException e) {
-        throw (IOException)new IOException("Spill thread failed to initialize"
-            ).initCause(sortSpillException);
+        throw new IOException("Spill thread failed to initialize", e);
       } finally {
         spillLock.unlock();
       }
       if (sortSpillException != null) {
-        throw (IOException)new IOException("Spill thread failed to initialize"
-            ).initCause(sortSpillException);
+        throw new IOException("Spill thread failed to initialize",
+            sortSpillException);
       }
     }
 
-    public synchronized void collect(K key, V value, int partition
+    /**
+     * Serialize the key, value to intermediate storage.
+     * When this method returns, kvindex must refer to sufficient unused
+     * storage to hold one metadata record (METASIZE bytes).
+     */
+    public synchronized void collect(K key, V value, final int partition
                                      ) throws IOException {
       reporter.progress();
       if (key.getClass() != keyClass) {
@@ -877,50 +895,66 @@
                               + valClass.getName() + ", recieved "
                               + value.getClass().getName());
       }
-      final int kvnext = (kvindex + 1) % kvoffsets.length;
-      if (--recordRemaining <= 0) {
-        // Possible for check to remain < zero, if soft limit remains
-        // in force but unsatisfiable because spill is in progress
+      if (partition < 0 || partition >= partitions) {
+        throw new IOException("Illegal partition for " + key + " (" +
+            partition + ")");
+      }
+      checkSpillException();
+      bufferRemaining -= METASIZE;
+      if (bufferRemaining <= 0) {
+        // start spill if the thread is not running and the soft limit has been
+        // reached
         spillLock.lock();
         try {
-          boolean kvfull;
           do {
-            if (sortSpillException != null) {
-              throw (IOException)new IOException("Spill failed"
-                  ).initCause(sortSpillException);
-            }
-            // sufficient acct space
-            kvfull = kvnext == kvstart;
-            final boolean kvsoftlimit = ((kvnext > kvend)
-                ? kvnext - kvend > softRecordLimit
-                : kvend - kvnext <= kvoffsets.length - softRecordLimit);
-            if (kvstart == kvend && kvsoftlimit) {
-              LOG.info("Spilling map output: record full = " + kvfull);
-              startSpill();
-            }
-            if (kvfull) {
-              try {
-                while (kvstart != kvend) {
-                  reporter.progress();
-                  spillDone.await();
-                }
-              } catch (InterruptedException e) {
-                throw (IOException)new IOException(
-                    "Collector interrupted while waiting for the writer"
-                    ).initCause(e);
+            if (!spillInProgress) {
+              final int kvbidx = 4 * kvindex;
+              final int kvbend = 4 * kvend;
+              // serialized, unspilled bytes always lie between kvindex and
+              // bufindex, crossing the equator. Note that any void space
+              // created by a reset must be included in "used" bytes
+              final int bUsed = distanceTo(kvbidx, bufindex);
+              final boolean bufsoftlimit = bUsed >= softLimit;
+              if ((kvbend + METASIZE) % kvbuffer.length !=
+                  equator - (equator % METASIZE)) {
+                // spill finished, reclaim space
+                resetSpill();
+                bufferRemaining = Math.min(
+                    distanceTo(bufindex, kvbidx) - 2 * METASIZE,
+                    softLimit - bUsed) - METASIZE;
+                continue;
+              } else if (bufsoftlimit && kvindex != kvend) {
+                // spill records, if any collected; check latter, as it may
+                // be possible for metadata alignment to hit spill pcnt
+                startSpill();
+                final int avgRec = (int)
+                  (mapOutputByteCounter.getCounter() /
+                  mapOutputRecordCounter.getCounter());
+                // leave at least half the split buffer for serialization data
+                // ensure that kvindex >= bufindex
+                final int distkvi = distanceTo(bufindex, kvbidx);
+                final int newPos = (bufindex +
+                  Math.max(2 * METASIZE - 1,
+                          Math.min(distkvi / 2,
+                                   distkvi / (METASIZE + avgRec) * METASIZE)))
+                  % kvbuffer.length;
+                setEquator(newPos);
+                bufmark = bufindex = newPos;
+                final int serBound = 4 * kvend;
+                // bytes remaining before the lock must be held and limits
+                // checked is the minimum of three arcs: the metadata space, the
+                // serialization space, and the soft limit
+                bufferRemaining = Math.min(
+                    // metadata max
+                    distanceTo(bufend, newPos),
+                    Math.min(
+                      // serialization max
+                      distanceTo(newPos, serBound),
+                      // soft limit
+                      softLimit)) - 2 * METASIZE;
               }
             }
-          } while (kvfull);
-          final int softOff = kvend + softRecordLimit;
-          recordRemaining = Math.min(
-              // out of acct space
-              (kvnext < kvstart
-                 ? kvstart - kvnext
-                 : kvoffsets.length - kvnext + kvstart),
-              // soft limit
-              (kvend < kvnext
-                 ? softOff - kvnext
-                 : kvnext + (softOff - kvoffsets.length)));
+          } while (false);
         } finally {
           spillLock.unlock();
         }
@@ -931,39 +965,110 @@
         int keystart = bufindex;
         keySerializer.serialize(key);
         if (bufindex < keystart) {
-          // wrapped the key; reset required
-          bb.reset();
+          // wrapped the key; must make contiguous
+          bb.shiftBufferedKey();
           keystart = 0;
         }
         // serialize value bytes into buffer
         final int valstart = bufindex;
         valSerializer.serialize(value);
-        int valend = bb.markRecord();
+        // It's possible for records to have zero length, i.e. the serializer
+        // will perform no writes. To ensure that the boundary conditions are
+        // checked and that the kvindex invariant is maintained, perform a
+        // zero-length write into the buffer. The logic monitoring this could be
+        // moved into collect, but this is cleaner and inexpensive. For now, it
+        // is acceptable.
+        bb.write(b0, 0, 0);
 
-        if (partition < 0 || partition >= partitions) {
-          throw new IOException("Illegal partition for " + key + " (" +
-              partition + ")");
-        }
+        // the record must be marked after the preceding write, as the metadata
+        // for this record are not yet written
+        int valend = bb.markRecord();
 
         mapOutputRecordCounter.increment(1);
-        mapOutputByteCounter.increment(valend >= keystart
-            ? valend - keystart
-            : (bufvoid - keystart) + valend);
-
-        // update accounting info
-        int ind = kvindex * ACCTSIZE;
-        kvoffsets[kvindex] = ind;
-        kvindices[ind + PARTITION] = partition;
-        kvindices[ind + KEYSTART] = keystart;
-        kvindices[ind + VALSTART] = valstart;
-        kvindex = kvnext;
+        mapOutputByteCounter.increment(
+            distanceTo(keystart, valend, bufvoid));
+
+        // write accounting info
+        kvmeta.put(kvindex + INDEX, kvindex);
+        kvmeta.put(kvindex + PARTITION, partition);
+        kvmeta.put(kvindex + KEYSTART, keystart);
+        kvmeta.put(kvindex + VALSTART, valstart);
+        // advance kvindex
+        kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
       } catch (MapBufferTooSmallException e) {
         LOG.info("Record too large for in-memory buffer: " + e.getMessage());
         spillSingleRecord(key, value, partition);
         mapOutputRecordCounter.increment(1);
         return;
       }
+    }
+
+    /**
+     * Set the point from which meta and serialization data expand. The meta
+     * indices are aligned with the buffer, so metadata never spans the ends of
+     * the circular buffer.
+     */
+    private void setEquator(int pos) {
+      equator = pos;
+      // set index prior to first entry, aligned at meta boundary
+      final int aligned = pos - (pos % METASIZE);
+      kvindex =
+        ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
+      if (LOG.isInfoEnabled()) {
+        LOG.info("(EQUATOR) " + pos + " kvi " + kvindex +
+            "(" + (kvindex * 4) + ")");
+      }
+    }
+
+    /**
+     * The spill is complete, so set the buffer and meta indices to be equal to
+     * the new equator to free space for continuing collection. Note that when
+     * kvindex == kvend == kvstart, the buffer is empty.
+     */
+    private void resetSpill() {
+      final int e = equator;
+      bufstart = bufend = e;
+      final int aligned = e - (e % METASIZE);
+      // set start/end to point to first meta record
+      kvstart = kvend =
+        ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
+      if (LOG.isInfoEnabled()) {
+        LOG.info("(RESET) equator " + e + " kv " + kvstart + "(" +
+          (kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");
+      }
+    }
+
+    /**
+     * Compute the distance in bytes between two indices in the serialization
+     * buffer.
+     * @see #distanceTo(int,int,int)
+     */
+    final int distanceTo(final int i, final int j) {
+      return distanceTo(i, j, kvbuffer.length);
+    }
+
+    /**
+     * Compute the distance between two indices in the circular buffer given the
+     * max distance.
+     */
+    int distanceTo(final int i, final int j, final int mod) {
+      return i <= j
+        ? j - i
+        : mod - i + j;
+    }
 
+    /**
+     * For the given meta position, return the dereferenced position in the
+     * integer array. Each meta block contains several integers describing
+     * record data in its serialized form, but the INDEX is not necessarily
+     * related to the proximate metadata. The index value at the referenced int
+     * position is the start offset of the associated metadata block. So the
+     * metadata INDEX at metapos may point to the metadata described by the
+     * metadata block at metapos + k, which contains information about that
+     * serialized record.
+     */
+    int offsetFor(int metapos) {
+      return kvmeta.get(metapos * NMETA + INDEX);
     }
 
     /**
@@ -971,32 +1076,34 @@
      * Compare by partition, then by key.
      * @see IndexedSortable#compare
      */
-    public int compare(int i, int j) {
-      final int ii = kvoffsets[i % kvoffsets.length];
-      final int ij = kvoffsets[j % kvoffsets.length];
+    public int compare(final int mi, final int mj) {
+      final int kvi = offsetFor(mi % maxRec);
+      final int kvj = offsetFor(mj % maxRec);
+      final int kvip = kvmeta.get(kvi + PARTITION);
+      final int kvjp = kvmeta.get(kvj + PARTITION);
       // sort by partition
-      if (kvindices[ii + PARTITION] != kvindices[ij + PARTITION]) {
-        return kvindices[ii + PARTITION] - kvindices[ij + PARTITION];
+      if (kvip != kvjp) {
+        return kvip - kvjp;
       }
       // sort by key
       return comparator.compare(kvbuffer,
-          kvindices[ii + KEYSTART],
-          kvindices[ii + VALSTART] - kvindices[ii + KEYSTART],
+          kvmeta.get(kvi + KEYSTART),
+          kvmeta.get(kvi + VALSTART) - kvmeta.get(kvi + KEYSTART),
           kvbuffer,
-          kvindices[ij + KEYSTART],
-          kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]);
+          kvmeta.get(kvj + KEYSTART),
+          kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART));
     }
 
     /**
      * Swap logical indices st i, j MOD offset capacity.
      * @see IndexedSortable#swap
      */
-    public void swap(int i, int j) {
-      i %= kvoffsets.length;
-      j %= kvoffsets.length;
-      int tmp = kvoffsets[i];
-      kvoffsets[i] = kvoffsets[j];
-      kvoffsets[j] = tmp;
+    public void swap(final int mi, final int mj) {
+      final int kvi = (mi % maxRec) * NMETA + INDEX;
+      final int kvj = (mj % maxRec) * NMETA + INDEX;
+      int tmp = kvmeta.get(kvi);
+      kvmeta.put(kvi, kvmeta.get(kvj));
+      kvmeta.put(kvj, tmp);
     }
 
     /**
@@ -1005,11 +1112,7 @@
     protected class BlockingBuffer extends DataOutputStream {
 
       public BlockingBuffer() {
-        this(new Buffer());
-      }
-
-      private BlockingBuffer(OutputStream out) {
-        super(out);
+        super(new Buffer());
       }
 
       /**
@@ -1028,23 +1131,24 @@
        * If the key is to be passed to a RawComparator, then it must be
        * contiguous in the buffer. This recopies the data in the buffer back
        * into itself, but starting at the beginning of the buffer. Note that
-       * reset() should <b>only</b> be called immediately after detecting
+       * this method should <b>only</b> be called immediately after detecting
        * this condition. To call it at any other time is undefined and would
        * likely result in data loss or corruption.
        * @see #markRecord()
        */
-      protected void reset() throws IOException {
-        // spillLock unnecessary; If spill wraps, then
-        // bufindex < bufstart < bufend so contention is impossible
-        // a stale value for bufstart does not affect correctness, since
-        // we can only get false negatives that force the more
-        // conservative path
+      protected void shiftBufferedKey() throws IOException {
+        // spillLock unnecessary; both kvend and kvindex are current
         int headbytelen = bufvoid - bufmark;
         bufvoid = bufmark;
-        if (bufindex + headbytelen < bufstart) {
+        final int kvbidx = 4 * kvindex;
+        final int kvbend = 4 * kvend;
+        final int avail =
+          Math.min(distanceTo(0, kvbidx), distanceTo(0, kvbend));
+        if (bufindex + headbytelen < avail) {
           System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
           System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
           bufindex += headbytelen;
+          bufferRemaining -= kvbuffer.length - bufvoid;
         } else {
           byte[] keytmp = new byte[bufindex];
           System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);
@@ -1075,87 +1179,91 @@
       @Override
       public void write(byte b[], int off, int len)
           throws IOException {
-        boolean buffull = false;
-        boolean wrap = false;
+        // must always verify the invariant that at least METASIZE bytes are
+        // available beyond kvindex, even when len == 0
         bufferRemaining -= len;
         if (bufferRemaining <= 0) {
-          // writing these bytes could exhaust available buffer space
-          // check if spill or blocking is necessary
+          // writing these bytes could exhaust available buffer space or fill
+          // the buffer to soft limit. check if spill or blocking are necessary
+          boolean blockwrite = false;
           spillLock.lock();
           try {
             do {
-              if (sortSpillException != null) {
-                throw (IOException)new IOException("Spill failed"
-                    ).initCause(sortSpillException);
-              }
-
-              // sufficient buffer space?
-              if (bufstart <= bufend && bufend <= bufindex) {
-                buffull = bufindex + len > bufvoid;
-                wrap = (bufvoid - bufindex) + bufstart > len;
-              } else {
-                // bufindex <= bufstart <= bufend
-                // bufend <= bufindex <= bufstart
-                wrap = false;
-                buffull = bufindex + len > bufstart;
-              }
+              checkSpillException();
 
-              if (kvstart == kvend) {
-                // spill thread not running
-                if (kvend != kvindex) {
-                  // we have records we can spill
-                  final boolean bufsoftlimit = (bufindex > bufend)
-                    ? bufindex - bufend > softBufferLimit
-                    : bufend - bufindex < bufvoid - softBufferLimit;
-                  if (bufsoftlimit || (buffull && !wrap)) {
-                    LOG.info("Spilling map output: buffer full= " + (buffull && !wrap));
+              final int kvbidx = 4 * kvindex;
+              final int kvbend = 4 * kvend;
+              // ser distance to key index
+              final int distkvi = distanceTo(bufindex, kvbidx);
+              // ser distance to spill end index
+              final int distkve = distanceTo(bufindex, kvbend);
+
+              // if kvindex is closer than kvend, then a spill is neither in
+              // progress nor complete and reset since the lock was held. The
+              // write should block only if there is insufficient space to
+              // complete the current write, write the metadata for this record,
+              // and write the metadata for the next record. If kvend is closer,
+              // then the write should block if there is too little space for
+              // either the metadata or the current write. Note that collect
+              // ensures its metadata requirement with a zero-length write
+              blockwrite = distkvi <= distkve
+                ? distkvi <= len + 2 * METASIZE
+                : distkve <= len || distanceTo(bufend, kvbidx) < 2 * METASIZE;
+
+              if (!spillInProgress) {
+                if (blockwrite) {
+                  if ((kvbend + METASIZE) % kvbuffer.length !=
+                      equator - (equator % METASIZE)) {
+                    // spill finished, reclaim space
+                    // need to use meta exclusively; zero-len rec & 100% spill
+                    // pcnt would fail
+                    resetSpill(); // resetSpill doesn't move bufindex, kvindex
+                    bufferRemaining = Math.min(
+                        distkvi - 2 * METASIZE,
+                        softLimit - distanceTo(kvbidx, bufindex)) - len;
+                    continue;
+                  }
+                  // we have records we can spill; only spill if blocked
+                  if (kvindex != kvend) {
                     startSpill();
+                    // Blocked on this write, waiting for the spill just
+                    // initiated to finish. Instead of repositioning the marker
+                    // and copying the partial record, we set the record start
+                    // to be the new equator
+                    setEquator(bufmark);
+                  } else {
+                    // We have no buffered records, and this record is too large
+                    // to write into kvbuffer. We must spill it directly from
+                    // collect
+                    final int size = distanceTo(bufstart, bufindex) + len;
+                    setEquator(0);
+                    bufstart = bufend = bufindex = equator;
+                    kvstart = kvend = kvindex;
+                    bufvoid = kvbuffer.length;
+                    throw new MapBufferTooSmallException(size + " bytes");
                   }
-                } else if (buffull && !wrap) {
-                  // We have no buffered records, and this record is too large
-                  // to write into kvbuffer. We must spill it directly from
-                  // collect
-                  final int size = ((bufend <= bufindex)
-                    ? bufindex - bufend
-                    : (bufvoid - bufend) + bufindex) + len;
-                  bufstart = bufend = bufindex = bufmark = 0;
-                  kvstart = kvend = kvindex = 0;
-                  bufvoid = kvbuffer.length;
-                  throw new MapBufferTooSmallException(size + " bytes");
                 }
               }
 
-              if (buffull && !wrap) {
+              if (blockwrite) {
+                // wait for spill
                 try {
-                  while (kvstart != kvend) {
+                  while (spillInProgress) {
                     reporter.progress();
                     spillDone.await();
                   }
                 } catch (InterruptedException e) {
-                    throw (IOException)new IOException(
-                        "Buffer interrupted while waiting for the writer"
-                        ).initCause(e);
+                    throw new IOException(
+                        "Buffer interrupted while waiting for the writer", e);
                 }
               }
-            } while (buffull && !wrap);
-            final int softOff = bufend + softBufferLimit;
-            bufferRemaining = Math.min(
-                // out of buffer space
-                (bufindex < bufstart
-                   ? bufstart - bufindex
-                   : kvbuffer.length - bufindex + bufstart),
-                // soft limit
-                (bufend < bufindex
-                   ? softOff - bufindex
-                   : bufindex + (softOff - kvbuffer.length)));
+            } while (blockwrite);
           } finally {
             spillLock.unlock();
           }
-        } else {
-          buffull = bufindex + len > bufvoid;
         }
         // here, we know that we have sufficient space to write
-        if (buffull) {
+        if (bufindex + len > bufvoid) {
           final int gaplen = bufvoid - bufindex;
           System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
           len -= gaplen;
@@ -1164,7 +1272,6 @@
         }
         System.arraycopy(b, off, kvbuffer, bufindex, len);
         bufindex += len;
-        bufferRemaining -= len;
       }
     }
 
@@ -1173,23 +1280,34 @@
       LOG.info("Starting flush of map output");
       spillLock.lock();
       try {
-        while (kvstart != kvend) {
+        while (spillInProgress) {
           reporter.progress();
           spillDone.await();
         }
-        if (sortSpillException != null) {
-          throw (IOException)new IOException("Spill failed"
-              ).initCause(sortSpillException);
+        checkSpillException();
+
+        final int kvbend = 4 * kvend;
+        if ((kvbend + METASIZE) % kvbuffer.length !=
+            equator - (equator % METASIZE)) {
+          // spill finished
+          resetSpill();
         }
-        if (kvend != kvindex) {
-          kvend = kvindex;
+        if (kvindex != kvend) {
+          kvend = (kvindex + NMETA) % kvmeta.capacity();
           bufend = bufmark;
+          if (LOG.isInfoEnabled()) {
+            LOG.info("Spilling map output");
+            LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
+                     "; bufvoid = " + bufvoid);
+            LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
+                     "); kvend = " + kvend + "(" + (kvend * 4) +
+                     "); length = " + (distanceTo(kvend, kvstart,
+                           kvmeta.capacity()) + 1) + "/" + maxRec);
+          }
           sortAndSpill();
         }
       } catch (InterruptedException e) {
-        throw (IOException)new IOException(
-            "Buffer interrupted while waiting for the writer"
-            ).initCause(e);
+        throw new IOException("Interrupted while waiting for the writer", e);
       } finally {
         spillLock.unlock();
       }
@@ -1204,8 +1322,7 @@
         spillThread.interrupt();
         spillThread.join();
       } catch (InterruptedException e) {
-        throw (IOException)new IOException("Spill failed"
-            ).initCause(e);
+        throw new IOException("Spill failed", e);
       }
       // release sort buffer before the merge
       kvbuffer = null;
@@ -1223,26 +1340,22 @@
         try {
           while (true) {
             spillDone.signal();
-            while (kvstart == kvend) {
+            while (!spillInProgress) {
               spillReady.await();
             }
             try {
               spillLock.unlock();
               sortAndSpill();
-            } catch (Exception e) {
-              sortSpillException = e;
             } catch (Throwable t) {
               sortSpillException = t;
-              String logMsg = "Task " + getTaskID() + " failed : " 
-                              + StringUtils.stringifyException(t);
-              reportFatalError(getTaskID(), t, logMsg);
             } finally {
               spillLock.lock();
-              if (bufend < bufindex && bufindex < bufstart) {
+              if (bufend < bufstart) {
                 bufvoid = kvbuffer.length;
               }
               kvstart = kvend;
               bufstart = bufend;
+              spillInProgress = false;
             }
           }
         } catch (InterruptedException e) {
@@ -1254,13 +1367,32 @@
       }
     }
 
+    private void checkSpillException() throws IOException {
+      final Throwable lspillException = sortSpillException;
+      if (lspillException != null) {
+        if (lspillException instanceof Error) {
+          final String logMsg = "Task " + getTaskID() + " failed : " +
+            StringUtils.stringifyException(lspillException);
+          reportFatalError(getTaskID(), lspillException, logMsg);
+        }
+        throw new IOException("Spill failed", lspillException);
+      }
+    }
+
     private void startSpill() {
-      LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
-               "; bufvoid = " + bufvoid);
-      LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +
-               "; length = " + kvoffsets.length);
-      kvend = kvindex;
+      assert !spillInProgress;
+      kvend = (kvindex + NMETA) % kvmeta.capacity();
       bufend = bufmark;
+      spillInProgress = true;
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Spilling map output");
+        LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
+                 "; bufvoid = " + bufvoid);
+        LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
+                 "); kvend = " + kvend + "(" + (kvend * 4) +
+                 "); length = " + (distanceTo(kvend, kvstart,
+                       kvmeta.capacity()) + 1) + "/" + maxRec);
+      }
       spillReady.signal();
     }
 
@@ -1268,7 +1400,7 @@
                                        InterruptedException {
       //approximate the length of the output file to be the length of the
       //buffer + header lengths for the partitions
-      long size = (bufend >= bufstart
+      final long size = (bufend >= bufstart
           ? bufend - bufstart
           : (bufvoid - bufend) + bufstart) +
                   partitions * APPROX_HEADER_LENGTH;
@@ -1280,13 +1412,15 @@
             mapOutputFile.getSpillFileForWrite(numSpills, size);
         out = rfs.create(filename);
 
-        final int endPosition = (kvend > kvstart)
-          ? kvend
-          : kvoffsets.length + kvend;
-        sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
-        int spindex = kvstart;
-        IndexRecord rec = new IndexRecord();
-        InMemValBytes value = new InMemValBytes();
+        final int mstart = kvend / NMETA;
+        final int mend = 1 + // kvend is a valid record
+          (kvstart >= kvend
+          ? kvstart
+          : kvmeta.capacity() + kvstart) / NMETA;
+        sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
+        int spindex = mstart;
+        final IndexRecord rec = new IndexRecord();
+        final InMemValBytes value = new InMemValBytes();
         for (int i = 0; i < partitions; ++i) {
           IFile.Writer<K, V> writer = null;
           try {
@@ -1296,22 +1430,21 @@
             if (combinerRunner == null) {
               // spill directly
               DataInputBuffer key = new DataInputBuffer();
-              while (spindex < endPosition &&
-                  kvindices[kvoffsets[spindex % kvoffsets.length]
-                            + PARTITION] == i) {
-                final int kvoff = kvoffsets[spindex % kvoffsets.length];
+              while (spindex < mend &&
+                  kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
+                final int kvoff = offsetFor(spindex % maxRec);
+                key.reset(kvbuffer, kvmeta.get(kvoff + KEYSTART),
+                          (kvmeta.get(kvoff + VALSTART) -
+                           kvmeta.get(kvoff + KEYSTART)));
                 getVBytesForOffset(kvoff, value);
-                key.reset(kvbuffer, kvindices[kvoff + KEYSTART],
-                          (kvindices[kvoff + VALSTART] - 
-                           kvindices[kvoff + KEYSTART]));
                 writer.append(key, value);
                 ++spindex;
               }
             } else {
               int spstart = spindex;
-              while (spindex < endPosition &&
-                  kvindices[kvoffsets[spindex % kvoffsets.length]
-                            + PARTITION] == i) {
+              while (spindex < mend &&
+                  kvmeta.get(offsetFor(spindex % maxRec)
+                            + PARTITION) == i) {
                 ++spindex;
               }
               // Note: we would like to avoid the combiner if we've fewer
@@ -1372,7 +1505,7 @@
         final Path filename =
             mapOutputFile.getSpillFileForWrite(numSpills, size);
         out = rfs.create(filename);
-        
+
         // we don't run the combiner for a single record
         IndexRecord rec = new IndexRecord();
         for (int i = 0; i < partitions; ++i) {
@@ -1426,14 +1559,17 @@
      * deserialized value bytes. Should only be called during a spill.
      */
     private void getVBytesForOffset(int kvoff, InMemValBytes vbytes) {
-      final int nextindex = (kvoff / ACCTSIZE ==
-                            (kvend - 1 + kvoffsets.length) % kvoffsets.length)
+      // get the keystart for the next serialized value to be the end
+      // of this value. If this is the last value in the buffer, use bufend
+      final int nextindex = kvoff == kvend
         ? bufend
-        : kvindices[(kvoff + ACCTSIZE + KEYSTART) % kvindices.length];
-      int vallen = (nextindex >= kvindices[kvoff + VALSTART])
-        ? nextindex - kvindices[kvoff + VALSTART]
-        : (bufvoid - kvindices[kvoff + VALSTART]) + nextindex;
-      vbytes.reset(kvbuffer, kvindices[kvoff + VALSTART], vallen);
+        : kvmeta.get(
+            (kvoff - NMETA + kvmeta.capacity() + KEYSTART) % kvmeta.capacity());
+      // calculate the length of the value
+      int vallen = (nextindex >= kvmeta.get(kvoff + VALSTART))
+        ? nextindex - kvmeta.get(kvoff + VALSTART)
+        : (bufvoid - kvmeta.get(kvoff + VALSTART)) + nextindex;
+      vbytes.reset(kvbuffer, kvmeta.get(kvoff + VALSTART), vallen);
     }
 
     /**
@@ -1443,12 +1579,12 @@
       private byte[] buffer;
       private int start;
       private int length;
-            
+
       public void reset(byte[] buffer, int start, int length) {
         this.buffer = buffer;
         this.start = start;
         this.length = length;
-        
+
         if (start + length > bufvoid) {
           this.buffer = new byte[this.length];
           final int taillen = bufvoid - start;
@@ -1456,7 +1592,7 @@
           System.arraycopy(buffer, 0, this.buffer, taillen, length-taillen);
           this.start = 0;
         }
-        
+
         super.reset(this.buffer, this.start, this.length);
       }
     }
@@ -1474,13 +1610,13 @@
         return ++current < end;
       }
       public DataInputBuffer getKey() throws IOException {
-        final int kvoff = kvoffsets[current % kvoffsets.length];
-        keybuf.reset(kvbuffer, kvindices[kvoff + KEYSTART],
-                     kvindices[kvoff + VALSTART] - kvindices[kvoff + KEYSTART]);
+        final int kvoff = offsetFor(current % maxRec);
+        keybuf.reset(kvbuffer, kvmeta.get(kvoff + KEYSTART),
+            kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART));
         return keybuf;
       }
       public DataInputBuffer getValue() throws IOException {
-        getVBytesForOffset(kvoffsets[current % kvoffsets.length], vbytes);
+        getVBytesForOffset(offsetFor(current % maxRec), vbytes);
         return vbytes;
       }
       public Progress getProgress() {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=906820&r1=906819&r2=906820&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java Fri Feb  5 05:43:19 2010
@@ -146,8 +146,6 @@
   public static final String TASK_LOG_RETAIN_HOURS = 
     "mapred.task.userlog.retain.hours";
   
-  public static final String MAP_SORT_RECORD_PERCENT = 
-    "mapreduce.map.sort.record.percent";
   public static final String MAP_SORT_SPILL_PERCENT =
     "mapreduce.map.sort.spill.percent";
   public static final String MAP_INPUT_FILE = "mapreduce.map.input.file";

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java?rev=906820&r1=906819&r2=906820&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java Fri Feb  5 05:43:19 2010
@@ -293,8 +293,6 @@
       new String[] {JobContext.TASK_LOG_RETAIN_HOURS});
     Configuration.addDeprecation("mapred.task.profile.params", 
       new String[] {JobContext.TASK_PROFILE_PARAMS});
-    Configuration.addDeprecation("io.sort.record.percent", 
-      new String[] {JobContext.MAP_SORT_RECORD_PERCENT});
     Configuration.addDeprecation("io.sort.spill.percent", 
       new String[] {JobContext.MAP_SORT_SPILL_PERCENT});
     Configuration.addDeprecation("map.input.file", 

Modified: hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml?rev=906820&r1=906819&r2=906820&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml Fri Feb  5 05:43:19 2010
@@ -197,13 +197,31 @@
        <Bug pattern="DLS_DEAD_LOCAL_STORE" />
      </Match>
     <!--
-      This is a spurious warning. Just ignore
+      None of the following variables should be referenced by any thread
+      but the collection thread in MapTask
     -->
      <Match>
        <Class name="org.apache.hadoop.mapred.MapTask$MapOutputBuffer" />
        <Field name="kvindex" />
        <Bug pattern="IS2_INCONSISTENT_SYNC" />
      </Match>
+     <Match>
+       <Class name="org.apache.hadoop.mapred.MapTask$MapOutputBuffer" />
+       <Field name="bufferRemaining" />
+       <Bug pattern="IS2_INCONSISTENT_SYNC" />
+     </Match>
+     <Match>
+       <Class name="org.apache.hadoop.mapred.MapTask$MapOutputBuffer" />
+       <Field name="equator" />
+       <Bug pattern="IS2_INCONSISTENT_SYNC" />
+     </Match>
+
+    <!-- This is spurious. -->
+    <Match>
+      <Class name="org.apache.hadoop.mapred.MapTask$MapOutputBuffer$SpillThread" />
+      <Method name="run" />
+      <Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH" />
+    </Match>
 
      <Match>
        <Class name="org.apache.hadoop.mapreduce.task.reduce.MergeThread" />

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java?rev=906820&r1=906819&r2=906820&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java Fri Feb  5 05:43:19 2010
@@ -18,29 +18,28 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.File;
-import java.io.FileWriter;
-import java.io.Writer;
-import java.io.BufferedWriter;
 import java.io.IOException;
+import java.util.Formatter;
 import java.util.StringTokenizer;
 
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskCounter;
 
-import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-
-
 /**
- * This is an wordcount application that tests job counters.
- * It generates simple text input files. Then
+ * This is a wordcount application that tests the count of records
+ * spilled to disk. It generates simple text input files. Then
  * runs the wordcount map/reduce application on (1) 3 i/p files(with 3 maps
  * and 1 reduce) and verifies the counters and (2) 4 i/p files(with 4 maps
  * and 1 reduce) and verifies counters. Wordcount application reads the
@@ -50,30 +49,45 @@
  */
 public class TestJobCounters {
 
-  String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
-                          File.separator + "tmp")).toString().replace(' ', '+');
-	
-  private void validateCounters(Counters counter, long spillRecCnt, 
+  private void validateCounters(Counters counter, long spillRecCnt,
                                 long mapInputRecords, long mapOutputRecords) {
       // Check if the numer of Spilled Records is same as expected
       assertEquals(spillRecCnt,
-    		  counter.findCounter(TaskCounter.SPILLED_RECORDS).getCounter());
+          counter.findCounter(TaskCounter.SPILLED_RECORDS).getCounter());
       assertEquals(mapInputRecords,
     		  counter.findCounter(TaskCounter.MAP_INPUT_RECORDS).getCounter());
-      assertEquals(mapOutputRecords, 
+      assertEquals(mapOutputRecords,
     		  counter.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter());
   }
-  
-  private void createWordsFile(File inpFile) throws Exception {
-    Writer out = new BufferedWriter(new FileWriter(inpFile));
-    try {
-      // 500*4 unique words --- repeated 5 times => 5*2K words
-      int REPLICAS=5, NUMLINES=500, NUMWORDSPERLINE=4;
 
+  private void removeWordsFile(Path inpFile, Configuration conf)
+      throws IOException {
+    final FileSystem fs = inpFile.getFileSystem(conf);
+    if (fs.exists(inpFile) && !fs.delete(inpFile, false)) {
+      throw new IOException("Failed to delete " + inpFile);
+    }
+  }
+
+  private static void createWordsFile(Path inpFile, Configuration conf)
+      throws IOException {
+    final FileSystem fs = inpFile.getFileSystem(conf);
+    if (fs.exists(inpFile)) {
+      return;
+    }
+    FSDataOutputStream out = fs.create(inpFile);
+    try {
+      // 1024*4 unique words --- repeated 5 times => 5*4K words
+      int REPLICAS=5, NUMLINES=1024, NUMWORDSPERLINE=4;
+      final String WORD = "zymurgy"; // 7 bytes + 4 id bytes
+      final Formatter fmt = new Formatter(new StringBuilder());
       for (int i = 0; i < REPLICAS; i++) {
         for (int j = 1; j <= NUMLINES*NUMWORDSPERLINE; j+=NUMWORDSPERLINE) {
-          out.write("word" + j + " word" + (j+1) + " word" + (j+2) 
-                    + " word" + (j+3) + '\n');
+          ((StringBuilder)fmt.out()).setLength(0);
+          for (int k = 0; k < NUMWORDSPERLINE; ++k) {
+            fmt.format("%s%04d ", WORD, j + k);
+          }
+          ((StringBuilder)fmt.out()).append("\n");
+          out.writeBytes(fmt.toString());
         }
       }
     } finally {
@@ -81,282 +95,241 @@
     }
   }
 
+  private static Path IN_DIR = null;
+  private static Path OUT_DIR = null;
+  private static Path testdir = null;
+
+  @BeforeClass
+  public static void initPaths() throws IOException {
+    final Configuration conf = new Configuration();
+    final Path TEST_ROOT_DIR =
+      new Path(System.getProperty("test.build.data", "/tmp"));
+    testdir = new Path(TEST_ROOT_DIR, "spilledRecords.countertest");
+    IN_DIR = new Path(testdir, "in");
+    OUT_DIR = new Path(testdir, "out");
+
+    FileSystem fs = FileSystem.getLocal(conf);
+    testdir = new Path(TEST_ROOT_DIR, "spilledRecords.countertest");
+    if (fs.exists(testdir) && !fs.delete(testdir, true)) {
+      throw new IOException("Could not delete " + testdir);
+    }
+    if (!fs.mkdirs(IN_DIR)) {
+      throw new IOException("Mkdirs failed to create " + IN_DIR);
+    }
+    // create 3 input files each with 5*4k words
+    createWordsFile(new Path(IN_DIR, "input5_2k_1"), conf);
+    createWordsFile(new Path(IN_DIR, "input5_2k_2"), conf);
+    createWordsFile(new Path(IN_DIR, "input5_2k_3"), conf);
 
-  /**
-   * The main driver for word count map/reduce program.
-   * Invoke this method to submit the map/reduce job.
-   * @throws IOException When there is communication problems with the
-   *                     job tracker.
-   */
-  @Test
-  public void testOldJobWithMapAndReducers() throws Exception {
-    JobConf conf = new JobConf(TestJobCounters.class);
-    conf.setJobName("wordcount-map-reducers");
-
-    // the keys are words (strings)
-    conf.setOutputKeyClass(Text.class);
-    // the values are counts (ints)
-    conf.setOutputValueClass(IntWritable.class);
-
-    conf.setMapperClass(WordCount.MapClass.class);
-    conf.setCombinerClass(WordCount.Reduce.class);
-    conf.setReducerClass(WordCount.Reduce.class);
+  }
+
+  @AfterClass
+  public static void cleanup() throws IOException {
+    //clean up the input and output files
+    final Configuration conf = new Configuration();
+    final FileSystem fs = testdir.getFileSystem(conf);
+    if (fs.exists(testdir)) {
+      fs.delete(testdir, true);
+    }
+  }
+
+  public static JobConf createConfiguration() throws IOException {
+    JobConf baseConf = new JobConf(TestJobCounters.class);
+
+    baseConf.setOutputKeyClass(Text.class);
+    baseConf.setOutputValueClass(IntWritable.class);
+
+    baseConf.setMapperClass(WordCount.MapClass.class);
+    baseConf.setCombinerClass(WordCount.Reduce.class);
+    baseConf.setReducerClass(WordCount.Reduce.class);
+
+    baseConf.setNumReduceTasks(1);
+    baseConf.setInt(JobContext.IO_SORT_MB, 1);
+    baseConf.set(JobContext.MAP_SORT_SPILL_PERCENT, "0.50");
+    baseConf.setInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
+    return baseConf;
+  }
+
+  public static Job createJob() throws IOException {
+    final Configuration conf = new Configuration();
+    final Job baseJob = Job.getInstance(new Cluster(conf), conf);
+    baseJob.setOutputKeyClass(Text.class);
+    baseJob.setOutputValueClass(IntWritable.class);
+    baseJob.setMapperClass(NewMapTokenizer.class);
+    baseJob.setCombinerClass(NewSummer.class);
+    baseJob.setReducerClass(NewSummer.class);
+    baseJob.setNumReduceTasks(1);
+    baseJob.getConfiguration().setInt(JobContext.IO_SORT_MB, 1);
+    baseJob.getConfiguration().set(JobContext.MAP_SORT_SPILL_PERCENT, "0.50");
+    baseJob.getConfiguration().setInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
+    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setMinInputSplitSize(
+        baseJob, Long.MAX_VALUE);
+    return baseJob;
+  }
 
+  @Test
+  public void testOldCounterA() throws Exception {
+    JobConf conf = createConfiguration();
     conf.setNumMapTasks(3);
-    conf.setNumReduceTasks(1);
-    conf.setInt(JobContext.IO_SORT_MB, 1);
     conf.setInt(JobContext.IO_SORT_FACTOR, 2);
-    conf.set(JobContext.MAP_SORT_RECORD_PERCENT, "0.05");
-    conf.set(JobContext.MAP_SORT_SPILL_PERCENT, "0.80");
+    removeWordsFile(new Path(IN_DIR, "input5_2k_4"), conf);
+    removeWordsFile(new Path(IN_DIR, "input5_2k_5"), conf);
+    FileInputFormat.setInputPaths(conf, IN_DIR);
+    FileOutputFormat.setOutputPath(conf, new Path(OUT_DIR, "outputO0"));
+
+    RunningJob myJob = JobClient.runJob(conf);
+    Counters c1 = myJob.getCounters();
+    // Each record requires 16 bytes of metadata, 16 bytes per serialized rec
+    // (vint word len + word + IntWritable) = (1 + 11 + 4)
+    // (2^20 buf * .5 spill pcnt) / 32 bytes/record = 2^14 recs per spill
+    // Each file contains 5 replicas of 4096 words, so the first spill will
+    // contain 4 (2^14 rec / 2^12 rec/replica) replicas, the second just one.
+
+    // Each map spills twice, emitting 4096 records from the combiner
+    // per spill. The merge adds an additional 8192 records, as
+    // there are too few spills to combine (2 < 3)
+    // Each map spills 2^14 records, so maps spill 49152 records, combined.
+
+    // The reduce spill count is composed of the read from one segment and
+    // the intermediate merge of the other two. The intermediate merge
+    // adds 8192 records per segment read; again, there are too few spills to
+    // combine, so all 16384 are written to disk (total 32768 spilled records
+    // for the intermediate merge). The merge into the reduce includes only
+    // the unmerged segment, size 8192. Total spilled records in the reduce
+    // is 32768 from the merge + 8192 unmerged segment = 40960 records
+
+    // Total: map + reduce = 49152 + 40960 = 90112
+    // 3 files, 5120 = 5 * 1024 rec/file = 15360 input records
+    // 4 records/line = 61440 output records
+    validateCounters(c1, 90112, 15360, 61440);
 
-    FileSystem fs = FileSystem.get(conf);
-    Path testDir = new Path(TEST_ROOT_DIR, "countertest");
-    conf.set("test.build.data", testDir.toString());
-    try {
-      if (fs.exists(testDir)) {
-        fs.delete(testDir, true);
-      }
-      if (!fs.mkdirs(testDir)) {
-        throw new IOException("Mkdirs failed to create " + testDir.toString());
-      }
+  }
 
-      String inDir = testDir +  File.separator + "genins" + File.separator;
-      String outDir = testDir + File.separator;
-      Path wordsIns = new Path(inDir);
-      if (!fs.mkdirs(wordsIns)) {
-        throw new IOException("Mkdirs failed to create " + wordsIns.toString());
-      }
+  @Test
+  public void testOldCounterB() throws Exception {
 
-      //create 3 input files each with 5*2k words
-      File inpFile = new File(inDir + "input5_2k_1");
-      createWordsFile(inpFile);
-      inpFile = new File(inDir + "input5_2k_2");
-      createWordsFile(inpFile);
-      inpFile = new File(inDir + "input5_2k_3");
-      createWordsFile(inpFile);
-
-      FileInputFormat.setInputPaths(conf, inDir);
-      Path outputPath1 = new Path(outDir, "output5_2k_3");
-      FileOutputFormat.setOutputPath(conf, outputPath1);
-
-      RunningJob myJob = JobClient.runJob(conf);
-      Counters c1 = myJob.getCounters();
-      // 3maps & in each map, 4 first level spills --- So total 12.
-      // spilled records count:
-      // Each Map: 1st level:2k+2k+2k+2k=8k;2ndlevel=4k+4k=8k;
-      //           3rd level=2k(4k from 1st level & 4k from 2nd level & combineAndSpill)
-      //           So total 8k+8k+2k=18k
-      // For 3 Maps, total = 3*18=54k
-      // Reduce: each of the 3 map o/p's(2k each) will be spilled in shuffleToDisk()
-      //         So 3*2k=6k in 1st level; 2nd level:4k(2k+2k);
-      //         3rd level directly given to reduce(4k+2k --- combineAndSpill => 2k.
-      //         So 0 records spilled to disk in 3rd level)
-      //         So total of 6k+4k=10k
-      // Total job counter will be 54k+10k = 64k
-      
-      //3 maps and 2.5k lines --- So total 7.5k map input records
-      //3 maps and 10k words in each --- So total of 30k map output recs
-      validateCounters(c1, 64000, 7500, 30000);
-
-      //create 4th input file each with 5*2k words and test with 4 maps
-      inpFile = new File(inDir + "input5_2k_4");
-      createWordsFile(inpFile);
-      conf.setNumMapTasks(4);
-      Path outputPath2 = new Path(outDir, "output5_2k_4");
-      FileOutputFormat.setOutputPath(conf, outputPath2);
-
-      myJob = JobClient.runJob(conf);
-      c1 = myJob.getCounters();
-      // 4maps & in each map 4 first level spills --- So total 16.
-      // spilled records count:
-      // Each Map: 1st level:2k+2k+2k+2k=8k;2ndlevel=4k+4k=8k;
-      //           3rd level=2k(4k from 1st level & 4k from 2nd level & combineAndSpill)
-      //           So total 8k+8k+2k=18k
-      // For 3 Maps, total = 4*18=72k
-      // Reduce: each of the 4 map o/p's(2k each) will be spilled in shuffleToDisk()
-      //         So 4*2k=8k in 1st level; 2nd level:4k+4k=8k;
-      //         3rd level directly given to reduce(4k+4k --- combineAndSpill => 2k.
-      //         So 0 records spilled to disk in 3rd level)
-      //         So total of 8k+8k=16k
-      // Total job counter will be 72k+16k = 88k
-      
-      // 4 maps and 2.5k words in each --- So 10k map input records
-      // 4 maps and 10k unique words --- So 40k map output records
-      validateCounters(c1, 88000, 10000, 40000);
-      
-      // check for a map only job
-      conf.setNumReduceTasks(0);
-      Path outputPath3 = new Path(outDir, "output5_2k_5");
-      FileOutputFormat.setOutputPath(conf, outputPath3);
-
-      myJob = JobClient.runJob(conf);
-      c1 = myJob.getCounters();
-      // 4 maps and 2.5k words in each --- So 10k map input records
-      // 4 maps and 10k unique words --- So 40k map output records
-      validateCounters(c1, 0, 10000, 40000);
-    } finally {
-      //clean up the input and output files
-      if (fs.exists(testDir)) {
-        fs.delete(testDir, true);
-      }
-    }
+    JobConf conf = createConfiguration();
+    createWordsFile(new Path(IN_DIR, "input5_2k_4"), conf);
+    removeWordsFile(new Path(IN_DIR, "input5_2k_5"), conf);
+    conf.setNumMapTasks(4);
+    conf.setInt(JobContext.IO_SORT_FACTOR, 2);
+    FileInputFormat.setInputPaths(conf, IN_DIR);
+    FileOutputFormat.setOutputPath(conf, new Path(OUT_DIR, "outputO1"));
+
+    RunningJob myJob = JobClient.runJob(conf);
+    Counters c1 = myJob.getCounters();
+    // As above, each map spills 2^14 records, so 4 maps spill 2^16 records
+
+    // In the reduce, there are two intermediate merges before the reduce.
+    // 1st merge: read + write = 8192 * 4
+    // 2nd merge: read + write = 8192 * 4
+    // final merge: 0
+    // Total reduce: 65536
+
+    // Total: map + reduce = 2^16 + 2^16 = 131072
+    // 4 files, 5120 = 5 * 1024 rec/file = 20480 input records
+    // 4 records/line = 81920 output records
+    validateCounters(c1, 131072, 20480, 81920);
   }
-  
-  public static class NewMapTokenizer 
-  extends Mapper<Object, Text, Text, IntWritable> {
-	private final static IntWritable one = new IntWritable(1);
-	private Text word = new Text();
-
-	public void map(Object key, Text value, Context context) 
-	throws IOException, InterruptedException {
-      StringTokenizer itr = new StringTokenizer(value.toString());
-      while (itr.hasMoreTokens()) {
-        word.set(itr.nextToken());
-        context.write(word, one);
-      }
-    }
+
+  @Test
+  public void testOldCounterC() throws Exception {
+    JobConf conf = createConfiguration();
+    createWordsFile(new Path(IN_DIR, "input5_2k_4"), conf);
+    createWordsFile(new Path(IN_DIR, "input5_2k_5"), conf);
+    conf.setNumMapTasks(4);
+    conf.setInt(JobContext.IO_SORT_FACTOR, 3);
+    FileInputFormat.setInputPaths(conf, IN_DIR);
+    FileOutputFormat.setOutputPath(conf, new Path(OUT_DIR, "outputO2"));
+    RunningJob myJob = JobClient.runJob(conf);
+    Counters c1 = myJob.getCounters();
+    // As above, each map spills 2^14 records, so 5 maps spill 81920
+
+    // 1st merge: read + write = 6 * 8192
+    // final merge: unmerged = 2 * 8192
+    // Total reduce: 65536
+    // 5 files, 5120 = 5 * 1024 rec/file = 25600 input records
+    // 4 records/line = 102400 output records
+    validateCounters(c1, 147456, 25600, 102400);
   }
-  
-  public static class NewIdentityReducer  
-  extends Reducer<Text, IntWritable, Text, IntWritable> {
-  private IntWritable result = new IntWritable();
-  
-  public void reduce(Text key, Iterable<IntWritable> values, 
-                     Context context) throws IOException, InterruptedException {
-    int sum = 0;
-    for (IntWritable val : values) {
-      sum += val.get();
-    }
-    result.set(sum);
-    context.write(key, result);
+
+  @Test
+  public void testNewCounterA() throws Exception {
+    final Job job = createJob();
+    final Configuration conf = job.getConfiguration();
+    conf.setInt(JobContext.IO_SORT_FACTOR, 2);
+    removeWordsFile(new Path(IN_DIR, "input5_2k_4"), conf);
+    removeWordsFile(new Path(IN_DIR, "input5_2k_5"), conf);
+    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(
+        job, IN_DIR);
+    org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(
+        job, new Path(OUT_DIR, "outputN0"));
+    assertTrue(job.waitForCompletion(true));
+    final Counters c1 = Counters.downgrade(job.getCounters());
+    validateCounters(c1, 90112, 15360, 61440);
   }
- }
-  
-  /**
-   * The main driver for word count map/reduce program.
-   * Invoke this method to submit the map/reduce job.
-   * @throws IOException When there is communication problems with the
-   *                     job tracker.
-   */
+
   @Test
-  public void testNewJobWithMapAndReducers() throws Exception {
-    JobConf conf = new JobConf(TestJobCounters.class);
-    conf.setInt(JobContext.IO_SORT_MB, 1);
+  public void testNewCounterB() throws Exception {
+    final Job job = createJob();
+    final Configuration conf = job.getConfiguration();
     conf.setInt(JobContext.IO_SORT_FACTOR, 2);
-    conf.set(JobContext.MAP_SORT_RECORD_PERCENT, "0.05");
-    conf.set(JobContext.MAP_SORT_SPILL_PERCENT, "0.80");
+    createWordsFile(new Path(IN_DIR, "input5_2k_4"), conf);
+    removeWordsFile(new Path(IN_DIR, "input5_2k_5"), conf);
+    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(
+        job, IN_DIR);
+    org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(
+        job, new Path(OUT_DIR, "outputN1"));
+    assertTrue(job.waitForCompletion(true));
+    final Counters c1 = Counters.downgrade(job.getCounters());
+    validateCounters(c1, 131072, 20480, 81920);
+  }
 
-    FileSystem fs = FileSystem.get(conf);
-    Path testDir = new Path(TEST_ROOT_DIR, "countertest2");
-    conf.set("test.build.data", testDir.toString());
-    try {
-      if (fs.exists(testDir)) {
-        fs.delete(testDir, true);
-      }
-      if (!fs.mkdirs(testDir)) {
-        throw new IOException("Mkdirs failed to create " + testDir.toString());
-      }
+  @Test
+  public void testNewCounterC() throws Exception {
+    final Job job = createJob();
+    final Configuration conf = job.getConfiguration();
+    conf.setInt(JobContext.IO_SORT_FACTOR, 3);
+    createWordsFile(new Path(IN_DIR, "input5_2k_4"), conf);
+    createWordsFile(new Path(IN_DIR, "input5_2k_5"), conf);
+    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(
+        job, IN_DIR);
+    org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(
+        job, new Path(OUT_DIR, "outputN2"));
+    assertTrue(job.waitForCompletion(true));
+    final Counters c1 = Counters.downgrade(job.getCounters());
+    validateCounters(c1, 147456, 25600, 102400);
+  }
 
-      String inDir = testDir +  File.separator + "genins" + File.separator;
-      Path wordsIns = new Path(inDir);
-      if (!fs.mkdirs(wordsIns)) {
-        throw new IOException("Mkdirs failed to create " + wordsIns.toString());
+  public static class NewMapTokenizer
+      extends org.apache.hadoop.mapreduce.Mapper<Object,Text,Text,IntWritable> {
+    private final static IntWritable one = new IntWritable(1);
+    private Text word = new Text();
+
+    public void map(Object key, Text value, Context context)
+    throws IOException, InterruptedException {
+        StringTokenizer itr = new StringTokenizer(value.toString());
+        while (itr.hasMoreTokens()) {
+          word.set(itr.nextToken());
+          context.write(word, one);
+        }
       }
-      String outDir = testDir + File.separator;
+  }
 
-      //create 3 input files each with 5*2k words
-      File inpFile = new File(inDir + "input5_2k_1");
-      createWordsFile(inpFile);
-      inpFile = new File(inDir + "input5_2k_2");
-      createWordsFile(inpFile);
-      inpFile = new File(inDir + "input5_2k_3");
-      createWordsFile(inpFile);
-
-      FileInputFormat.setInputPaths(conf, inDir);
-      Path outputPath1 = new Path(outDir, "output5_2k_3");
-      FileOutputFormat.setOutputPath(conf, outputPath1);
-      
-      Job job = new Job(conf);
-      job.setJobName("wordcount-map-reducers");
-
-      // the keys are words (strings)
-      job.setOutputKeyClass(Text.class);
-      // the values are counts (ints)
-      job.setOutputValueClass(IntWritable.class);
-
-      job.setMapperClass(NewMapTokenizer.class);
-      job.setCombinerClass(NewIdentityReducer.class);
-      job.setReducerClass(NewIdentityReducer.class);
-
-      job.setNumReduceTasks(1);
-
-      job.waitForCompletion(false);
-      
-      org.apache.hadoop.mapreduce.Counters c1 = job.getCounters();
-      // 3maps & in each map, 4 first level spills --- So total 12.
-      // spilled records count:
-      // Each Map: 1st level:2k+2k+2k+2k=8k;2ndlevel=4k+4k=8k;
-      //           3rd level=2k(4k from 1st level & 4k from 2nd level & combineAndSpill)
-      //           So total 8k+8k+2k=18k
-      // For 3 Maps, total = 3*18=54k
-      // Reduce: each of the 3 map o/p's(2k each) will be spilled in shuffleToDisk()
-      //         So 3*2k=6k in 1st level; 2nd level:4k(2k+2k);
-      //         3rd level directly given to reduce(4k+2k --- combineAndSpill => 2k.
-      //         So 0 records spilled to disk in 3rd level)
-      //         So total of 6k+4k=10k
-      // Total job counter will be 54k+10k = 64k
-      
-      //3 maps and 2.5k lines --- So total 7.5k map input records
-      //3 maps and 10k words in each --- So total of 30k map output recs
-      validateCounters(Counters.downgrade(c1), 64000, 7500, 30000);
-
-      //create 4th input file each with 5*2k words and test with 4 maps
-      inpFile = new File(inDir + "input5_2k_4");
-      createWordsFile(inpFile);
-      JobConf newJobConf = new JobConf(job.getConfiguration());
-      
-      Path outputPath2 = new Path(outDir, "output5_2k_4");
-      
-      FileOutputFormat.setOutputPath(newJobConf, outputPath2);
-
-      Job newJob = new Job(newJobConf);
-      newJob.waitForCompletion(false);
-      c1 = newJob.getCounters();
-      // 4maps & in each map 4 first level spills --- So total 16.
-      // spilled records count:
-      // Each Map: 1st level:2k+2k+2k+2k=8k;2ndlevel=4k+4k=8k;
-      //           3rd level=2k(4k from 1st level & 4k from 2nd level & combineAndSpill)
-      //           So total 8k+8k+2k=18k
-      // For 3 Maps, total = 4*18=72k
-      // Reduce: each of the 4 map o/p's(2k each) will be spilled in shuffleToDisk()
-      //         So 4*2k=8k in 1st level; 2nd level:4k+4k=8k;
-      //         3rd level directly given to reduce(4k+4k --- combineAndSpill => 2k.
-      //         So 0 records spilled to disk in 3rd level)
-      //         So total of 8k+8k=16k
-      // Total job counter will be 72k+16k = 88k
-      
-      // 4 maps and 2.5k words in each --- So 10k map input records
-      // 4 maps and 10k unique words --- So 40k map output records
-      validateCounters(Counters.downgrade(c1), 88000, 10000, 40000);
-      
-      JobConf newJobConf2 = new JobConf(newJob.getConfiguration());
-      
-      Path outputPath3 = new Path(outDir, "output5_2k_5");
-      
-      FileOutputFormat.setOutputPath(newJobConf2, outputPath3);
-
-      Job newJob2 = new Job(newJobConf2);
-      newJob2.setNumReduceTasks(0);
-      newJob2.waitForCompletion(false);
-      c1 = newJob2.getCounters();
-      // 4 maps and 2.5k words in each --- So 10k map input records
-      // 4 maps and 10k unique words --- So 40k map output records
-      validateCounters(Counters.downgrade(c1), 0, 10000, 40000);
-    } finally {
-      //clean up the input and output files
-      if (fs.exists(testDir)) {
-        fs.delete(testDir, true);
+  public static class NewSummer
+      extends org.apache.hadoop.mapreduce.Reducer<Text,IntWritable,
+                                                  Text,IntWritable> {
+    private IntWritable result = new IntWritable();
+
+    public void reduce(Text key, Iterable<IntWritable> values, Context context)
+        throws IOException, InterruptedException {
+      int sum = 0;
+      for (IntWritable val : values) {
+        sum += val.get();
       }
+      result.set(sum);
+      context.write(key, result);
     }
   }
-}
\ No newline at end of file
+
+}

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java?rev=906820&r1=906819&r2=906820&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java Fri Feb  5 05:43:19 2010
@@ -24,7 +24,9 @@
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
+import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Random;
 
@@ -322,6 +324,12 @@
   public void testNullKeys() throws Exception {
     JobConf conf = new JobConf(TestMapRed.class);
     FileSystem fs = FileSystem.getLocal(conf);
+    HashSet<String> values = new HashSet<String>();
+    String m = "AAAAAAAAAAAAAA";
+    for (int i = 1; i < 11; ++i) {
+      values.add(m);
+      m = m.replace((char)('A' + i - 1), (char)('A' + i));
+    }
     Path testdir = new Path(
         System.getProperty("test.build.data","/tmp")).makeQualified(fs);
     fs.delete(testdir, true);
@@ -329,14 +337,10 @@
     SequenceFile.Writer w = SequenceFile.createWriter(fs, conf, inFile,
         NullWritable.class, Text.class, SequenceFile.CompressionType.NONE);
     Text t = new Text();
-    t.set("AAAAAAAAAAAAAA"); w.append(NullWritable.get(), t);
-    t.set("BBBBBBBBBBBBBB"); w.append(NullWritable.get(), t);
-    t.set("CCCCCCCCCCCCCC"); w.append(NullWritable.get(), t);
-    t.set("DDDDDDDDDDDDDD"); w.append(NullWritable.get(), t);
-    t.set("EEEEEEEEEEEEEE"); w.append(NullWritable.get(), t);
-    t.set("FFFFFFFFFFFFFF"); w.append(NullWritable.get(), t);
-    t.set("GGGGGGGGGGGGGG"); w.append(NullWritable.get(), t);
-    t.set("HHHHHHHHHHHHHH"); w.append(NullWritable.get(), t);
+    for (String s : values) {
+      t.set(s);
+      w.append(NullWritable.get(), t);
+    }
     w.close();
     FileInputFormat.setInputPaths(conf, inFile);
     FileOutputFormat.setOutputPath(conf, new Path(testdir, "nullout"));
@@ -350,13 +354,15 @@
 
     JobClient.runJob(conf);
 
+    // Since null keys are all equal, allow any ordering
     SequenceFile.Reader r = new SequenceFile.Reader(fs,
         new Path(testdir, "nullout/part-00000"), conf);
-    String m = "AAAAAAAAAAAAAA";
+    m = "AAAAAAAAAAAAAA";
     for (int i = 1; r.next(NullWritable.get(), t); ++i) {
-      assertTrue(t.toString() + " doesn't match " + m, m.equals(t.toString()));
+      assertTrue("Unexpected value: " + t, values.remove(t.toString()));
       m = m.replace((char)('A' + i - 1), (char)('A' + i));
     }
+    assertTrue("Missing values: " + values.toString(), values.isEmpty());
   }
 
   private void checkCompression(boolean compressMapOutputs,

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java?rev=906820&r1=906819&r2=906820&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java Fri Feb  5 05:43:19 2010
@@ -29,9 +29,10 @@
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.TestMapCollection.FakeIF;
 import org.apache.hadoop.mapreduce.TaskCounter;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Formatter;
@@ -240,6 +241,46 @@
     }
   }
 
+  public static class FakeSplit implements InputSplit {
+    public void write(DataOutput out) throws IOException { }
+    public void readFields(DataInput in) throws IOException { }
+    public long getLength() { return 0L; }
+    public String[] getLocations() { return new String[0]; }
+  }
+
+  public static class FakeIF
+      implements InputFormat<NullWritable,NullWritable> {
+
+    public FakeIF() { }
+
+    public InputSplit[] getSplits(JobConf conf, int numSplits) {
+      InputSplit[] splits = new InputSplit[numSplits];
+      for (int i = 0; i < splits.length; ++i) {
+        splits[i] = new FakeSplit();
+      }
+      return splits;
+    }
+
+    public RecordReader<NullWritable,NullWritable> getRecordReader(
+        InputSplit ignored, JobConf conf, Reporter reporter) {
+      return new RecordReader<NullWritable,NullWritable>() {
+        private boolean done = false;
+        public boolean next(NullWritable key, NullWritable value)
+            throws IOException {
+          if (done)
+            return false;
+          done = true;
+          return true;
+        }
+        public NullWritable createKey() { return NullWritable.get(); }
+        public NullWritable createValue() { return NullWritable.get(); }
+        public long getPos() throws IOException { return 0L; }
+        public void close() throws IOException { }
+        public float getProgress() throws IOException { return 0.0f; }
+      };
+    }
+  }
+
   public static Counters runJob(JobConf conf) throws Exception {
     conf.setMapperClass(MapMB.class);
     conf.setReducerClass(MBValidate.class);