You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2007/08/14 07:31:03 UTC
svn commit: r565629 - in /lucene/hadoop/branches/branch-0.14: ./
src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/mapred/
Author: omalley
Date: Mon Aug 13 22:31:02 2007
New Revision: 565629
URL: http://svn.apache.org/viewvc?view=rev&rev=565629
Log:
Merge of -r 565628 from trunk to 0.14 branch. Fixes HADOOP-1698.
Modified:
lucene/hadoop/branches/branch-0.14/CHANGES.txt
lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/io/SequenceFile.java
lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java
lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/MapTask.java
lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/MergeSorter.java
Modified: lucene/hadoop/branches/branch-0.14/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/CHANGES.txt?view=diff&rev=565629&r1=565628&r2=565629
==============================================================================
--- lucene/hadoop/branches/branch-0.14/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.14/CHANGES.txt Mon Aug 13 22:31:02 2007
@@ -514,6 +514,9 @@
152. HADOOP-1629. Added an upgrade test for HADOOP-1134.
(Raghu Angadi via nigel)
+153. HADOOP-1698. Fix performance problems on map output sorting for jobs
+ with large numbers of reduces. (Devaraj Das via omalley)
+
Release 0.13.0 - 2007-06-08
1. HADOOP-1047. Fix TestReplication to succeed more reliably.
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=565629&r1=565628&r2=565629
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/io/SequenceFile.java Mon Aug 13 22:31:02 2007
@@ -648,9 +648,10 @@
long lastSyncPos; // position of last sync
byte[] sync; // 16 random bytes
{
- try { // use hash of uid + host
+ try {
MessageDigest digester = MessageDigest.getInstance("MD5");
- digester.update((new UID()+"@"+InetAddress.getLocalHost()).getBytes());
+ long time = System.currentTimeMillis();
+ digester.update((new UID()+"@"+time).getBytes());
sync = digester.digest();
} catch (Exception e) {
throw new RuntimeException(e);
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java?view=diff&rev=565629&r1=565628&r2=565629
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java Mon Aug 13 22:31:02 2007
@@ -50,16 +50,21 @@
//4 for indices into startOffsets array in the
//pointers array (ignored the partpointers list itself)
static private final int BUFFERED_KEY_VAL_OVERHEAD = 16;
+ static private final int INITIAL_ARRAY_SIZE = 5;
+ //we maintain the max lengths of the key/val that we encounter. During
+ //iteration of the sorted results, we will create a DataOutputBuffer to
+ //return the keys. The max size of the DataOutputBuffer will be the max
+ //keylength that we encounter. Expose this value to model memory more
+ //accurately.
+ private int maxKeyLength = 0;
+ private int maxValLength = 0;
+
//Reference to the Progressable object for sending KeepAlive
private Progressable reporter;
//Implementation of methods of the SorterBase interface
//
public void configure(JobConf conf) {
- startOffsets = new int[1024];
- keyLengths = new int[1024];
- valueLengths = new int[1024];
- pointers = new int[1024];
comparator = conf.getOutputKeyComparator();
}
@@ -70,10 +75,16 @@
public void addKeyValue(int recordOffset, int keyLength, int valLength) {
//Add the start offset of the key in the startOffsets array and the
//length in the keyLengths array.
- if (count == startOffsets.length)
+ if (startOffsets == null || count == startOffsets.length)
grow();
startOffsets[count] = recordOffset;
keyLengths[count] = keyLength;
+ if (keyLength > maxKeyLength) {
+ maxKeyLength = keyLength;
+ }
+ if (valLength > maxValLength) {
+ maxValLength = valLength;
+ }
valueLengths[count] = valLength;
pointers[count] = count;
count++;
@@ -85,14 +96,30 @@
}
public long getMemoryUtilized() {
- return (startOffsets.length) * BUFFERED_KEY_VAL_OVERHEAD;
+ //the total length of the arrays + the max{Key,Val}Length (this will be the
+ //max size of the DataOutputBuffers during the iteration of the sorted
+ //keys).
+ if (startOffsets != null) {
+ return (startOffsets.length) * BUFFERED_KEY_VAL_OVERHEAD +
+ maxKeyLength + maxValLength;
+ }
+ else { //nothing from this yet
+ return 0;
+ }
}
public abstract RawKeyValueIterator sort();
public void close() {
- //just set count to 0; we reuse the arrays
+ //set count to 0; also, we don't reuse the arrays since we want to maintain
+ //consistency in the memory model
count = 0;
+ startOffsets = null;
+ keyLengths = null;
+ valueLengths = null;
+ pointers = null;
+ maxKeyLength = 0;
+ maxValLength = 0;
}
//A compare method that references the keyValBuffer through the indirect
//pointers
@@ -106,7 +133,11 @@
}
private void grow() {
- int newLength = startOffsets.length * 3/2;
+ int currLength = 0;
+ if (startOffsets != null) {
+ currLength = startOffsets.length;
+ }
+ int newLength = (int)(currLength * 1.1) + 1;
startOffsets = grow(startOffsets, newLength);
keyLengths = grow(keyLengths, newLength);
valueLengths = grow(valueLengths, newLength);
@@ -115,7 +146,9 @@
private int[] grow(int[] old, int newLength) {
int[] result = new int[newLength];
- System.arraycopy(old, 0, result, 0, old.length);
+ if(old != null) {
+ System.arraycopy(old, 0, result, 0, old.length);
+ }
return result;
}
} //BasicTypeSorterBase
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=565629&r1=565628&r2=565629
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/MapTask.java Mon Aug 13 22:31:02 2007
@@ -330,6 +330,9 @@
}
synchronized (this) {
+ if (keyValBuffer == null) {
+ keyValBuffer = new DataOutputBuffer();
+ }
//dump the key/value to buffer
int keyOffset = keyValBuffer.getLength();
key.write(keyValBuffer);
@@ -350,7 +353,9 @@
totalMem += sortImpl[i].getMemoryUtilized();
if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) {
sortAndSpillToDisk();
- keyValBuffer.reset();
+ //we don't reuse the keyValBuffer. We want to maintain consistency
+ //in the memory model (for negligible performance loss).
+ keyValBuffer = null;
for (int i = 0; i < partitions; i++) {
sortImpl[i].close();
}
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/MergeSorter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/MergeSorter.java?view=diff&rev=565629&r1=565628&r2=565629
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/MergeSorter.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/MergeSorter.java Mon Aug 13 22:31:02 2007
@@ -58,4 +58,12 @@
public int compare (IntWritable i, IntWritable j) {
return super.compare(i.get(), j.get());
}
+
+ /** Add the extra memory that will be utilized by the sort method */
+ public long getMemoryUtilized() {
+ //this is memory that will be actually utilized (considering the temp
+ //array that will be allocated by the sort() method (mergesort))
+ return super.getMemoryUtilized() + super.count * 4;
+ }
+
}