Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/11/18 10:35:35 UTC
svn commit: r718533 - in /hadoop/core/trunk: ./
src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Author: ddas
Date: Tue Nov 18 01:35:32 2008
New Revision: 718533
URL: http://svn.apache.org/viewvc?rev=718533&view=rev
Log:
HADOOP-2774. Provide a counter for the number of first level spill files in the map task and a counter for counting the number of spilled records in both map and reduce tasks. Contributed by Ravi Gummadi.
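As background on what the two new counters expose, here is a minimal, illustrative sketch (not part of this commit) of reading them from a finished job with the old mapred API of this era; the job setup and the counter group name ("org.apache.hadoop.mapred.Task$Counter") are assumptions for illustration.

import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;

public class SpillCounterExample {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(SpillCounterExample.class);
    // ... mapper, reducer, input/output paths elided ...
    RunningJob job = JobClient.runJob(conf);  // submit and wait for completion

    Counters counters = job.getCounters();
    // Assumed group name: the Task.Counter enum's class name.
    long spilledRecords = counters.findCounter(
        "org.apache.hadoop.mapred.Task$Counter", "SPILLED_RECORDS").getCounter();
    long firstLevelSpills = counters.findCounter(
        "org.apache.hadoop.mapred.Task$Counter", "MAP_FIRST_LEVEL_SPILLS").getCounter();

    System.out.println("Spilled records:        " + spilledRecords);
    System.out.println("Map first level spills: " + firstLevelSpills);
  }
}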
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Nov 18 01:35:32 2008
@@ -111,6 +111,10 @@
HADOOP-4612. Removes RunJar's dependency on JobClient.
(Sharad Agarwal via ddas)
+ HADOOP-2774. Provide a counter for the number of first level spill files in
+ the map task and a counter for counting the number of spilled records in both
+ map and reduce tasks.
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java Tue Nov 18 01:35:32 2008
@@ -65,7 +65,10 @@
long decompressedBytesWritten = 0;
long compressedBytesWritten = 0;
-
+
+ // Count records when written to disk.
+ private static long numRecordsWritten = 0;
+
IFileOutputStream checksumOut;
Class<K> keyClass;
@@ -178,6 +181,7 @@
decompressedBytesWritten += keyLength + valueLength +
WritableUtils.getVIntSize(keyLength) +
WritableUtils.getVIntSize(valueLength);
+ numRecordsWritten++;
}
public void append(DataInputBuffer key, DataInputBuffer value)
@@ -203,8 +207,13 @@
decompressedBytesWritten += keyLength + valueLength +
WritableUtils.getVIntSize(keyLength) +
WritableUtils.getVIntSize(valueLength);
-}
+ numRecordsWritten++;
+ }
+ public static long getNumRecordsWritten() {
+ return numRecordsWritten;
+ }
+
public long getRawLength() {
return decompressedBytesWritten;
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java Tue Nov 18 01:35:32 2008
@@ -30,11 +30,14 @@
final long startOffset;
final long rawLength;
final long partLength;
+ final long numRecords;
- public IndexRecord(long startOffset, long rawLength, long partLength) {
+ public IndexRecord(long startOffset, long rawLength, long partLength,
+ long numRecords) {
this.startOffset = startOffset;
this.rawLength = rawLength;
this.partLength = partLength;
+ this.numRecords = numRecords;
}
public static IndexRecord[] readIndexFile(Path indexFileName,
@@ -60,8 +63,9 @@
long startOffset = wrapper.readLong();
long rawLength = wrapper.readLong();
long partLength = wrapper.readLong();
+ long numRecords = wrapper.readLong();
indexRecordArray[i] =
- new IndexRecord(startOffset, rawLength, partLength);
+ new IndexRecord(startOffset, rawLength, partLength, numRecords);
}
}
finally {
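For reference, the IndexRecord change above widens each per-partition index entry from three longs to four (start offset, raw length, part length, and now record count), which is why MAP_OUTPUT_INDEX_RECORD_LENGTH moves from 24 to 32 bytes in the MapTask diff below. A minimal standalone sketch of that layout, using plain DataOutputStream/DataInputStream as the index code does:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class IndexRecordLayoutSketch {
  // One index entry after this change: four longs, 32 bytes in total.
  static byte[] writeEntry(long startOffset, long rawLength,
                           long partLength, long numRecords) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeLong(startOffset);  // where this partition's data starts
    out.writeLong(rawLength);    // decompressed length of the partition
    out.writeLong(partLength);   // on-disk (possibly compressed) length
    out.writeLong(numRecords);   // new field: records in this partition
    out.close();
    return bytes.toByteArray();  // 32 bytes
  }

  static long[] readEntry(byte[] entry) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(entry));
    return new long[] { in.readLong(), in.readLong(), in.readLong(), in.readLong() };
  }
}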
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java Tue Nov 18 01:35:32 2008
@@ -47,5 +47,10 @@
*/
public static final String RAW_MAP_OUTPUT_LENGTH = "Raw-Map-Output-Length";
+ /**
+ * The custom http header used for the number of map output records.
+ */
+ public static final String MAP_OUTPUT_NUM_RECORDS = "Map-Output-Num-Records";
+
public static final String WORKDIR = "work";
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Tue Nov 18 01:35:32 2008
@@ -24,6 +24,7 @@
import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_RECORDS;
import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_BYTES;
import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_RECORDS;
+import static org.apache.hadoop.mapred.Task.Counter.MAP_FIRST_LEVEL_SPILLS;
import java.io.DataInput;
import java.io.DataOutput;
@@ -67,7 +68,7 @@
/**
* The size of each record in the index file for the map-outputs.
*/
- public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
+ public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 32;
private BytesWritable split = new BytesWritable();
@@ -447,6 +448,7 @@
private final Counters.Counter mapOutputRecordCounter;
private final Counters.Counter combineInputCounter;
private final Counters.Counter combineOutputCounter;
+ private final Counters.Counter firstLevelSpillsCounter;
private ArrayList<IndexRecord[]> indexCacheList;
private int totalIndexCacheMemory;
@@ -509,6 +511,7 @@
mapOutputRecordCounter = counters.findCounter(MAP_OUTPUT_RECORDS);
combineInputCounter = counters.findCounter(COMBINE_INPUT_RECORDS);
combineOutputCounter = counters.findCounter(COMBINE_OUTPUT_RECORDS);
+ firstLevelSpillsCounter = counters.findCounter(MAP_FIRST_LEVEL_SPILLS);
// compression
if (job.getCompressMapOutput()) {
Class<? extends CompressionCodec> codecClass =
@@ -948,7 +951,10 @@
IFile.Writer<K, V> writer = null;
try {
long segmentStart = out.getPos();
+ long numRecordsInThisPartition = 0;
writer = new Writer<K, V>(job, out, keyClass, valClass, codec);
+ long prevCnt = writer.getNumRecordsWritten();
+
if (null == combinerClass) {
// spill directly
DataInputBuffer key = new DataInputBuffer();
@@ -979,17 +985,19 @@
combineAndSpill(kvIter, combineInputCounter);
}
}
-
+ numRecordsInThisPartition = Writer.getNumRecordsWritten() - prevCnt;
// close the writer
writer.close();
if (indexChecksumOut != null) {
- // write the index as <offset, raw-length, compressed-length>
- writeIndexRecord(indexChecksumOut, segmentStart, writer);
+ // write the index as
+ // <offset, raw-length, compressed-length, numRecords>
+ writeIndexRecord(indexChecksumOut, segmentStart, writer,
+ numRecordsInThisPartition);
}
else {
irArray[i] = new IndexRecord(segmentStart, writer.getRawLength(),
- writer.getCompressedLength());
+ writer.getCompressedLength(), numRecordsInThisPartition);
}
writer = null;
} finally {
@@ -1058,12 +1066,13 @@
}
writer.close();
+ long numRecords = (i == partition) ? 1:0;
if (indexChecksumOut != null) {
- writeIndexRecord(indexChecksumOut,segmentStart,writer);
+ writeIndexRecord(indexChecksumOut,segmentStart,writer,numRecords);
}
else {
irArray[i] = new IndexRecord(segmentStart, writer.getRawLength(),
- writer.getCompressedLength());
+ writer.getCompressedLength(), numRecords);
}
writer = null;
} catch (IOException e) {
@@ -1172,6 +1181,8 @@
long finalIndexFileSize = 0;
Path [] filename = new Path[numSpills];
+ firstLevelSpillsCounter.increment(numSpills);
+
for(int i = 0; i < numSpills; i++) {
filename[i] = mapOutputFile.getSpillFile(getTaskID(), i);
finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
@@ -1188,7 +1199,8 @@
writeSingleSpillIndexToFile(getTaskID(),
new Path(filename[0].getParent(),"file.out.index"));
}
- return;
+ spilledRecordsCounter.increment(Writer.getNumRecordsWritten());
+ return;
}
//make correction in the length to include the sequence file header
//lengths for each partition
@@ -1220,7 +1232,7 @@
Writer<K, V> writer = new Writer<K, V>(job, finalOut,
keyClass, valClass, codec);
writer.close();
- writeIndexRecord(finalIndexChecksumOut, segmentStart, writer);
+ writeIndexRecord(finalIndexChecksumOut, segmentStart, writer, 0);
}
finalOut.close();
finalIndexChecksumOut.close();
@@ -1256,7 +1268,6 @@
}
indexRecord = null;
}
-
//merge
@SuppressWarnings("unchecked")
RawKeyValueIterator kvIter =
@@ -1270,19 +1281,37 @@
long segmentStart = finalOut.getPos();
Writer<K, V> writer =
new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
+ long numRecordsInThisPartition;
+ long prevCnt = Writer.getNumRecordsWritten();
if (null == combinerClass || numSpills < minSpillsForCombine) {
Merger.writeFile(kvIter, writer, reporter);
} else {
combineCollector.setWriter(writer);
combineAndSpill(kvIter, combineInputCounter);
}
+ numRecordsInThisPartition = Writer.getNumRecordsWritten() - prevCnt;
//close
writer.close();
//write index record
- writeIndexRecord(finalIndexChecksumOut,segmentStart, writer);
+ writeIndexRecord(finalIndexChecksumOut,segmentStart, writer,
+ numRecordsInThisPartition);
}
+
+ // In Map Phase, Spills to disk are done at 3 places:
+ // (1) First Level Spills in sortAndSpill() - either
+ // (a) without combiner
+ // or (b) with combiner
+ // (2) Outputs of Intermediate(multi-level) merges in Merger.merge
+ // (3) Output of final level merge - See above if-else
+ // (a) Merger.writeFile
+ // or (b) combineAndSpill
+ // In all the cases, IFile.Writer.append() takes care of counting
+ // the records written to disk
+
+ spilledRecordsCounter.increment(Writer.getNumRecordsWritten());
+
finalOut.close();
finalIndexChecksumOut.close();
finalIndexOut.close();
@@ -1295,7 +1324,7 @@
private void writeIndexRecord(IFileOutputStream indexOut,
long start,
- Writer<K, V> writer)
+ Writer<K, V> writer, long numRecords)
throws IOException {
//when we write the offset/decompressed-length/compressed-length to
//the final index file, we write longs for both compressed and
@@ -1311,8 +1340,9 @@
wrapper.writeLong(writer.getRawLength());
long segmentLength = writer.getCompressedLength();
wrapper.writeLong(segmentLength);
+ wrapper.writeLong(numRecords);
LOG.info("Index: (" + start + ", " + writer.getRawLength() + ", " +
- segmentLength + ")");
+ segmentLength + "," + numRecords + ")");
}
/**
@@ -1366,6 +1396,7 @@
wrapper.writeLong(irArray[i].startOffset);
wrapper.writeLong(irArray[i].rawLength);
wrapper.writeLong(irArray[i].partLength);
+ wrapper.writeLong(irArray[i].numRecords);
}
wrapper.close();
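The per-partition record counts written above come from a simple delta over the writer's running total: snapshot getNumRecordsWritten() before a partition is written and subtract afterwards. A simplified, self-contained sketch of that pattern (illustrative only, not the actual MapTask/IFile code):

class RecordCountingWriter {
  // Running total of records appended by any writer, as in IFile.Writer above.
  private static long numRecordsWritten = 0;

  void append(byte[] key, byte[] value) {
    // ... write the key/value pair to the spill file (elided) ...
    numRecordsWritten++;
  }

  static long getNumRecordsWritten() {
    return numRecordsWritten;
  }
}

class SpillSketch {
  // Returns the number of records written for just this partition,
  // which is what goes into the partition's index entry.
  long spillOnePartition(RecordCountingWriter writer, byte[][] keys, byte[][] values) {
    long prevCnt = RecordCountingWriter.getNumRecordsWritten();
    for (int i = 0; i < keys.length; i++) {
      writer.append(keys[i], values[i]);
    }
    return RecordCountingWriter.getNumRecordsWritten() - prevCnt;
  }
}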
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Tue Nov 18 01:35:32 2008
@@ -436,6 +436,17 @@
values.informReduceProgress();
}
+ // In Reduce Phase, Spills to disk are
+ // (1) mapoutput directly written to reduceNode's disk in shuffleToDisk()
+ // spilledRecordsCounter is updated after shuffleToDisk()
+ // (2) All other spills to disk are either through
+ // (a) Merger.writeFile()
+ // or (b) combineAndSpill()
+ // In cases 2(a) & 2(b), IFile.Writer.append() takes care of
+ // counting the records written to disk
+
+ spilledRecordsCounter.increment(Writer.getNumRecordsWritten());
+
//Clean up: repeated in catch block below
reducer.close();
out.close(reporter);
@@ -1234,6 +1245,8 @@
Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));
long compressedLength =
Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
+ long numRecords =
+ Long.parseLong(connection.getHeaderField(MAP_OUTPUT_NUM_RECORDS));
// Check if this map-output can be saved in-memory
boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength);
@@ -1255,6 +1268,7 @@
mapOutput = shuffleToDisk(mapOutputLoc, input, filename,
compressedLength);
+ spilledRecordsCounter.increment(numRecords);
}
return mapOutput;
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Tue Nov 18 01:35:32 2008
@@ -62,13 +62,15 @@
MAP_SKIPPED_RECORDS,
MAP_INPUT_BYTES,
MAP_OUTPUT_BYTES,
+ MAP_FIRST_LEVEL_SPILLS,
COMBINE_INPUT_RECORDS,
COMBINE_OUTPUT_RECORDS,
REDUCE_INPUT_GROUPS,
REDUCE_INPUT_RECORDS,
REDUCE_OUTPUT_RECORDS,
REDUCE_SKIPPED_GROUPS,
- REDUCE_SKIPPED_RECORDS
+ REDUCE_SKIPPED_RECORDS,
+ SPILLED_RECORDS
}
/**
@@ -131,6 +133,7 @@
protected JobContext jobContext;
protected TaskAttemptContext taskContext;
private volatile boolean commitPending = false;
+ protected final Counters.Counter spilledRecordsCounter;
////////////////////////////////////////////
// Constructors
@@ -139,6 +142,7 @@
public Task() {
taskStatus = TaskStatus.createTaskStatus(isMapTask());
taskId = new TaskAttemptID();
+ spilledRecordsCounter = counters.findCounter(Counter.SPILLED_RECORDS);
}
public Task(String jobFile, TaskAttemptID taskId, int partition) {
@@ -155,6 +159,7 @@
TaskStatus.Phase.SHUFFLE,
counters);
this.mapOutputFile.setJobId(taskId.getJobID());
+ spilledRecordsCounter = counters.findCounter(Counter.SPILLED_RECORDS);
}
////////////////////////////////////////////
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue Nov 18 01:35:32 2008
@@ -2760,6 +2760,7 @@
final long startOffset = info.startOffset;
final long rawPartLength = info.rawLength;
final long partLength = info.partLength;
+ final long numRecords = info.numRecords;
//set the custom "Raw-Map-Output-Length" http header to
//the raw (decompressed) length
@@ -2770,6 +2771,11 @@
response.setHeader(MAP_OUTPUT_LENGTH,
Long.toString(partLength));
+ //set the custom "Map-Output-Num-Records" http header to
+ //the actual number of records being transferred
+ response.setHeader(MAP_OUTPUT_NUM_RECORDS,
+ Long.toString(numRecords));
+
//use the same buffersize as used for reading the data from disk
response.setBufferSize(MAX_BYTES_TO_READ);
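The ReduceTask and TaskTracker diffs above are the two ends of the same hand-off: the tasktracker serving a map output attaches the per-partition record count as a custom HTTP header, and the reduce-side copier parses it back so SPILLED_RECORDS can be incremented when the output is shuffled straight to disk. A simplified sketch of both sides (assumes a servlet-api jar on the classpath; not the actual Hadoop code):

import java.net.HttpURLConnection;
import javax.servlet.http.HttpServletResponse;

class MapOutputHeaderSketch {
  static final String MAP_OUTPUT_NUM_RECORDS = "Map-Output-Num-Records";

  // Serving side (TaskTracker's map-output servlet).
  static void setNumRecordsHeader(HttpServletResponse response, long numRecords) {
    response.setHeader(MAP_OUTPUT_NUM_RECORDS, Long.toString(numRecords));
  }

  // Fetching side (reduce task's copier).
  static long readNumRecordsHeader(HttpURLConnection connection) {
    return Long.parseLong(connection.getHeaderField(MAP_OUTPUT_NUM_RECORDS));
  }
}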
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties Tue Nov 18 01:35:32 2008
@@ -14,5 +14,6 @@
REDUCE_OUTPUT_RECORDS.name= Reduce output records
REDUCE_SKIPPED_RECORDS.name= Reduce skipped records
REDUCE_SKIPPED_GROUPS.name= Reduce skipped groups
-
+SPILLED_RECORDS.name= Spilled Records
+MAP_FIRST_LEVEL_SPILLS.name= Map First Level Spills
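The two new entries above give the counters their display names: the counter framework resolves "<ENUM_NAME>.name" against the Task_Counter resource bundle, so SPILLED_RECORDS and MAP_FIRST_LEVEL_SPILLS show up as "Spilled Records" and "Map First Level Spills" in job reports. A tiny sketch of that lookup (assumes the properties file is on the classpath under org/apache/hadoop/mapred/):

import java.util.ResourceBundle;

public class CounterDisplayNameSketch {
  public static void main(String[] args) {
    ResourceBundle bundle =
        ResourceBundle.getBundle("org.apache.hadoop.mapred.Task_Counter");
    System.out.println(bundle.getString("SPILLED_RECORDS.name"));         // Spilled Records
    System.out.println(bundle.getString("MAP_FIRST_LEVEL_SPILLS.name"));  // Map First Level Spills
  }
}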
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java Tue Nov 18 01:35:32 2008
@@ -43,7 +43,7 @@
fs.delete(p, true);
conf.setInt("mapred.tasktracker.indexcache.mb", 1);
final int partsPerMap = 1000;
- final int bytesPerFile = partsPerMap * 24;
+ final int bytesPerFile = partsPerMap * 32;
IndexCache cache = new IndexCache(conf);
// fill cache
@@ -105,6 +105,7 @@
assertEquals(fill, rec.startOffset);
assertEquals(fill, rec.rawLength);
assertEquals(fill, rec.partLength);
+ assertEquals(fill, rec.numRecords);
}
private static void writeFile(FileSystem fs, Path f, long fill, int parts)
@@ -116,6 +117,7 @@
dout.writeLong(fill);
dout.writeLong(fill);
dout.writeLong(fill);
+ dout.writeLong(fill);
}
dout.close();
}