You are viewing a plain-text version of this content; the hyperlink to the canonical version is not preserved in this format.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/11/19 00:43:32 UTC
svn commit: r718800 - in /hadoop/core/trunk: ./
src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Author: cdouglas
Date: Tue Nov 18 15:43:32 2008
New Revision: 718800
URL: http://svn.apache.org/viewvc?rev=718800&view=rev
Log:
Revert HADOOP-2774
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=718800&r1=718799&r2=718800&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Nov 18 15:43:32 2008
@@ -111,10 +111,6 @@
HADOOP-4612. Removes RunJar's dependency on JobClient.
(Sharad Agarwal via ddas)
- HADOOP-2774. Provide a counter for the number of first level spill files in
- the map task and a counter for counting the number of spilled records in both
- map and reduce tasks.
-
HADOOP-4185. Adds setVerifyChecksum() method to FileSystem.
(Sharad Agarwal via ddas)
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java?rev=718800&r1=718799&r2=718800&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java Tue Nov 18 15:43:32 2008
@@ -65,10 +65,7 @@
long decompressedBytesWritten = 0;
long compressedBytesWritten = 0;
-
- // Count records when written to disk.
- private static long numRecordsWritten = 0;
-
+
IFileOutputStream checksumOut;
Class<K> keyClass;
@@ -181,7 +178,6 @@
decompressedBytesWritten += keyLength + valueLength +
WritableUtils.getVIntSize(keyLength) +
WritableUtils.getVIntSize(valueLength);
- numRecordsWritten++;
}
public void append(DataInputBuffer key, DataInputBuffer value)
@@ -207,13 +203,8 @@
decompressedBytesWritten += keyLength + valueLength +
WritableUtils.getVIntSize(keyLength) +
WritableUtils.getVIntSize(valueLength);
- numRecordsWritten++;
- }
+}
- public static long getNumRecordsWritten() {
- return numRecordsWritten;
- }
-
public long getRawLength() {
return decompressedBytesWritten;
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java?rev=718800&r1=718799&r2=718800&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java Tue Nov 18 15:43:32 2008
@@ -30,14 +30,11 @@
final long startOffset;
final long rawLength;
final long partLength;
- final long numRecords;
- public IndexRecord(long startOffset, long rawLength, long partLength,
- long numRecords) {
+ public IndexRecord(long startOffset, long rawLength, long partLength) {
this.startOffset = startOffset;
this.rawLength = rawLength;
this.partLength = partLength;
- this.numRecords = numRecords;
}
public static IndexRecord[] readIndexFile(Path indexFileName,
@@ -63,9 +60,8 @@
long startOffset = wrapper.readLong();
long rawLength = wrapper.readLong();
long partLength = wrapper.readLong();
- long numRecords = wrapper.readLong();
indexRecordArray[i] =
- new IndexRecord(startOffset, rawLength, partLength, numRecords);
+ new IndexRecord(startOffset, rawLength, partLength);
}
}
finally {
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java?rev=718800&r1=718799&r2=718800&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java Tue Nov 18 15:43:32 2008
@@ -47,10 +47,5 @@
*/
public static final String RAW_MAP_OUTPUT_LENGTH = "Raw-Map-Output-Length";
- /**
- * The custom http header used for the number of map output records.
- */
- public static final String MAP_OUTPUT_NUM_RECORDS = "Map-Output-Num-Records";
-
public static final String WORKDIR = "work";
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=718800&r1=718799&r2=718800&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Tue Nov 18 15:43:32 2008
@@ -24,7 +24,6 @@
import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_RECORDS;
import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_BYTES;
import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_RECORDS;
-import static org.apache.hadoop.mapred.Task.Counter.MAP_FIRST_LEVEL_SPILLS;
import java.io.DataInput;
import java.io.DataOutput;
@@ -68,7 +67,7 @@
/**
* The size of each record in the index file for the map-outputs.
*/
- public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 32;
+ public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
private BytesWritable split = new BytesWritable();
@@ -448,7 +447,6 @@
private final Counters.Counter mapOutputRecordCounter;
private final Counters.Counter combineInputCounter;
private final Counters.Counter combineOutputCounter;
- private final Counters.Counter firstLevelSpillsCounter;
private ArrayList<IndexRecord[]> indexCacheList;
private int totalIndexCacheMemory;
@@ -511,7 +509,6 @@
mapOutputRecordCounter = counters.findCounter(MAP_OUTPUT_RECORDS);
combineInputCounter = counters.findCounter(COMBINE_INPUT_RECORDS);
combineOutputCounter = counters.findCounter(COMBINE_OUTPUT_RECORDS);
- firstLevelSpillsCounter = counters.findCounter(MAP_FIRST_LEVEL_SPILLS);
// compression
if (job.getCompressMapOutput()) {
Class<? extends CompressionCodec> codecClass =
@@ -951,10 +948,7 @@
IFile.Writer<K, V> writer = null;
try {
long segmentStart = out.getPos();
- long numRecordsInThisPartition = 0;
writer = new Writer<K, V>(job, out, keyClass, valClass, codec);
- long prevCnt = writer.getNumRecordsWritten();
-
if (null == combinerClass) {
// spill directly
DataInputBuffer key = new DataInputBuffer();
@@ -985,19 +979,17 @@
combineAndSpill(kvIter, combineInputCounter);
}
}
- numRecordsInThisPartition = Writer.getNumRecordsWritten() - prevCnt;
+
// close the writer
writer.close();
if (indexChecksumOut != null) {
- // write the index as
- // <offset, raw-length, compressed-length, numRecords>
- writeIndexRecord(indexChecksumOut, segmentStart, writer,
- numRecordsInThisPartition);
+ // write the index as <offset, raw-length, compressed-length>
+ writeIndexRecord(indexChecksumOut, segmentStart, writer);
}
else {
irArray[i] = new IndexRecord(segmentStart, writer.getRawLength(),
- writer.getCompressedLength(), numRecordsInThisPartition);
+ writer.getCompressedLength());
}
writer = null;
} finally {
@@ -1066,13 +1058,12 @@
}
writer.close();
- long numRecords = (i == partition) ? 1:0;
if (indexChecksumOut != null) {
- writeIndexRecord(indexChecksumOut,segmentStart,writer,numRecords);
+ writeIndexRecord(indexChecksumOut,segmentStart,writer);
}
else {
irArray[i] = new IndexRecord(segmentStart, writer.getRawLength(),
- writer.getCompressedLength(), numRecords);
+ writer.getCompressedLength());
}
writer = null;
} catch (IOException e) {
@@ -1181,8 +1172,6 @@
long finalIndexFileSize = 0;
Path [] filename = new Path[numSpills];
- firstLevelSpillsCounter.increment(numSpills);
-
for(int i = 0; i < numSpills; i++) {
filename[i] = mapOutputFile.getSpillFile(getTaskID(), i);
finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
@@ -1199,8 +1188,7 @@
writeSingleSpillIndexToFile(getTaskID(),
new Path(filename[0].getParent(),"file.out.index"));
}
- spilledRecordsCounter.increment(Writer.getNumRecordsWritten());
- return;
+ return;
}
//make correction in the length to include the sequence file header
//lengths for each partition
@@ -1232,7 +1220,7 @@
Writer<K, V> writer = new Writer<K, V>(job, finalOut,
keyClass, valClass, codec);
writer.close();
- writeIndexRecord(finalIndexChecksumOut, segmentStart, writer, 0);
+ writeIndexRecord(finalIndexChecksumOut, segmentStart, writer);
}
finalOut.close();
finalIndexChecksumOut.close();
@@ -1268,6 +1256,7 @@
}
indexRecord = null;
}
+
//merge
@SuppressWarnings("unchecked")
RawKeyValueIterator kvIter =
@@ -1281,37 +1270,19 @@
long segmentStart = finalOut.getPos();
Writer<K, V> writer =
new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
- long numRecordsInThisPartition;
- long prevCnt = Writer.getNumRecordsWritten();
if (null == combinerClass || numSpills < minSpillsForCombine) {
Merger.writeFile(kvIter, writer, reporter);
} else {
combineCollector.setWriter(writer);
combineAndSpill(kvIter, combineInputCounter);
}
- numRecordsInThisPartition = Writer.getNumRecordsWritten() - prevCnt;
//close
writer.close();
//write index record
- writeIndexRecord(finalIndexChecksumOut,segmentStart, writer,
- numRecordsInThisPartition);
+ writeIndexRecord(finalIndexChecksumOut,segmentStart, writer);
}
-
- // In Map Phase, Spills to disk are done at 3 places:
- // (1) First Level Spills in sortAndSpill() - either
- // (a) without combiner
- // or (b) with combiner
- // (2) Outputs of Intermediate(multi-level) merges in Merger.merge
- // (3) Output of final level merge - See above if-else
- // (a) Merger.writeFile
- // or (b) combineAndSpill
- // In all the cases, IFile.Writer.append() takes care of counting
- // the records written to disk
-
- spilledRecordsCounter.increment(Writer.getNumRecordsWritten());
-
finalOut.close();
finalIndexChecksumOut.close();
finalIndexOut.close();
@@ -1324,7 +1295,7 @@
private void writeIndexRecord(IFileOutputStream indexOut,
long start,
- Writer<K, V> writer, long numRecords)
+ Writer<K, V> writer)
throws IOException {
//when we write the offset/decompressed-length/compressed-length to
//the final index file, we write longs for both compressed and
@@ -1340,9 +1311,8 @@
wrapper.writeLong(writer.getRawLength());
long segmentLength = writer.getCompressedLength();
wrapper.writeLong(segmentLength);
- wrapper.writeLong(numRecords);
LOG.info("Index: (" + start + ", " + writer.getRawLength() + ", " +
- segmentLength + "," + numRecords + ")");
+ segmentLength + ")");
}
/**
@@ -1396,7 +1366,6 @@
wrapper.writeLong(irArray[i].startOffset);
wrapper.writeLong(irArray[i].rawLength);
wrapper.writeLong(irArray[i].partLength);
- wrapper.writeLong(irArray[i].numRecords);
}
wrapper.close();
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=718800&r1=718799&r2=718800&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Tue Nov 18 15:43:32 2008
@@ -436,17 +436,6 @@
values.informReduceProgress();
}
- // In Reduce Phase, Spills to disk are
- // (1) mapoutput directly written to reduceNode's disk in shuffleToDisk()
- // spilledRecordsCounter is updated after shuffleToDisk()
- // (2) All other spills to disk are either through
- // (a) Merger.writeFile()
- // or (b) combineAndSpill()
- // In cases 2(a) & 2(b), IFile.Writer.append() takes care of
- // counting the records written to disk
-
- spilledRecordsCounter.increment(Writer.getNumRecordsWritten());
-
//Clean up: repeated in catch block below
reducer.close();
out.close(reporter);
@@ -1245,8 +1234,6 @@
Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));
long compressedLength =
Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
- long numRecords =
- Long.parseLong(connection.getHeaderField(MAP_OUTPUT_NUM_RECORDS));
// Check if this map-output can be saved in-memory
boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength);
@@ -1268,7 +1255,6 @@
mapOutput = shuffleToDisk(mapOutputLoc, input, filename,
compressedLength);
- spilledRecordsCounter.increment(numRecords);
}
return mapOutput;
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=718800&r1=718799&r2=718800&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Tue Nov 18 15:43:32 2008
@@ -62,15 +62,13 @@
MAP_SKIPPED_RECORDS,
MAP_INPUT_BYTES,
MAP_OUTPUT_BYTES,
- MAP_FIRST_LEVEL_SPILLS,
COMBINE_INPUT_RECORDS,
COMBINE_OUTPUT_RECORDS,
REDUCE_INPUT_GROUPS,
REDUCE_INPUT_RECORDS,
REDUCE_OUTPUT_RECORDS,
REDUCE_SKIPPED_GROUPS,
- REDUCE_SKIPPED_RECORDS,
- SPILLED_RECORDS
+ REDUCE_SKIPPED_RECORDS
}
/**
@@ -133,7 +131,6 @@
protected JobContext jobContext;
protected TaskAttemptContext taskContext;
private volatile boolean commitPending = false;
- protected final Counters.Counter spilledRecordsCounter;
////////////////////////////////////////////
// Constructors
@@ -142,7 +139,6 @@
public Task() {
taskStatus = TaskStatus.createTaskStatus(isMapTask());
taskId = new TaskAttemptID();
- spilledRecordsCounter = counters.findCounter(Counter.SPILLED_RECORDS);
}
public Task(String jobFile, TaskAttemptID taskId, int partition) {
@@ -159,7 +155,6 @@
TaskStatus.Phase.SHUFFLE,
counters);
this.mapOutputFile.setJobId(taskId.getJobID());
- spilledRecordsCounter = counters.findCounter(Counter.SPILLED_RECORDS);
}
////////////////////////////////////////////
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=718800&r1=718799&r2=718800&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue Nov 18 15:43:32 2008
@@ -2760,7 +2760,6 @@
final long startOffset = info.startOffset;
final long rawPartLength = info.rawLength;
final long partLength = info.partLength;
- final long numRecords = info.numRecords;
//set the custom "Raw-Map-Output-Length" http header to
//the raw (decompressed) length
@@ -2771,11 +2770,6 @@
response.setHeader(MAP_OUTPUT_LENGTH,
Long.toString(partLength));
- //set the custom "Map-Output-Num-Records" http header to
- //the actual number of records being transferred
- response.setHeader(MAP_OUTPUT_NUM_RECORDS,
- Long.toString(numRecords));
-
//use the same buffersize as used for reading the data from disk
response.setBufferSize(MAX_BYTES_TO_READ);
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties?rev=718800&r1=718799&r2=718800&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties Tue Nov 18 15:43:32 2008
@@ -14,6 +14,5 @@
REDUCE_OUTPUT_RECORDS.name= Reduce output records
REDUCE_SKIPPED_RECORDS.name= Reduce skipped records
REDUCE_SKIPPED_GROUPS.name= Reduce skipped groups
-SPILLED_RECORDS.name= Spilled Records
-MAP_FIRST_LEVEL_SPILLS.name= Map First Level Spills
+
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java?rev=718800&r1=718799&r2=718800&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java Tue Nov 18 15:43:32 2008
@@ -43,7 +43,7 @@
fs.delete(p, true);
conf.setInt("mapred.tasktracker.indexcache.mb", 1);
final int partsPerMap = 1000;
- final int bytesPerFile = partsPerMap * 32;
+ final int bytesPerFile = partsPerMap * 24;
IndexCache cache = new IndexCache(conf);
// fill cache
@@ -105,7 +105,6 @@
assertEquals(fill, rec.startOffset);
assertEquals(fill, rec.rawLength);
assertEquals(fill, rec.partLength);
- assertEquals(fill, rec.numRecords);
}
private static void writeFile(FileSystem fs, Path f, long fill, int parts)
@@ -117,7 +116,6 @@
dout.writeLong(fill);
dout.writeLong(fill);
dout.writeLong(fill);
- dout.writeLong(fill);
}
dout.close();
}