You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2009/05/29 02:58:44 UTC
svn commit: r779807 - in /hadoop/core/trunk: CHANGES.txt
src/mapred/org/apache/hadoop/mapred/MapTask.java
Author: cdouglas
Date: Fri May 29 00:58:43 2009
New Revision: 779807
URL: http://svn.apache.org/viewvc?rev=779807&view=rev
Log:
HADOOP-5664. Change map serialization so a lock is obtained only where
contention is possible, rather than for each write.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=779807&r1=779806&r2=779807&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri May 29 00:58:43 2009
@@ -401,6 +401,9 @@
HADOOP-5620. Add an option to DistCp for preserving modification and access
times. (Rodrigo Schmidt via szetszwo)
+ HADOOP-5664. Change map serialization so a lock is obtained only where
+ contention is possible, rather than for each write. (cdouglas)
+
OPTIMIZATIONS
HADOOP-5595. NameNode does not need to run a replicator to choose a
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=779807&r1=779806&r2=779807&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Fri May 29 00:58:43 2009
@@ -592,8 +592,8 @@
}
- class MapOutputBuffer<K extends Object, V extends Object>
- implements MapOutputCollector<K, V>, IndexedSortable {
+ class MapOutputBuffer<K extends Object, V extends Object>
+ implements MapOutputCollector<K, V>, IndexedSortable {
private final int partitions;
private final Partitioner<K, V> partitioner;
private final JobConf job;
@@ -635,6 +635,8 @@
private volatile Throwable sortSpillException = null;
private final int softRecordLimit;
private final int softBufferLimit;
+ private int recordRemaining;
+ private int bufferRemaining;
private final int minSpillsForCombine;
private final IndexedSorter sorter;
private final ReentrantLock spillLock = new ReentrantLock();
@@ -682,8 +684,8 @@
if ((sortmb & 0x7FF) != sortmb) {
throw new IOException("Invalid \"io.sort.mb\": " + sortmb);
}
- sorter = ReflectionUtils.newInstance(
- job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), job);
+ sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
+ QuickSort.class, IndexedSorter.class), job);
LOG.info("io.sort.mb = " + sortmb);
// buffers and accounting
int maxMemUsage = sortmb << 20;
@@ -696,6 +698,8 @@
kvindices = new int[recordCapacity * ACCTSIZE];
softBufferLimit = (int)(kvbuffer.length * spillper);
softRecordLimit = (int)(kvoffsets.length * spillper);
+ recordRemaining = softRecordLimit;
+ bufferRemaining = softBufferLimit;
LOG.info("data buffer = " + softBufferLimit + "/" + kvbuffer.length);
LOG.info("record buffer = " + softRecordLimit + "/" + kvoffsets.length);
// k/v serialization
@@ -763,38 +767,52 @@
+ value.getClass().getName());
}
final int kvnext = (kvindex + 1) % kvoffsets.length;
- spillLock.lock();
- try {
- boolean kvfull;
- do {
- if (sortSpillException != null) {
- throw (IOException)new IOException("Spill failed"
- ).initCause(sortSpillException);
- }
- // sufficient acct space
- kvfull = kvnext == kvstart;
- final boolean kvsoftlimit = ((kvnext > kvend)
- ? kvnext - kvend > softRecordLimit
- : kvend - kvnext <= kvoffsets.length - softRecordLimit);
- if (kvstart == kvend && kvsoftlimit) {
- LOG.info("Spilling map output: record full = " + kvsoftlimit);
- startSpill();
- }
- if (kvfull) {
- try {
- while (kvstart != kvend) {
- reporter.progress();
- spillDone.await();
+ if (--recordRemaining <= 0) {
+ // recordRemaining may stay <= 0 across calls: the soft limit is still
+ // in force but cannot be satisfied while a spill is in progress
+ spillLock.lock();
+ try {
+ boolean kvfull;
+ do {
+ if (sortSpillException != null) {
+ throw (IOException)new IOException("Spill failed"
+ ).initCause(sortSpillException);
+ }
+ // sufficient acct space
+ kvfull = kvnext == kvstart;
+ final boolean kvsoftlimit = ((kvnext > kvend)
+ ? kvnext - kvend > softRecordLimit
+ : kvend - kvnext <= kvoffsets.length - softRecordLimit);
+ if (kvstart == kvend && kvsoftlimit) {
+ LOG.info("Spilling map output: record full = " + kvfull);
+ startSpill();
+ }
+ if (kvfull) {
+ try {
+ while (kvstart != kvend) {
+ reporter.progress();
+ spillDone.await();
+ }
+ } catch (InterruptedException e) {
+ throw (IOException)new IOException(
+ "Collector interrupted while waiting for the writer"
+ ).initCause(e);
}
- } catch (InterruptedException e) {
- throw (IOException)new IOException(
- "Collector interrupted while waiting for the writer"
- ).initCause(e);
}
- }
- } while (kvfull);
- } finally {
- spillLock.unlock();
+ } while (kvfull);
+ final int softOff = kvend + softRecordLimit;
+ recordRemaining = Math.min(
+ // out of acct space
+ (kvnext < kvstart
+ ? kvstart - kvnext
+ : kvoffsets.length - kvnext + kvstart),
+ // soft limit
+ (kvend < kvnext
+ ? softOff - kvnext
+ : kvnext + (softOff - kvoffsets.length)));
+ } finally {
+ spillLock.unlock();
+ }
}
try {
@@ -905,7 +923,7 @@
* likely result in data loss or corruption.
* @see #markRecord()
*/
- protected synchronized void reset() throws IOException {
+ protected void reset() throws IOException {
// spillLock unnecessary; If spill wraps, then
// bufindex < bufstart < bufend so contention is impossible
// a stale value for bufstart does not affect correctness, since
@@ -931,7 +949,7 @@
private final byte[] scratch = new byte[1];
@Override
- public synchronized void write(int v)
+ public void write(int v)
throws IOException {
scratch[0] = (byte)v;
write(scratch, 0, 1);
@@ -945,69 +963,86 @@
* deserialize into the collection buffer.
*/
@Override
- public synchronized void write(byte b[], int off, int len)
+ public void write(byte b[], int off, int len)
throws IOException {
boolean buffull = false;
boolean wrap = false;
- spillLock.lock();
- try {
- do {
- if (sortSpillException != null) {
- throw (IOException)new IOException("Spill failed"
- ).initCause(sortSpillException);
- }
+ bufferRemaining -= len;
+ if (bufferRemaining <= 0) {
+ // writing these bytes could exhaust available buffer space
+ // check if spill or blocking is necessary
+ spillLock.lock();
+ try {
+ do {
+ if (sortSpillException != null) {
+ throw (IOException)new IOException("Spill failed"
+ ).initCause(sortSpillException);
+ }
- // sufficient buffer space?
- if (bufstart <= bufend && bufend <= bufindex) {
- buffull = bufindex + len > bufvoid;
- wrap = (bufvoid - bufindex) + bufstart > len;
- } else {
- // bufindex <= bufstart <= bufend
- // bufend <= bufindex <= bufstart
- wrap = false;
- buffull = bufindex + len > bufstart;
- }
+ // sufficient buffer space?
+ if (bufstart <= bufend && bufend <= bufindex) {
+ buffull = bufindex + len > bufvoid;
+ wrap = (bufvoid - bufindex) + bufstart > len;
+ } else {
+ // bufindex <= bufstart <= bufend
+ // bufend <= bufindex <= bufstart
+ wrap = false;
+ buffull = bufindex + len > bufstart;
+ }
- if (kvstart == kvend) {
- // spill thread not running
- if (kvend != kvindex) {
- // we have records we can spill
- final boolean bufsoftlimit = (bufindex > bufend)
- ? bufindex - bufend > softBufferLimit
- : bufend - bufindex < bufvoid - softBufferLimit;
- if (bufsoftlimit || (buffull && !wrap)) {
- LOG.info("Spilling map output: buffer full= " + bufsoftlimit);
- startSpill();
+ if (kvstart == kvend) {
+ // spill thread not running
+ if (kvend != kvindex) {
+ // we have records we can spill
+ final boolean bufsoftlimit = (bufindex > bufend)
+ ? bufindex - bufend > softBufferLimit
+ : bufend - bufindex < bufvoid - softBufferLimit;
+ if (bufsoftlimit || (buffull && !wrap)) {
+ LOG.info("Spilling map output: buffer full= " + (buffull && !wrap));
+ startSpill();
+ }
+ } else if (buffull && !wrap) {
+ // We have no buffered records, and this record is too large
+ // to write into kvbuffer. We must spill it directly from
+ // collect
+ final int size = ((bufend <= bufindex)
+ ? bufindex - bufend
+ : (bufvoid - bufend) + bufindex) + len;
+ bufstart = bufend = bufindex = bufmark = 0;
+ kvstart = kvend = kvindex = 0;
+ bufvoid = kvbuffer.length;
+ throw new MapBufferTooSmallException(size + " bytes");
}
- } else if (buffull && !wrap) {
- // We have no buffered records, and this record is too large
- // to write into kvbuffer. We must spill it directly from
- // collect
- final int size = ((bufend <= bufindex)
- ? bufindex - bufend
- : (bufvoid - bufend) + bufindex) + len;
- bufstart = bufend = bufindex = bufmark = 0;
- kvstart = kvend = kvindex = 0;
- bufvoid = kvbuffer.length;
- throw new MapBufferTooSmallException(size + " bytes");
}
- }
- if (buffull && !wrap) {
- try {
- while (kvstart != kvend) {
- reporter.progress();
- spillDone.await();
+ if (buffull && !wrap) {
+ try {
+ while (kvstart != kvend) {
+ reporter.progress();
+ spillDone.await();
+ }
+ } catch (InterruptedException e) {
+ throw (IOException)new IOException(
+ "Buffer interrupted while waiting for the writer"
+ ).initCause(e);
}
- } catch (InterruptedException e) {
- throw (IOException)new IOException(
- "Buffer interrupted while waiting for the writer"
- ).initCause(e);
}
- }
- } while (buffull && !wrap);
- } finally {
- spillLock.unlock();
+ } while (buffull && !wrap);
+ final int softOff = bufend + softBufferLimit;
+ bufferRemaining = Math.min(
+ // out of buffer space
+ (bufindex < bufstart
+ ? bufstart - bufindex
+ : kvbuffer.length - bufindex + bufstart),
+ // soft limit
+ (bufend < bufindex
+ ? softOff - bufindex
+ : bufindex + (softOff - kvbuffer.length)));
+ } finally {
+ spillLock.unlock();
+ }
+ } else {
+ buffull = bufindex + len > bufvoid;
}
// here, we know that we have sufficient space to write
if (buffull) {
@@ -1019,11 +1054,12 @@
}
System.arraycopy(b, off, kvbuffer, bufindex, len);
bufindex += len;
+ bufferRemaining -= len;
}
}
- public synchronized void flush() throws IOException, ClassNotFoundException,
- InterruptedException {
+ public void flush() throws IOException, ClassNotFoundException,
+ InterruptedException {
LOG.info("Starting flush of map output");
spillLock.lock();
try {
@@ -1103,7 +1139,7 @@
}
}
- private synchronized void startSpill() {
+ private void startSpill() {
LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
"; bufvoid = " + bufvoid);
LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +