You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) is not reproduced in this plain-text copy.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2009/05/29 02:58:44 UTC

svn commit: r779807 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/MapTask.java

Author: cdouglas
Date: Fri May 29 00:58:43 2009
New Revision: 779807

URL: http://svn.apache.org/viewvc?rev=779807&view=rev
Log:
HADOOP-5664. Change map serialization so a lock is obtained only where
contention is possible, rather than for each write.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=779807&r1=779806&r2=779807&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri May 29 00:58:43 2009
@@ -401,6 +401,9 @@
     HADOOP-5620. Add an option to DistCp for preserving modification and access
     times.  (Rodrigo Schmidt via szetszwo)
 
+    HADOOP-5664. Change map serialization so a lock is obtained only where
+    contention is possible, rather than for each write. (cdouglas)
+
   OPTIMIZATIONS
 
     HADOOP-5595. NameNode does not need to run a replicator to choose a

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=779807&r1=779806&r2=779807&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Fri May 29 00:58:43 2009
@@ -592,8 +592,8 @@
     
   }
 
-  class MapOutputBuffer<K extends Object, V extends Object> 
-  implements MapOutputCollector<K, V>, IndexedSortable {
+  class MapOutputBuffer<K extends Object, V extends Object>
+      implements MapOutputCollector<K, V>, IndexedSortable {
     private final int partitions;
     private final Partitioner<K, V> partitioner;
     private final JobConf job;
@@ -635,6 +635,8 @@
     private volatile Throwable sortSpillException = null;
     private final int softRecordLimit;
     private final int softBufferLimit;
+    private int recordRemaining;
+    private int bufferRemaining;
     private final int minSpillsForCombine;
     private final IndexedSorter sorter;
     private final ReentrantLock spillLock = new ReentrantLock();
@@ -682,8 +684,8 @@
       if ((sortmb & 0x7FF) != sortmb) {
         throw new IOException("Invalid \"io.sort.mb\": " + sortmb);
       }
-      sorter = ReflectionUtils.newInstance(
-            job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), job);
+      sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
+            QuickSort.class, IndexedSorter.class), job);
       LOG.info("io.sort.mb = " + sortmb);
       // buffers and accounting
       int maxMemUsage = sortmb << 20;
@@ -696,6 +698,8 @@
       kvindices = new int[recordCapacity * ACCTSIZE];
       softBufferLimit = (int)(kvbuffer.length * spillper);
       softRecordLimit = (int)(kvoffsets.length * spillper);
+      recordRemaining = softRecordLimit;
+      bufferRemaining = softBufferLimit;
       LOG.info("data buffer = " + softBufferLimit + "/" + kvbuffer.length);
       LOG.info("record buffer = " + softRecordLimit + "/" + kvoffsets.length);
       // k/v serialization
@@ -763,38 +767,52 @@
                               + value.getClass().getName());
       }
       final int kvnext = (kvindex + 1) % kvoffsets.length;
-      spillLock.lock();
-      try {
-        boolean kvfull;
-        do {
-          if (sortSpillException != null) {
-            throw (IOException)new IOException("Spill failed"
-                ).initCause(sortSpillException);
-          }
-          // sufficient acct space
-          kvfull = kvnext == kvstart;
-          final boolean kvsoftlimit = ((kvnext > kvend)
-              ? kvnext - kvend > softRecordLimit
-              : kvend - kvnext <= kvoffsets.length - softRecordLimit);
-          if (kvstart == kvend && kvsoftlimit) {
-            LOG.info("Spilling map output: record full = " + kvsoftlimit);
-            startSpill();
-          }
-          if (kvfull) {
-            try {
-              while (kvstart != kvend) {
-                reporter.progress();
-                spillDone.await();
+      if (--recordRemaining <= 0) {
+        // Possible for check to remain < zero, if soft limit remains
+        // in force but unsatisfiable because spill is in progress
+        spillLock.lock();
+        try {
+          boolean kvfull;
+          do {
+            if (sortSpillException != null) {
+              throw (IOException)new IOException("Spill failed"
+                  ).initCause(sortSpillException);
+            }
+            // sufficient acct space
+            kvfull = kvnext == kvstart;
+            final boolean kvsoftlimit = ((kvnext > kvend)
+                ? kvnext - kvend > softRecordLimit
+                : kvend - kvnext <= kvoffsets.length - softRecordLimit);
+            if (kvstart == kvend && kvsoftlimit) {
+              LOG.info("Spilling map output: record full = " + kvfull);
+              startSpill();
+            }
+            if (kvfull) {
+              try {
+                while (kvstart != kvend) {
+                  reporter.progress();
+                  spillDone.await();
+                }
+              } catch (InterruptedException e) {
+                throw (IOException)new IOException(
+                    "Collector interrupted while waiting for the writer"
+                    ).initCause(e);
               }
-            } catch (InterruptedException e) {
-              throw (IOException)new IOException(
-                  "Collector interrupted while waiting for the writer"
-                  ).initCause(e);
             }
-          }
-        } while (kvfull);
-      } finally {
-        spillLock.unlock();
+          } while (kvfull);
+          final int softOff = kvend + softRecordLimit;
+          recordRemaining = Math.min(
+              // out of acct space
+              (kvnext < kvstart
+                 ? kvstart - kvnext
+                 : kvoffsets.length - kvnext + kvstart),
+              // soft limit
+              (kvend < kvnext
+                 ? softOff - kvnext
+                 : kvnext + (softOff - kvoffsets.length)));
+        } finally {
+          spillLock.unlock();
+        }
       }
 
       try {
@@ -905,7 +923,7 @@
        * likely result in data loss or corruption.
        * @see #markRecord()
        */
-      protected synchronized void reset() throws IOException {
+      protected void reset() throws IOException {
         // spillLock unnecessary; If spill wraps, then
         // bufindex < bufstart < bufend so contention is impossible
         // a stale value for bufstart does not affect correctness, since
@@ -931,7 +949,7 @@
       private final byte[] scratch = new byte[1];
 
       @Override
-      public synchronized void write(int v)
+      public void write(int v)
           throws IOException {
         scratch[0] = (byte)v;
         write(scratch, 0, 1);
@@ -945,69 +963,86 @@
        *    deserialize into the collection buffer.
        */
       @Override
-      public synchronized void write(byte b[], int off, int len)
+      public void write(byte b[], int off, int len)
           throws IOException {
         boolean buffull = false;
         boolean wrap = false;
-        spillLock.lock();
-        try {
-          do {
-            if (sortSpillException != null) {
-              throw (IOException)new IOException("Spill failed"
-                  ).initCause(sortSpillException);
-            }
+        bufferRemaining -= len;
+        if (bufferRemaining <= 0) {
+          // writing these bytes could exhaust available buffer space
+          // check if spill or blocking is necessary
+          spillLock.lock();
+          try {
+            do {
+              if (sortSpillException != null) {
+                throw (IOException)new IOException("Spill failed"
+                    ).initCause(sortSpillException);
+              }
 
-            // sufficient buffer space?
-            if (bufstart <= bufend && bufend <= bufindex) {
-              buffull = bufindex + len > bufvoid;
-              wrap = (bufvoid - bufindex) + bufstart > len;
-            } else {
-              // bufindex <= bufstart <= bufend
-              // bufend <= bufindex <= bufstart
-              wrap = false;
-              buffull = bufindex + len > bufstart;
-            }
+              // sufficient buffer space?
+              if (bufstart <= bufend && bufend <= bufindex) {
+                buffull = bufindex + len > bufvoid;
+                wrap = (bufvoid - bufindex) + bufstart > len;
+              } else {
+                // bufindex <= bufstart <= bufend
+                // bufend <= bufindex <= bufstart
+                wrap = false;
+                buffull = bufindex + len > bufstart;
+              }
 
-            if (kvstart == kvend) {
-              // spill thread not running
-              if (kvend != kvindex) {
-                // we have records we can spill
-                final boolean bufsoftlimit = (bufindex > bufend)
-                  ? bufindex - bufend > softBufferLimit
-                  : bufend - bufindex < bufvoid - softBufferLimit;
-                if (bufsoftlimit || (buffull && !wrap)) {
-                  LOG.info("Spilling map output: buffer full= " + bufsoftlimit);
-                  startSpill();
+              if (kvstart == kvend) {
+                // spill thread not running
+                if (kvend != kvindex) {
+                  // we have records we can spill
+                  final boolean bufsoftlimit = (bufindex > bufend)
+                    ? bufindex - bufend > softBufferLimit
+                    : bufend - bufindex < bufvoid - softBufferLimit;
+                  if (bufsoftlimit || (buffull && !wrap)) {
+                    LOG.info("Spilling map output: buffer full= " + (buffull && !wrap));
+                    startSpill();
+                  }
+                } else if (buffull && !wrap) {
+                  // We have no buffered records, and this record is too large
+                  // to write into kvbuffer. We must spill it directly from
+                  // collect
+                  final int size = ((bufend <= bufindex)
+                    ? bufindex - bufend
+                    : (bufvoid - bufend) + bufindex) + len;
+                  bufstart = bufend = bufindex = bufmark = 0;
+                  kvstart = kvend = kvindex = 0;
+                  bufvoid = kvbuffer.length;
+                  throw new MapBufferTooSmallException(size + " bytes");
                 }
-              } else if (buffull && !wrap) {
-                // We have no buffered records, and this record is too large
-                // to write into kvbuffer. We must spill it directly from
-                // collect
-                final int size = ((bufend <= bufindex)
-                  ? bufindex - bufend
-                  : (bufvoid - bufend) + bufindex) + len;
-                bufstart = bufend = bufindex = bufmark = 0;
-                kvstart = kvend = kvindex = 0;
-                bufvoid = kvbuffer.length;
-                throw new MapBufferTooSmallException(size + " bytes");
               }
-            }
 
-            if (buffull && !wrap) {
-              try {
-                while (kvstart != kvend) {
-                  reporter.progress();
-                  spillDone.await();
+              if (buffull && !wrap) {
+                try {
+                  while (kvstart != kvend) {
+                    reporter.progress();
+                    spillDone.await();
+                  }
+                } catch (InterruptedException e) {
+                    throw (IOException)new IOException(
+                        "Buffer interrupted while waiting for the writer"
+                        ).initCause(e);
                 }
-              } catch (InterruptedException e) {
-                  throw (IOException)new IOException(
-                      "Buffer interrupted while waiting for the writer"
-                      ).initCause(e);
               }
-            }
-          } while (buffull && !wrap);
-        } finally {
-          spillLock.unlock();
+            } while (buffull && !wrap);
+            final int softOff = bufend + softBufferLimit;
+            bufferRemaining = Math.min(
+                // out of buffer space
+                (bufindex < bufstart
+                   ? bufstart - bufindex
+                   : kvbuffer.length - bufindex + bufstart),
+                // soft limit
+                (bufend < bufindex
+                   ? softOff - bufindex
+                   : bufindex + (softOff - kvbuffer.length)));
+          } finally {
+            spillLock.unlock();
+          }
+        } else {
+          buffull = bufindex + len > bufvoid;
         }
         // here, we know that we have sufficient space to write
         if (buffull) {
@@ -1019,11 +1054,12 @@
         }
         System.arraycopy(b, off, kvbuffer, bufindex, len);
         bufindex += len;
+        bufferRemaining -= len;
       }
     }
 
-    public synchronized void flush() throws IOException, ClassNotFoundException,
-                                            InterruptedException {
+    public void flush() throws IOException, ClassNotFoundException,
+           InterruptedException {
       LOG.info("Starting flush of map output");
       spillLock.lock();
       try {
@@ -1103,7 +1139,7 @@
       }
     }
 
-    private synchronized void startSpill() {
+    private void startSpill() {
       LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
                "; bufvoid = " + bufvoid);
       LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +