Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/11/25 23:31:33 UTC
svn commit: r720627 - in /hadoop/core/trunk: ./
src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Author: cdouglas
Date: Tue Nov 25 14:31:33 2008
New Revision: 720627
URL: http://svn.apache.org/viewvc?rev=720627&view=rev
Log:
HADOOP-2774. Add counters tracking records spilled to disk in MapTask and
ReduceTask. Contributed by Ravi Gummadi.
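For reference, a minimal sketch (not part of this commit) of how a job client can read the new counter once a job completes. It relies only on APIs already exercised by the new test added below (JobClient.runJob, RunningJob.getCounters, Counters.findCounter):

    JobConf conf = new JobConf(WordCount.class);   // any configured job class
    // ... set mapper/combiner/reducer, input and output paths as usual ...
    RunningJob running = JobClient.runJob(conf);
    Counters counters = running.getCounters();
    long spilled =
        counters.findCounter(Task.Counter.SPILLED_RECORDS).getCounter();
    System.out.println("Spilled Records: " + spilled);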
Added:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=720627&r1=720626&r2=720627&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Nov 25 14:31:33 2008
@@ -141,6 +141,9 @@
HADOOP-4339. Remove redundant calls from FileSystem/FsShell when
generating/processing ContentSummary. (David Phillips via cdouglas)
+ HADOOP-2774. Add counters tracking records spilled to disk in MapTask and
+ ReduceTask. (Ravi Gummadi via cdouglas)
+
OPTIMIZATIONS
HADOOP-3293. Fixes FileInputFormat to do provide locations for splits
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java?rev=720627&r1=720626&r2=720627&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java Tue Nov 25 14:31:33 2008
@@ -65,7 +65,11 @@
long decompressedBytesWritten = 0;
long compressedBytesWritten = 0;
-
+
+ // Count records written to disk
+ private long numRecordsWritten = 0;
+ private final Counters.Counter writtenRecordsCounter;
+
IFileOutputStream checksumOut;
Class<K> keyClass;
@@ -77,14 +81,18 @@
public Writer(Configuration conf, FileSystem fs, Path file,
Class<K> keyClass, Class<V> valueClass,
- CompressionCodec codec) throws IOException {
- this(conf, fs.create(file), keyClass, valueClass, codec);
+ CompressionCodec codec,
+ Counters.Counter writesCounter) throws IOException {
+ this(conf, fs.create(file), keyClass, valueClass, codec,
+ writesCounter);
ownOutputStream = true;
}
public Writer(Configuration conf, FSDataOutputStream out,
Class<K> keyClass, Class<V> valueClass,
- CompressionCodec codec) throws IOException {
+ CompressionCodec codec, Counters.Counter writesCounter)
+ throws IOException {
+ this.writtenRecordsCounter = writesCounter;
this.checksumOut = new IFileOutputStream(out);
this.rawOut = out;
this.start = this.rawOut.getPos();
@@ -107,7 +115,7 @@
this.valueSerializer = serializationFactory.getSerializer(valueClass);
this.valueSerializer.open(buffer);
}
-
+
public void close() throws IOException {
// Close the serializers
@@ -140,6 +148,9 @@
rawOut.close();
}
out = null;
+ if(writtenRecordsCounter != null) {
+ writtenRecordsCounter.increment(numRecordsWritten);
+ }
}
public void append(K key, V value) throws IOException {
@@ -178,6 +189,7 @@
decompressedBytesWritten += keyLength + valueLength +
WritableUtils.getVIntSize(keyLength) +
WritableUtils.getVIntSize(valueLength);
+ ++numRecordsWritten;
}
public void append(DataInputBuffer key, DataInputBuffer value)
@@ -203,7 +215,8 @@
decompressedBytesWritten += keyLength + valueLength +
WritableUtils.getVIntSize(keyLength) +
WritableUtils.getVIntSize(valueLength);
-}
+ ++numRecordsWritten;
+ }
public long getRawLength() {
return decompressedBytesWritten;
@@ -221,6 +234,10 @@
private static final int DEFAULT_BUFFER_SIZE = 128*1024;
private static final int MAX_VINT_SIZE = 9;
+ // Count records read from disk
+ private long numRecordsRead = 0;
+ private final Counters.Counter readRecordsCounter;
+
final InputStream in; // Possibly decompressed stream that we read
Decompressor decompressor;
long bytesRead = 0;
@@ -242,14 +259,15 @@
* @param file Path of the file to be opened. This file should have
* checksum bytes for the data at the end of the file.
* @param codec codec
+ * @param readsCounter Counter for records read from disk
* @throws IOException
*/
-
public Reader(Configuration conf, FileSystem fs, Path file,
- CompressionCodec codec) throws IOException {
+ CompressionCodec codec,
+ Counters.Counter readsCounter) throws IOException {
this(conf, fs.open(file),
fs.getFileStatus(file).getLen(),
- codec);
+ codec, readsCounter);
}
/**
@@ -260,10 +278,13 @@
* @param length Length of the data in the stream, including the checksum
* bytes.
* @param codec codec
+ * @param readsCounter Counter for records read from disk
* @throws IOException
*/
public Reader(Configuration conf, FSDataInputStream in, long length,
- CompressionCodec codec) throws IOException {
+ CompressionCodec codec,
+ Counters.Counter readsCounter) throws IOException {
+ readRecordsCounter = readsCounter;
checksumIn = new IFileInputStream(in,length);
if (codec != null) {
decompressor = CodecPool.getDecompressor(codec);
@@ -400,7 +421,8 @@
bytesRead += recordLength;
++recNo;
-
+ ++numRecordsRead;
+
return true;
}
@@ -418,6 +440,9 @@
// Release the buffer
dataIn = null;
buffer = null;
+ if(readRecordsCounter != null) {
+ readRecordsCounter.increment(numRecordsRead);
+ }
}
}
@@ -431,7 +456,7 @@
public InMemoryReader(RamManager ramManager, TaskAttemptID taskAttemptId,
byte[] data, int start, int length)
throws IOException {
- super(null, null, length - start, null);
+ super(null, null, length - start, null, null);
this.ramManager = ramManager;
this.taskAttemptId = taskAttemptId;
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=720627&r1=720626&r2=720627&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Tue Nov 25 14:31:33 2008
@@ -948,7 +948,8 @@
IFile.Writer<K, V> writer = null;
try {
long segmentStart = out.getPos();
- writer = new Writer<K, V>(job, out, keyClass, valClass, codec);
+ writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
+ spilledRecordsCounter);
if (null == combinerClass) {
// spill directly
DataInputBuffer key = new DataInputBuffer();
@@ -1047,7 +1048,8 @@
try {
long segmentStart = out.getPos();
// Create a new codec, don't care!
- writer = new IFile.Writer<K, V>(job, out, keyClass, valClass, codec);
+ writer = new IFile.Writer<K, V>(job, out, keyClass, valClass, codec,
+ spilledRecordsCounter);
if (i == partition) {
final long recordStart = out.getPos();
@@ -1188,7 +1190,7 @@
writeSingleSpillIndexToFile(getTaskID(),
new Path(filename[0].getParent(),"file.out.index"));
}
- return;
+ return;
}
//make correction in the length to include the sequence file header
//lengths for each partition
@@ -1217,8 +1219,8 @@
//create dummy files
for (int i = 0; i < partitions; i++) {
long segmentStart = finalOut.getPos();
- Writer<K, V> writer = new Writer<K, V>(job, finalOut,
- keyClass, valClass, codec);
+ Writer<K, V> writer = new Writer<K, V>(job, finalOut, keyClass,
+ valClass, codec, null);
writer.close();
writeIndexRecord(finalIndexChecksumOut, segmentStart, writer);
}
@@ -1245,8 +1247,8 @@
in.seek(segmentOffset);
Segment<K, V> s =
- new Segment<K, V>(new Reader<K, V>(job, in, segmentLength, codec),
- true);
+ new Segment<K, V>(new Reader<K, V>(job, in, segmentLength,
+ codec, null), true);
segmentList.add(i, s);
if (LOG.isDebugEnabled()) {
@@ -1264,12 +1266,14 @@
keyClass, valClass,
segmentList, job.getInt("io.sort.factor", 100),
new Path(getTaskID().toString()),
- job.getOutputKeyComparator(), reporter);
+ job.getOutputKeyComparator(), reporter,
+ null, spilledRecordsCounter);
//write merged output to disk
long segmentStart = finalOut.getPos();
Writer<K, V> writer =
- new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
+ new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
+ spilledRecordsCounter);
if (null == combinerClass || numSpills < minSpillsForCombine) {
Merger.writeFile(kvIter, writer, reporter);
} else {
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java?rev=720627&r1=720626&r2=720627&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java Tue Nov 25 14:31:33 2008
@@ -54,12 +54,15 @@
CompressionCodec codec,
Path[] inputs, boolean deleteInputs,
int mergeFactor, Path tmpDir,
- RawComparator<K> comparator, Progressable reporter)
+ RawComparator<K> comparator, Progressable reporter,
+ Counters.Counter readsCounter,
+ Counters.Counter writesCounter)
throws IOException {
return
new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator,
reporter).merge(keyClass, valueClass,
- mergeFactor, tmpDir);
+ mergeFactor, tmpDir,
+ readsCounter, writesCounter);
}
public static <K extends Object, V extends Object>
@@ -67,10 +70,12 @@
Class<K> keyClass, Class<V> valueClass,
List<Segment<K, V>> segments,
int mergeFactor, Path tmpDir,
- RawComparator<K> comparator, Progressable reporter)
+ RawComparator<K> comparator, Progressable reporter,
+ Counters.Counter readsCounter,
+ Counters.Counter writesCounter)
throws IOException {
return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
- comparator, reporter, false);
+ comparator, reporter, false, readsCounter, writesCounter);
}
public static <K extends Object, V extends Object>
@@ -79,11 +84,14 @@
List<Segment<K, V>> segments,
int mergeFactor, Path tmpDir,
RawComparator<K> comparator, Progressable reporter,
- boolean sortSegments)
+ boolean sortSegments,
+ Counters.Counter readsCounter,
+ Counters.Counter writesCounter)
throws IOException {
return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
sortSegments).merge(keyClass, valueClass,
- mergeFactor, tmpDir);
+ mergeFactor, tmpDir,
+ readsCounter, writesCounter);
}
static <K extends Object, V extends Object>
@@ -92,12 +100,15 @@
List<Segment<K, V>> segments,
int mergeFactor, int inMemSegments, Path tmpDir,
RawComparator<K> comparator, Progressable reporter,
- boolean sortSegments)
+ boolean sortSegments,
+ Counters.Counter readsCounter,
+ Counters.Counter writesCounter)
throws IOException {
return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
sortSegments).merge(keyClass, valueClass,
mergeFactor, inMemSegments,
- tmpDir);
+ tmpDir,
+ readsCounter, writesCounter);
}
public static <K extends Object, V extends Object>
@@ -144,9 +155,9 @@
this.segmentLength = reader.getLength();
}
- private void init() throws IOException {
+ private void init(Counters.Counter readsCounter) throws IOException {
if (reader == null) {
- reader = new Reader<K, V>(conf, fs, file, codec);
+ reader = new Reader<K, V>(conf, fs, file, codec, readsCounter);
}
}
@@ -309,13 +320,18 @@
}
public RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
- int factor, Path tmpDir)
+ int factor, Path tmpDir,
+ Counters.Counter readsCounter,
+ Counters.Counter writesCounter)
throws IOException {
- return merge(keyClass, valueClass, factor, 0, tmpDir);
+ return merge(keyClass, valueClass, factor, 0, tmpDir,
+ readsCounter, writesCounter);
}
RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
- int factor, int inMem, Path tmpDir)
+ int factor, int inMem, Path tmpDir,
+ Counters.Counter readsCounter,
+ Counters.Counter writesCounter)
throws IOException {
LOG.info("Merging " + segments.size() + " sorted segments");
@@ -344,7 +360,7 @@
for (Segment<K, V> segment : mStream) {
// Initialize the segment at the last possible moment;
// this helps in ensuring we don't use buffers until we need them
- segment.init();
+ segment.init(readsCounter);
long startPos = segment.getPosition();
boolean hasNext = segment.next();
long endPos = segment.getPosition();
@@ -417,7 +433,8 @@
approxOutputSize, conf);
Writer<K, V> writer =
- new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec);
+ new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec,
+ writesCounter);
writeFile(this, writer, reporter);
writer.close();
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=720627&r1=720626&r2=720627&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Tue Nov 25 14:31:33 2008
@@ -382,7 +382,7 @@
job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
!conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
new Path(getTaskID().toString()), job.getOutputKeyComparator(),
- reporter)
+ reporter, spilledRecordsCounter, null)
: reduceCopier.createKVIterator(job, rfs, reporter);
// free up the data structures
@@ -2080,9 +2080,9 @@
reduceTask.getTaskID(), inMemToDiskBytes);
final RawKeyValueIterator rIter = Merger.merge(job, fs,
keyClass, valueClass, memDiskSegments, numMemDiskSegments,
- tmpDir, comparator, reporter);
+ tmpDir, comparator, reporter, spilledRecordsCounter, null);
final Writer writer = new Writer(job, fs, outputPath,
- keyClass, valueClass, codec);
+ keyClass, valueClass, codec, null);
try {
Merger.writeFile(rIter, writer, reporter);
addToMapOutputFilesOnDisk(fs.getFileStatus(outputPath));
@@ -2139,7 +2139,7 @@
RawKeyValueIterator diskMerge = Merger.merge(
job, fs, keyClass, valueClass, diskSegments,
ioSortFactor, numInMemSegments, tmpDir, comparator,
- reporter, false);
+ reporter, false, spilledRecordsCounter, null);
diskSegments.clear();
if (0 == finalSegments.size()) {
return diskMerge;
@@ -2149,7 +2149,7 @@
}
return Merger.merge(job, fs, keyClass, valueClass,
finalSegments, finalSegments.size(), tmpDir,
- comparator, reporter);
+ comparator, reporter, spilledRecordsCounter, null);
}
class RawKVIteratorReader extends IFile.Reader<K,V> {
@@ -2158,7 +2158,7 @@
public RawKVIteratorReader(RawKeyValueIterator kvIter, long size)
throws IOException {
- super(null, null, size, null);
+ super(null, null, size, null, spilledRecordsCounter);
this.kvIter = kvIter;
}
@@ -2383,7 +2383,7 @@
new Writer(conf,rfs, outputPath,
conf.getMapOutputKeyClass(),
conf.getMapOutputValueClass(),
- codec);
+ codec, null);
RawKeyValueIterator iter = null;
Path tmpDir = new Path(reduceTask.getTaskID().toString());
final Reporter reporter = getReporter(umbilical);
@@ -2393,7 +2393,8 @@
conf.getMapOutputValueClass(),
codec, mapFiles.toArray(new Path[mapFiles.size()]),
true, ioSortFactor, tmpDir,
- conf.getOutputKeyComparator(), reporter);
+ conf.getOutputKeyComparator(), reporter,
+ spilledRecordsCounter, null);
Merger.writeFile(iter, writer, reporter);
writer.close();
@@ -2477,7 +2478,7 @@
new Writer(conf, rfs, outputPath,
conf.getMapOutputKeyClass(),
conf.getMapOutputValueClass(),
- codec);
+ codec, null);
RawKeyValueIterator rIter = null;
final Reporter reporter = getReporter(umbilical);
@@ -2490,7 +2491,8 @@
(Class<V>)conf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceTask.getTaskID().toString()),
- conf.getOutputKeyComparator(), reporter);
+ conf.getOutputKeyComparator(), reporter,
+ spilledRecordsCounter, null);
if (null == combinerClass) {
Merger.writeFile(rIter, writer, reporter);
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=720627&r1=720626&r2=720627&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Tue Nov 25 14:31:33 2008
@@ -68,7 +68,8 @@
REDUCE_INPUT_RECORDS,
REDUCE_OUTPUT_RECORDS,
REDUCE_SKIPPED_GROUPS,
- REDUCE_SKIPPED_RECORDS
+ REDUCE_SKIPPED_RECORDS,
+ SPILLED_RECORDS
}
/**
@@ -131,6 +132,7 @@
protected JobContext jobContext;
protected TaskAttemptContext taskContext;
private volatile boolean commitPending = false;
+ protected final Counters.Counter spilledRecordsCounter;
////////////////////////////////////////////
// Constructors
@@ -139,6 +141,7 @@
public Task() {
taskStatus = TaskStatus.createTaskStatus(isMapTask());
taskId = new TaskAttemptID();
+ spilledRecordsCounter = counters.findCounter(Counter.SPILLED_RECORDS);
}
public Task(String jobFile, TaskAttemptID taskId, int partition) {
@@ -155,6 +158,7 @@
TaskStatus.Phase.SHUFFLE,
counters);
this.mapOutputFile.setJobId(taskId.getJobID());
+ spilledRecordsCounter = counters.findCounter(Counter.SPILLED_RECORDS);
}
////////////////////////////////////////////
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties?rev=720627&r1=720626&r2=720627&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties Tue Nov 25 14:31:33 2008
@@ -14,5 +14,5 @@
REDUCE_OUTPUT_RECORDS.name= Reduce output records
REDUCE_SKIPPED_RECORDS.name= Reduce skipped records
REDUCE_SKIPPED_GROUPS.name= Reduce skipped groups
-
+SPILLED_RECORDS.name= Spilled Records
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java?rev=720627&r1=720626&r2=720627&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java Tue Nov 25 14:31:33 2008
@@ -80,7 +80,8 @@
FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
Path path = new Path(tmpDir, "data.in");
IFile.Writer<Text, Text> writer =
- new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class, codec);
+ new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class,
+ codec, null);
for(Pair p: vals) {
writer.append(new Text(p.key), new Text(p.value));
}
@@ -90,7 +91,7 @@
RawKeyValueIterator rawItr =
Merger.merge(conf, rfs, Text.class, Text.class, codec, new Path[]{path},
false, conf.getInt("io.sort.factor", 100), tmpDir,
- new Text.Comparator(), new NullProgress());
+ new Text.Comparator(), new NullProgress(),null,null);
@SuppressWarnings("unchecked") // WritableComparators are not generic
ReduceTask.ValuesIterator valItr =
new ReduceTask.ValuesIterator<Text,Text>(rawItr,
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java?rev=720627&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java Tue Nov 25 14:31:33 2008
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.Writer;
+import java.io.BufferedWriter;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.WordCount;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * This is a wordcount application that tests the count of records
+ * spilled to disk. It generates simple text input files, then runs the
+ * wordcount map/reduce application on (1) 3 input files (with 3 maps and
+ * 1 reduce) and verifies the counters, and (2) 4 input files (with 4 maps
+ * and 1 reduce) and verifies the counters. The wordcount application reads
+ * the text input files, breaks each line into words and counts them. The
+ * output is a locally sorted list of words and the count of how often they occurred.
+ *
+ */
+public class TestSpilledRecordsCounter extends TestCase {
+
+ private void validateCounters(Counters counter, long spillRecCnt) {
+ // Check if the number of Spilled Records is the same as expected
+ assertEquals(counter.findCounter(Task.Counter.SPILLED_RECORDS).
+ getCounter(), spillRecCnt);
+ }
+
+ private void createWordsFile(File inpFile) throws Exception {
+ Writer out = new BufferedWriter(new FileWriter(inpFile));
+ try {
+ // 500*4 unique words --- repeated 5 times => 5*2K words
+ int REPLICAS=5, NUMLINES=500, NUMWORDSPERLINE=4;
+
+ for (int i = 0; i < REPLICAS; i++) {
+ for (int j = 1; j <= NUMLINES*NUMWORDSPERLINE; j+=NUMWORDSPERLINE) {
+ out.write("word" + j + " word" + (j+1) + " word" + (j+2) + " word" + (j+3) + '\n');
+ }
+ }
+ } finally {
+ out.close();
+ }
+ }
+
+
+ /**
+ * The main driver for the word count map/reduce program.
+ * Invoke this method to submit the map/reduce job.
+ * @throws IOException When there are communication problems with the
+ * job tracker.
+ */
+ public void testSpillCounter() throws Exception {
+ JobConf conf = new JobConf(TestSpilledRecordsCounter.class);
+ conf.setJobName("wordcountSpilledRecordsCounter");
+
+ // the keys are words (strings)
+ conf.setOutputKeyClass(Text.class);
+ // the values are counts (ints)
+ conf.setOutputValueClass(IntWritable.class);
+
+ conf.setMapperClass(WordCount.MapClass.class);
+ conf.setCombinerClass(WordCount.Reduce.class);
+ conf.setReducerClass(WordCount.Reduce.class);
+
+ conf.setNumMapTasks(3);
+ conf.setNumReduceTasks(1);
+ conf.setInt("io.sort.mb", 1);
+ conf.setInt("io.sort.factor", 2);
+ conf.set("io.sort.record.percent", "0.05");
+ conf.set("io.sort.spill.percent", "0.80");
+
+
+ String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
+ File.separator + "tmp"))
+ .toString().replace(' ', '+');
+ conf.set("test.build.data", TEST_ROOT_DIR);
+ String IN_DIR = TEST_ROOT_DIR + File.separator +
+ "spilledRecords.countertest" + File.separator +
+ "genins" + File.separator;
+ String OUT_DIR = TEST_ROOT_DIR + File.separator +
+ "spilledRecords.countertest" + File.separator;
+
+ FileSystem fs = FileSystem.get(conf);
+ Path testdir = new Path(TEST_ROOT_DIR, "spilledRecords.countertest");
+ try {
+ if (fs.exists(testdir)) {
+ fs.delete(testdir, true);
+ }
+ if (!fs.mkdirs(testdir)) {
+ throw new IOException("Mkdirs failed to create " + testdir.toString());
+ }
+
+ Path wordsIns = new Path(testdir, "genins");
+ if (!fs.mkdirs(wordsIns)) {
+ throw new IOException("Mkdirs failed to create " + wordsIns.toString());
+ }
+
+ //create 3 input files each with 5*2k words
+ File inpFile = new File(IN_DIR + "input5_2k_1");
+ createWordsFile(inpFile);
+ inpFile = new File(IN_DIR + "input5_2k_2");
+ createWordsFile(inpFile);
+ inpFile = new File(IN_DIR + "input5_2k_3");
+ createWordsFile(inpFile);
+
+ FileInputFormat.setInputPaths(conf, IN_DIR);
+ Path outputPath1=new Path(OUT_DIR, "output5_2k_3");
+ FileOutputFormat.setOutputPath(conf, outputPath1);
+
+ RunningJob myJob = JobClient.runJob(conf);
+ Counters c1 = myJob.getCounters();
+ // 3maps & in each map, 4 first level spills --- So total 12.
+ // spilled records count:
+ // Each Map: 1st level:2k+2k+2k+2k=8k;2ndlevel=4k+4k=8k;
+ // 3rd level=2k(4k from 1st level & 4k from 2nd level & combineAndSpill)
+ // So total 8k+8k+2k=18k
+ // For 3 Maps, total = 3*18=54k
+ // Reduce: each of the 3 map o/p's(2k each) will be spilled in shuffleToDisk()
+ // So 3*2k=6k in 1st level; 2nd level:4k(2k+2k);
+ // 3rd level directly given to reduce(4k+2k --- combineAndSpill => 2k.
+ // So 0 records spilled to disk in 3rd level)
+ // So total of 6k+4k=10k
+ // Total job counter will be 54k+10k = 64k
+ validateCounters(c1, 64000);
+
+ //create 4th input file each with 5*2k words and test with 4 maps
+ inpFile = new File(IN_DIR + "input5_2k_4");
+ createWordsFile(inpFile);
+ conf.setNumMapTasks(4);
+ Path outputPath2=new Path(OUT_DIR, "output5_2k_4");
+ FileOutputFormat.setOutputPath(conf, outputPath2);
+
+ myJob = JobClient.runJob(conf);
+ c1 = myJob.getCounters();
+ // 4maps & in each map 4 first level spills --- So total 16.
+ // spilled records count:
+ // Each Map: 1st level:2k+2k+2k+2k=8k;2ndlevel=4k+4k=8k;
+ // 3rd level=2k(4k from 1st level & 4k from 2nd level & combineAndSpill)
+ // So total 8k+8k+2k=18k
+ // For 4 Maps, total = 4*18=72k
+ // Reduce: each of the 4 map o/p's(2k each) will be spilled in shuffleToDisk()
+ // So 4*2k=8k in 1st level; 2nd level:4k+4k=8k;
+ // 3rd level directly given to reduce(4k+4k --- combineAndSpill => 2k.
+ // So 0 records spilled to disk in 3rd level)
+ // So total of 8k+8k=16k
+ // Total job counter will be 72k+16k = 88k
+ validateCounters(c1, 88000);
+ } finally {
+ //clean up the input and output files
+ if (fs.exists(testdir)) {
+ fs.delete(testdir, true);
+ }
+ }
+ }
+}
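For readers tracing the plumbing above, a minimal sketch (not part of the commit) of how the counter is threaded through a spill: the task looks up SPILLED_RECORDS once, hands it to IFile.Writer, which counts appends privately and folds the total into the counter on close(). Only constructors and methods touched by this patch are used:

    Counters.Counter spilled =
        counters.findCounter(Task.Counter.SPILLED_RECORDS);   // done in Task()
    IFile.Writer<Text, Text> writer =
        new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class,
                                     codec, spilled);         // new final argument
    writer.append(new Text("k"), new Text("v"));  // each append bumps numRecordsWritten
    writer.close();              // close() increments the counter by that total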