You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/03/02 19:10:35 UTC
svn commit: r632803 - in /hadoop/core/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapred/ReduceTask.java
Author: ddas
Date: Sun Mar 2 10:10:33 2008
New Revision: 632803
URL: http://svn.apache.org/viewvc?rev=632803&view=rev
Log:
HADOOP-910. Enables Reduces to do merges for the on-disk map output files in parallel with their copying. Contributed by Amar Kamat.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=632803&r1=632802&r2=632803&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Sun Mar 2 10:10:33 2008
@@ -50,6 +50,9 @@
HADOOP-2895. Let the profiling string be configurable.
(Martin Traverso via cdouglas)
+ HADOOP-910. Enables Reduces to do merges for the on-disk map output files
+ in parallel with their copying. (Amar Kamat via ddas)
+
OPTIMIZATIONS
HADOOP-2790. Fixed inefficient method hasSpeculativeTask by removing
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=632803&r1=632802&r2=632803&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Sun Mar 2 10:10:33 2008
@@ -36,6 +36,8 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.SortedSet;
+import java.util.Comparator;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
@@ -47,6 +49,8 @@
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.ChecksumFileSystem;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.InputBuffer;
@@ -65,7 +69,6 @@
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import static org.apache.hadoop.mapred.Task.Counter.*;
@@ -98,6 +101,29 @@
getCounters().findCounter(REDUCE_INPUT_RECORDS);
private Counters.Counter reduceOutputCounter =
getCounters().findCounter(REDUCE_OUTPUT_RECORDS);
+
+ // A custom comparator for map output files. Here the ordering is determined
+ // by the file's size and path. In case of files with same size and different
+ // file paths, the first parameter is considered smaller than the second one.
+ Files with the same size and the same path are considered equal.
+ private Comparator<FileStatus> mapOutputFileComparator =
+ new Comparator<FileStatus>() {
+ public int compare(FileStatus a, FileStatus b) {
+ if (a.getLen() < b.getLen())
+ return -1;
+ else if (a.getLen() == b.getLen())
+ if (a.getPath().toString().equals(b.getPath().toString()))
+ return 0;
+ else
+ return -1;
+ else
+ return 1;
+ }
+ };
+
+ // A sorted set for keeping a set of map output files on disk
+ private final SortedSet<FileStatus> mapOutputFilesOnDisk =
+ new TreeSet<FileStatus>(mapOutputFileComparator);
public ReduceTask() {
super();
@@ -138,6 +164,24 @@
numMaps = in.readInt();
}
+
+ // Get the input files for the reducer.
+ private Path[] getMapFiles(FileSystem fs, boolean isLocal)
+ throws IOException {
+ List<Path> fileList = new ArrayList<Path>();
+ if (isLocal) {
+ // for local jobs
+ for(int i = 0; i < numMaps; ++i) {
+ fileList.add(mapOutputFile.getInputFile(i, getTaskId()));
+ }
+ } else {
+ // for non local jobs
+ for (FileStatus filestatus : mapOutputFilesOnDisk) {
+ fileList.add(filestatus.getPath());
+ }
+ }
+ return fileList.toArray(new Path[0]);
+ }
/** Iterates values while keys match in sorted input. */
static class ValuesIterator implements Iterator {
@@ -253,34 +297,19 @@
startCommunicationThread(umbilical);
FileSystem lfs = FileSystem.getLocal(job);
+ boolean isLocal = true;
if (!job.get("mapred.job.tracker", "local").equals("local")) {
reduceCopier = new ReduceCopier(umbilical, job);
if (!reduceCopier.fetchOutputs()) {
throw new IOException(getTaskId() + "The reduce copier failed");
}
+ isLocal = false;
}
copyPhase.complete(); // copy is already complete
- // open a file to collect map output
- // since we don't know how many map outputs got merged in memory, we have
- // to check whether a given map output exists, and if it does, add it in
- // the list of files to merge, otherwise not.
- List<Path> mapFilesList = new ArrayList<Path>();
- for(int i=0; i < numMaps; i++) {
- Path f;
- try {
- //catch and ignore DiskErrorException, since some map outputs will
- //really be absent (inmem merge).
- f = mapOutputFile.getInputFile(i, getTaskId());
- } catch (DiskErrorException d) {
- continue;
- }
- if (lfs.exists(f))
- mapFilesList.add(f);
- }
- Path[] mapFiles = new Path[mapFilesList.size()];
- mapFiles = mapFilesList.toArray(mapFiles);
+ // get the input files for the reducer to merge
+ Path[] mapFiles = getMapFiles(lfs, isLocal);
Path tempDir = new Path(getTaskId());
@@ -298,6 +327,10 @@
rIter = sorter.merge(mapFiles, tempDir,
!conf.getKeepFailedTaskFiles()); // sort
+ // free up the data structures
+ mapOutputFilesOnDisk.clear();
+ mapFiles = null;
+
sortPhase.complete(); // sort is complete
setPhase(TaskStatus.Phase.REDUCE);
@@ -421,11 +454,21 @@
private SequenceFile.Sorter sorter;
/**
+ * Number of files to merge at a time
+ */
+ private int ioSortFactor;
+
+ /**
* A reference to the throwable object (if merge throws an exception)
*/
private volatile Throwable mergeThrowable;
/**
+ * A flag to indicate that localFS merge is in progress
+ */
+ private volatile boolean localFSMergeInProgress = false;
+
+ /**
* A flag to indicate that merge is in progress
*/
private volatile boolean mergeInProgress = false;
@@ -755,12 +798,12 @@
if (tmpFilename == null)
throw new IOException("File " + filename + "-" + id +
" not created");
+ // This file could have been created in the inmemory
+ // fs or the localfs. So need to get the filesystem owning the path.
+ FileSystem fs = tmpFilename.getFileSystem(conf);
long bytes = -1;
// lock the ReduceTask while we do the rename
synchronized (ReduceTask.this) {
- // This file could have been created in the inmemory
- // fs or the localfs. So need to get the filesystem owning the path.
- FileSystem fs = tmpFilename.getFileSystem(conf);
if (!neededOutputs.contains(loc.getMapId())) {
fs.delete(tmpFilename);
return CopyResult.OBSOLETE;
@@ -802,6 +845,16 @@
}
neededOutputs.remove(loc.getMapId());
}
+
+ // Check if the map output file hits the local file-system by checking
+ // their schemes
+ String localFSScheme = localFileSys.getUri().getScheme();
+ String outputFileScheme = fs.getUri().getScheme();
+ if (localFSScheme.equals(outputFileScheme)) {
+ synchronized (mapOutputFilesOnDisk) {
+ mapOutputFilesOnDisk.add(fs.getFileStatus(filename));
+ }
+ }
return bytes;
}
@@ -861,6 +914,7 @@
this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
+ this.ioSortFactor = conf.getInt("io.sort.factor", 10);
// the exponential backoff formula
// backoff (t) = init * base^(t-1)
// so for max retries we get
@@ -931,6 +985,13 @@
copiers = new MapOutputCopier[numCopiers];
Reporter reporter = getReporter(umbilical);
+ // create an instance of the sorter for merging the on-disk files
+ SequenceFile.Sorter localFileSystemSorter =
+ new SequenceFile.Sorter(localFileSys, conf.getOutputKeyComparator(),
+ conf.getMapOutputKeyClass(),
+ conf.getMapOutputValueClass(), conf);
+ localFileSystemSorter.setProgressable(reporter);
+
// start all the copying threads
for (int i=0; i < copiers.length; i++) {
copiers[i] = new MapOutputCopier(reporter);
@@ -1040,6 +1101,23 @@
" of " + numKnown + " known outputs (" + numSlow +
" slow hosts and " + numDups + " dup hosts)");
+ // Check if an on-disk merge can be done. This will help if there
+ // are no copies to be fetched but sufficient copies to be merged.
+ synchronized (mapOutputFilesOnDisk) {
+ if (!localFSMergeInProgress
+ && (mapOutputFilesOnDisk.size() >= (2 * ioSortFactor - 1))) {
+ // make sure that only one thread merges the disk files
+ localFSMergeInProgress = true;
+ // start the on-disk-merge process
+ LocalFSMerger lfsm =
+ new LocalFSMerger((LocalFileSystem)localFileSys,
+ localFileSystemSorter);
+ lfsm.setName("Thread for merging on-disk files");
+ lfsm.setDaemon(true);
+ lfsm.start();
+ }
+ }
+
// if we have no copies in flight and we can't schedule anything
// new, just wait for a bit
try {
@@ -1214,6 +1292,11 @@
//Do a merge of in-memory files (if there are any)
if (mergeThrowable == null) {
try {
+ // Wait for the on-disk merge to complete
+ while (localFSMergeInProgress) {
+ Thread.sleep(200);
+ }
+
//wait for an ongoing merge (if it is in flight) to complete
while (mergeInProgress) {
Thread.sleep(200);
@@ -1266,6 +1349,11 @@
" Merge of the " +inMemClosedFiles.length +
" files in InMemoryFileSystem complete." +
" Local file is " + outputPath);
+
+ FileStatus status = localFileSys.getFileStatus(outputPath);
+ synchronized (mapOutputFilesOnDisk) {
+ mapOutputFilesOnDisk.add(status);
+ }
} catch (Throwable t) {
LOG.warn(reduceTask.getTaskId() +
" Final merge of the inmemory files threw an exception: " +
@@ -1393,6 +1481,75 @@
}
+ /** Starts merging the local copy (on disk) of the map's output so that
+ * most of the reducer's input is sorted i.e overlapping shuffle
+ * and merge phases.
+ */
+ private class LocalFSMerger extends Thread {
+ private LocalFileSystem localFileSys;
+ private SequenceFile.Sorter sorter;
+
+ public LocalFSMerger(LocalFileSystem fs, SequenceFile.Sorter sorter) {
+ this.localFileSys = fs;
+ this.sorter = sorter;
+ }
+
+ public void run() {
+ try {
+ Path[] mapFiles = new Path[ioSortFactor];
+ long approxOutputSize = 0;
+ int bytesPerSum =
+ reduceTask.getConf().getInt("io.bytes.per.checksum", 512);
+ LOG.info(reduceTask.getTaskId()
+ + " Merging map output files on disk");
+ // 1. Prepare the list of files to be merged. This list is prepared
+ // using a list of map output files on disk. Currently we merge
+ // io.sort.factor files into 1.
+ synchronized (mapOutputFilesOnDisk) {
+ for (int i = 0; i < ioSortFactor; ++i) {
+ FileStatus filestatus = mapOutputFilesOnDisk.first();
+ mapOutputFilesOnDisk.remove(filestatus);
+ mapFiles[i] = filestatus.getPath();
+ approxOutputSize += filestatus.getLen();
+ }
+ }
+ // add the checksum length
+ approxOutputSize += ChecksumFileSystem
+ .getChecksumLength(approxOutputSize,
+ bytesPerSum);
+
+ // 2. Start the on-disk merge process
+ Path outputPath =
+ lDirAlloc.getLocalPathForWrite(mapFiles[0].toString(),
+ approxOutputSize, conf)
+ .suffix(".merged");
+ SequenceFile.Writer writer =
+ sorter.cloneFileAttributes(mapFiles[0], outputPath, null);
+ SequenceFile.Sorter.RawKeyValueIterator iter;
+ Path tmpDir = new Path(reduceTask.getTaskId());
+ iter = sorter.merge(mapFiles, true, ioSortFactor, tmpDir);
+ sorter.writeFile(iter, writer);
+ writer.close();
+
+ synchronized (mapOutputFilesOnDisk) {
+ mapOutputFilesOnDisk.add(localFileSys.getFileStatus(outputPath));
+ }
+
+ LOG.info(reduceTask.getTaskId()
+ + " Finished merging map output files on disk.");
+ } catch (IOException ioe) {
+ LOG.warn(reduceTask.getTaskId()
+ + " Merging of the local FS files threw an exception: "
+ + StringUtils.stringifyException(ioe));
+ if (mergeThrowable == null) {
+ mergeThrowable = ioe;
+ }
+ } finally {
+ localFSMergeInProgress = false;
+ }
+ }
+ }
+
private class InMemFSMergeThread extends Thread {
private InMemoryFileSystem inMemFileSys;
private LocalFileSystem localFileSys;
@@ -1451,6 +1608,11 @@
" Merge of the " +inMemClosedFiles.length +
" files in InMemoryFileSystem complete." +
" Local file is " + outputPath);
+
+ FileStatus status = localFileSys.getFileStatus(outputPath);
+ synchronized (mapOutputFilesOnDisk) {
+ mapOutputFilesOnDisk.add(status);
+ }
}
else {
LOG.info(reduceTask.getTaskId() + " Nothing to merge from " +