Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/05/07 23:25:08 UTC
svn commit: r535997 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/io/
src/java/org/apache/hadoop/mapred/
Author: cutting
Date: Mon May 7 14:25:07 2007
New Revision: 535997
URL: http://svn.apache.org/viewvc?view=rev&rev=535997
Log:
HADOOP-1252. Changed MapReduce's allocation of local files to use round-robin among configured devices, rather than a hashcode, also improving error handling. Contributed by Devaraj.
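In short: local files are now allocated through the new LocalDirAllocator (added below) instead of hashcode-based conf.getLocalPath(). A minimal sketch of the new call pattern, using names from this patch (the path string and size value are illustrative):

    LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
    // writes: pick the next configured dir with enough free space
    Path out = lDirAlloc.getLocalPathForWrite("taskid/file.out", approxSize, conf);
    // reads: scan the configured dirs for the existing file
    Path in = lDirAlloc.getLocalPathToRead("taskid/file.out", conf);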
Added:
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalDirAllocator.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=535997&r1=535996&r2=535997
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon May 7 14:25:07 2007
@@ -351,6 +351,11 @@
104. HADOOP-1200. Restore disk checking lost in HADOOP-1170.
(Hairong Kuang via cutting)
+105. HADOOP-1252. Changed MapReduce's allocation of local files to
+ use round-robin among available devices, rather than a hashcode.
+ More care is also taken to not allocate files on full or offline
+ drives. (Devaraj Das via cutting)
+
Release 0.12.3 - 2007-04-06
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java?view=diff&rev=535997&r1=535996&r2=535997
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java Mon May 7 14:25:07 2007
@@ -40,6 +40,10 @@
public abstract class ChecksumFileSystem extends FilterFileSystem {
private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
+ public static double getApproxChkSumLength(long size) {
+ return FSOutputSummer.CHKSUM_AS_FRACTION * size;
+ }
+
public ChecksumFileSystem(FileSystem fs) {
super(fs);
}
@@ -343,6 +347,7 @@
private Checksum sum = new CRC32();
private int inSum;
private int bytesPerSum;
+ private static final float CHKSUM_AS_FRACTION = 0.01f;
public FSOutputSummer(ChecksumFileSystem fs,
Path file,
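For a sense of scale: CHKSUM_AS_FRACTION is one percent, so getApproxChkSumLength() estimates the checksum footprint as size/100. A 64 MB merge segment, for example, is assumed to carry roughly 671 KB of checksum data; SequenceFile's merge (below) adds this to each segment length when reserving space for intermediate outputs:

    long segment = 64L << 20;                                          // 67108864 bytes
    double chksum = ChecksumFileSystem.getApproxChkSumLength(segment); // 0.01 * size = 671088.64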
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalDirAllocator.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalDirAllocator.java?view=auto&rev=535997
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalDirAllocator.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalDirAllocator.java Mon May 7 14:25:07 2007
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.commons.logging.*;
+
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.conf.Configuration;
+
+/** An implementation of a round-robin scheme for disk allocation when
+ * creating files. The allocator keeps track of which disk was last
+ * allocated for a file write. For the current request, the next disk in
+ * the set is allocated if its free space is sufficient to accommodate the
+ * file being created. If the space requirement cannot be met, the next
+ * disk in order is tried, and so on, until a disk with sufficient
+ * capacity is found. Once such a disk is identified, a check is made to
+ * ensure that the disk is writable. There is also an API that does not
+ * take the space requirement into consideration and only checks whether
+ * the disk under consideration is writable (this should be used when the
+ * file size is not known a priori). A further API reads back a path that
+ * was created earlier; it works by scanning all the disks for the input
+ * pathname.
+ * This implementation also supports multiple allocators per JVM (one for
+ * each unique functionality or context, like mapred, dfs-client, etc.),
+ * and ensures that there is only one instance of an allocator per context
+ * per JVM.
+ * Note:
+ * 1. The contexts referred to above are the configuration items defined
+ * in the Configuration class, like "mapred.local.dir" (for which we want
+ * to control the dir allocations). The context-strings are exactly those
+ * configuration items.
+ * 2. This implementation does not handle the case where a disk becomes
+ * read-only or runs out of space while a file is being written to it
+ * (disks are shared between multiple processes, so the latter situation
+ * is quite likely).
+ * 3. In the class implementation, "Disk" is referred to as "Dir", which
+ * actually points to the configured directory on the Disk that will be
+ * the parent for all file write/read allocations.
+ */
+public class LocalDirAllocator {
+
+ //A Map from the config item names like "mapred.local.dir",
+ //"dfs.client.buffer.dir" to the instance of the AllocatorPerContext. This
+ //is a static object to make sure there exists exactly one instance per JVM
+ private static Map <String, AllocatorPerContext> contexts =
+ new TreeMap<String, AllocatorPerContext>();
+ private String contextCfgItemName;
+
+ /** Create an allocator object
+ * @param contextCfgItemName the configuration item whose value is the
+ * comma-separated list of local directories to allocate from
+ */
+ public LocalDirAllocator(String contextCfgItemName) {
+ this.contextCfgItemName = contextCfgItemName;
+ }
+
+ /** This method must be used to obtain the dir allocation context for a
+ * particular value of the context name. The context name must be an item
+ * defined in the Configuration object for which we want to control the
+ * dir allocations (e.g., <code>mapred.local.dir</code>). The method will
+ * create a context for that name if it doesn't already exist.
+ */
+ private AllocatorPerContext obtainContext(String contextCfgItemName) {
+ synchronized (contexts) {
+ AllocatorPerContext l = contexts.get(contextCfgItemName);
+ if (l == null) {
+ contexts.put(contextCfgItemName,
+ (l = new AllocatorPerContext(contextCfgItemName)));
+ }
+ return l;
+ }
+ }
+
+ /** Get a path from the local FS. This method should be used if the size of
+ * the file is not known a priori. We go round-robin over the set of disks
+ * (via the configured dirs) and return the first complete path where
+ * we could create the parent directory of the passed path.
+ * @param pathStr the requested path (this will be created on the first
+ * available disk)
+ * @param conf the Configuration object
+ * @return the complete path to the file on a local disk
+ * @throws IOException
+ */
+ public Path getLocalPathForWrite(String pathStr,
+ Configuration conf) throws IOException {
+ return getLocalPathForWrite(pathStr, -1, conf);
+ }
+
+ /** Get a path from the local FS. Pass size as -1 if not known a priori. We
+ * round-robin over the set of disks (via the configured dirs) and return
+ * the first complete path which has enough space
+ * @param pathStr the requested path (this will be created on the first
+ * available disk)
+ * @param size the size of the file that is going to be written
+ * @param conf the Configuration object
+ * @return the complete path to the file on a local disk
+ * @throws IOException
+ */
+ public Path getLocalPathForWrite(String pathStr, long size,
+ Configuration conf) throws IOException {
+ AllocatorPerContext context = obtainContext(contextCfgItemName);
+ return context.getLocalPathForWrite(pathStr, size, conf);
+ }
+
+ /** Get a path from the local FS for reading. We search through all the
+ * configured dirs for the file's existence and return the complete
+ * path to the file when we find one
+ * @param pathStr the requested file (this will be searched)
+ * @param conf the Configuration object
+ * @return the complete path to the file on a local disk
+ * @throws IOException
+ */
+ public Path getLocalPathToRead(String pathStr,
+ Configuration conf) throws IOException {
+ AllocatorPerContext context = obtainContext(contextCfgItemName);
+ return context.getLocalPathToRead(pathStr, conf);
+ }
+
+ /** Method to check whether a context is valid
+ * @param contextCfgItemName the context to check
+ * @return true if an allocator has been created for this context
+ */
+ public static boolean isContextValid(String contextCfgItemName) {
+ synchronized (contexts) {
+ return contexts.containsKey(contextCfgItemName);
+ }
+ }
+
+ private class AllocatorPerContext {
+
+ private final Log LOG =
+ LogFactory.getLog("org.apache.hadoop.fs.AllocatorPerContext");
+
+ private int dirNumLastAccessed;
+ private FileSystem localFS;
+ private DF[] dirDF;
+ private String contextCfgItemName;
+ private String[] localDirs;
+ private String savedLocalDirs = "";
+
+ public AllocatorPerContext(String contextCfgItemName) {
+ this.contextCfgItemName = contextCfgItemName;
+ }
+
+ /** This method gets called every time before any read/write to make sure
+ * that any change to localDirs is reflected immediately.
+ */
+ private void confChanged(Configuration conf) throws IOException {
+ String newLocalDirs = conf.get(contextCfgItemName);
+ if (!newLocalDirs.equals(savedLocalDirs)) {
+ localDirs = conf.getStrings(contextCfgItemName);
+ localFS = FileSystem.getLocal(conf);
+ int numDirs = localDirs.length;
+ dirDF = new DF[numDirs];
+ for (int i = 0; i < numDirs; i++) {
+ try {
+ localFS.mkdirs(new Path(localDirs[i]));
+ } catch (IOException ie) { } //ignore
+ dirDF[i] = new DF(new File(localDirs[i]), 30000);
+ }
+ dirNumLastAccessed = 0;
+ savedLocalDirs = newLocalDirs;
+ }
+ }
+
+ private Path createPath(String path) throws IOException {
+ Path file = new Path(new Path(localDirs[dirNumLastAccessed]),
+ path);
+ //check whether we are able to create a directory here. If the disk
+ //happens to be RDONLY we will fail
+ try {
+ DiskChecker.checkDir(new File(file.getParent().toUri().getPath()));
+ return file;
+ } catch (DiskErrorException d) {
+ LOG.warn(StringUtils.stringifyException(d));
+ return null;
+ }
+ }
+
+ /** Get a path from the local FS. This method should be used if the size of
+ * the file is not known a priori. We go round-robin over the set of disks
+ * (via the configured dirs) and return the first complete path where
+ * we could create the parent directory of the passed path.
+ */
+ public synchronized Path getLocalPathForWrite(String path,
+ Configuration conf) throws IOException {
+ return getLocalPathForWrite(path, -1, conf);
+ }
+
+ /** Get a path from the local FS. Pass size as -1 if not known a priori. We
+ * round-robin over the set of disks (via the configured dirs) and return
+ * the first complete path which has enough space
+ */
+ public synchronized Path getLocalPathForWrite(String pathStr, long size,
+ Configuration conf) throws IOException {
+ confChanged(conf);
+ int numDirs = localDirs.length;
+ int numDirsSearched = 0;
+ //remove the leading slash from the path (to make sure that the uri
+ //resolution results in a valid path on the dir being checked)
+ if (pathStr.startsWith("/")) {
+ pathStr = pathStr.substring(1);
+ }
+ Path returnPath = null;
+ while (numDirsSearched < numDirs && returnPath == null) {
+ if (size >= 0) {
+ long capacity = dirDF[dirNumLastAccessed].getAvailable();
+ if (capacity > size) {
+ returnPath = createPath(pathStr);
+ }
+ } else {
+ returnPath = createPath(pathStr);
+ }
+ dirNumLastAccessed++;
+ dirNumLastAccessed = dirNumLastAccessed % numDirs;
+ numDirsSearched++;
+ }
+
+ if (returnPath != null) {
+ return returnPath;
+ }
+
+ //no path found
+ throw new DiskErrorException("Could not find any valid local " +
+ "directory for " + pathStr);
+ }
+
+ /** Get a path from the local FS for reading. We search through all the
+ * configured dirs for the file's existence and return the complete
+ * path to the file when we find one
+ */
+ public synchronized Path getLocalPathToRead(String pathStr,
+ Configuration conf) throws IOException {
+ confChanged(conf);
+ int numDirs = localDirs.length;
+ int numDirsSearched = 0;
+ //remove the leading slash from the path (to make sure that the uri
+ //resolution results in a valid path on the dir being checked)
+ if (pathStr.startsWith("/")) {
+ pathStr = pathStr.substring(1);
+ }
+ while (numDirsSearched < numDirs) {
+ Path file = new Path(localDirs[numDirsSearched], pathStr);
+ if (localFS.exists(file)) {
+ return file;
+ }
+ numDirsSearched++;
+ }
+
+ //no path found
+ throw new DiskErrorException ("Could not find " + pathStr +" in any of" +
+ " the configured local directories");
+ }
+ }
+}
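Worth noting from the class above: allocator state is shared per context string within a JVM, so two instances built with the same configuration item hand out paths from the same round-robin cursor. A small sketch (the directory values are hypothetical):

    Configuration conf = new Configuration();
    conf.set("mapred.local.dir", "/disk1/mapred,/disk2/mapred,/disk3/mapred");

    LocalDirAllocator a = new LocalDirAllocator("mapred.local.dir");
    LocalDirAllocator b = new LocalDirAllocator("mapred.local.dir");
    // a and b share one AllocatorPerContext, so consecutive writes rotate
    // across /disk1, /disk2, /disk3 no matter which instance is asked
    // (assuming all three dirs are writable).
    Path p1 = a.getLocalPathForWrite("jobcache/spill0.out", conf); // e.g. /disk1/...
    Path p2 = b.getLocalPathForWrite("jobcache/spill1.out", conf); // e.g. /disk2/...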
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=535997&r1=535996&r2=535997
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Mon May 7 14:25:07 2007
@@ -2383,6 +2383,7 @@
int numSegments = sortedSegmentSizes.size();
int origFactor = factor;
int passNo = 1;
+ LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
do {
//get the factor for this pass of merge
factor = getPassFactor(passNo, numSegments);
@@ -2435,11 +2436,19 @@
return this;
} else {
//we want to spread the creation of temp files on multiple disks if
- //available
+ //available under the space constraints
+ long approxOutputSize = 0;
+ for (SegmentDescriptor s : segmentsToMerge) {
+ approxOutputSize += s.segmentLength +
+ ChecksumFileSystem.getApproxChkSumLength(
+ s.segmentLength);
+ }
Path tmpFilename =
new Path(tmpDir, "intermediate").suffix("." + passNo);
- Path outputFile = conf.getLocalPath("mapred.local.dir",
- tmpFilename.toString());
+
+ Path outputFile = lDirAlloc.getLocalPathForWrite(
+ tmpFilename.toString(),
+ approxOutputSize, conf);
LOG.info("writing intermediate results to " + outputFile);
Writer writer = cloneFileAttributes(
fs.makeQualified(segmentsToMerge.get(0).segmentPathName),
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=535997&r1=535996&r2=535997
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Mon May 7 14:25:07 2007
@@ -140,7 +140,8 @@
for (int i = 0; i < mapIds.size(); i++) {
String mapId = mapIds.get(i);
Path mapOut = this.mapoutputFile.getOutputFile(mapId);
- Path reduceIn = this.mapoutputFile.getInputFile(i, reduceId);
+ Path reduceIn = this.mapoutputFile.getInputFileForWrite(i,reduceId,
+ localFs.getLength(mapOut));
if (!localFs.mkdirs(reduceIn.getParent())) {
throw new IOException("Mkdirs failed to create "
+ reduceIn.getParent().toString());
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java?view=diff&rev=535997&r1=535996&r2=535997
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java Mon May 7 14:25:07 2007
@@ -21,6 +21,7 @@
import java.io.IOException;
import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.conf.*;
/**
@@ -29,49 +30,108 @@
class MapOutputFile {
private JobConf conf;
+ private LocalDirAllocator lDirAlloc =
+ new LocalDirAllocator("mapred.local.dir");
- /** Create a local map output file name.
+ /** Return the path to the local map output file created earlier
* @param mapTaskId a map task id
*/
public Path getOutputFile(String mapTaskId)
throws IOException {
- return conf.getLocalPath(mapTaskId+"/file.out");
+ return lDirAlloc.getLocalPathToRead(mapTaskId+"/file.out", conf);
}
- /** Create a local map output index file name.
+ /** Create a local map output file name.
+ * @param mapTaskId a map task id
+ * @param size the size of the file
+ */
+ public Path getOutputFileForWrite(String mapTaskId, long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(mapTaskId+"/file.out", size, conf);
+ }
+
+ /** Return the path to a local map output index file created earlier
* @param mapTaskId a map task id
*/
public Path getOutputIndexFile(String mapTaskId)
throws IOException {
- return conf.getLocalPath(mapTaskId+"/file.out.index");
+ return lDirAlloc.getLocalPathToRead(mapTaskId + "/file.out.index", conf);
}
- /** Create a local map spill file name.
+ /** Create a local map output index file name.
+ * @param mapTaskId a map task id
+ * @param size the size of the file
+ */
+ public Path getOutputIndexFileForWrite(String mapTaskId, long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(mapTaskId + "/file.out.index",
+ size, conf);
+ }
+
+ /** Return a local map spill file created earlier.
* @param mapTaskId a map task id
* @param spillNumber the number
*/
public Path getSpillFile(String mapTaskId, int spillNumber)
throws IOException {
- return conf.getLocalPath(mapTaskId+"/spill" +spillNumber+".out");
+ return lDirAlloc.getLocalPathToRead(mapTaskId+"/spill" +spillNumber+".out",
+ conf);
}
- /** Create a local map spill index file name.
+ /** Create a local map spill file name.
+ * @param mapTaskId a map task id
+ * @param spillNumber the number
+ * @param size the size of the file
+ */
+ public Path getSpillFileForWrite(String mapTaskId, int spillNumber,
+ long size) throws IOException {
+ return lDirAlloc.getLocalPathForWrite(mapTaskId+
+ "/spill" +spillNumber+".out",
+ size, conf);
+ }
+
+ /** Return a local map spill index file created earlier
* @param mapTaskId a map task id
* @param spillNumber the number
*/
public Path getSpillIndexFile(String mapTaskId, int spillNumber)
throws IOException {
- return conf.getLocalPath(mapTaskId+"/spill" +spillNumber+".out.index");
+ return lDirAlloc.getLocalPathToRead(
+ mapTaskId+"/spill" +spillNumber+".out.index", conf);
}
- /** Create a local reduce input file name.
+ /** Create a local map spill index file name.
+ * @param mapTaskId a map task id
+ * @param spillNumber the number
+ * @param size the size of the file
+ */
+ public Path getSpillIndexFileForWrite(String mapTaskId, int spillNumber,
+ long size) throws IOException {
+ return lDirAlloc.getLocalPathForWrite(
+ mapTaskId+"/spill" +spillNumber+".out.index", size, conf);
+ }
+
+ /** Return a local reduce input file created earlier
* @param mapTaskId a map task id
* @param reduceTaskId a reduce task id
*/
public Path getInputFile(int mapId, String reduceTaskId)
throws IOException {
// TODO *oom* should use a format here
- return conf.getLocalPath(reduceTaskId+"/map_"+mapId+".out");
+ return lDirAlloc.getLocalPathToRead(reduceTaskId + "/map_"+mapId+".out",
+ conf);
+ }
+
+ /** Create a local reduce input file name.
+ * @param mapId a map id
+ * @param reduceTaskId a reduce task id
+ * @param size the size of the file
+ */
+ public Path getInputFileForWrite(int mapId, String reduceTaskId, long size)
+ throws IOException {
+ // TODO *oom* should use a format here
+ return lDirAlloc.getLocalPathForWrite(reduceTaskId + "/map_"+mapId+".out",
+ size, conf);
}
/** Removes all of the files related to a task. */
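The pattern throughout the rewritten MapOutputFile: every accessor now comes in a pair, a *ForWrite variant that takes a size hint and allocates a fresh location round-robin, and the original name repurposed to locate an existing file by scanning the configured dirs. A sketch of the intended pairing (taskId and expectedBytes are illustrative):

    // producer side: reserve a location sized for the expected spill
    Path spill = mapOutputFile.getSpillFileForWrite(taskId, 0, expectedBytes);
    // consumer side: find the same file again, whichever disk it landed on
    Path found = mapOutputFile.getSpillFile(taskId, 0);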
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java?view=diff&rev=535997&r1=535996&r2=535997
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java Mon May 7 14:25:07 2007
@@ -26,9 +26,11 @@
import org.apache.hadoop.fs.InMemoryFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.io.*;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.conf.*;
/** The location of a map output file, as passed to a reduce task via the
* {@link InterTrackerProtocol}. */
@@ -174,7 +176,10 @@
* We use the file system so that we generate checksum files on the data.
* @param inMemFileSys the inmemory filesystem to write the file to
* @param localFileSys the local filesystem to write the file to
+ * @param shuffleMetrics the metrics context
* @param localFilename the filename to write the data into
+ * @param lDirAlloc the LocalDirAllocator object
+ * @param conf the Configuration object
* @param reduce the reduce id to get for
* @param timeout number of ms for connection and read timeout
* @return the path of the file that got created
@@ -184,7 +189,8 @@
FileSystem localFileSys,
MetricsRecord shuffleMetrics,
Path localFilename,
- int reduce,
+ LocalDirAllocator lDirAlloc,
+ Configuration conf, int reduce,
int timeout) throws IOException, InterruptedException {
boolean good = false;
long totalBytes = 0;
@@ -216,6 +222,11 @@
inMemFileSys.reserveSpaceWithCheckSum(localFilename, length));
if (createInMem) {
fileSys = inMemFileSys;
+ }
+ else {
+ //now hit the localFS to find out a suitable location for the output
+ localFilename = lDirAlloc.getLocalPathForWrite(
+ localFilename.toUri().getPath(), length + checksumLength, conf);
}
output = fileSys.create(localFilename);
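One subtlety in the hunk above: the caller passes a root-relative name like "/task_r_0002/map_3.out-1" (see the ReduceTask changes below), and getLocalPathForWrite() strips the leading slash before resolving it against a configured dir. With mapred.local.dir set to /disk1/mapred,/disk2/mapred (hypothetical values), the shuffle file might therefore come back as:

    /disk2/mapred/task_r_0002/map_3.out-1

allocated with a size hint of length + checksumLength, i.e. the map output bytes plus their checksum file.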
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=535997&r1=535996&r2=535997
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Mon May 7 14:25:07 2007
@@ -58,6 +58,7 @@
private BytesWritable split = new BytesWritable();
private String splitClass;
private InputSplit instantiatedSplit = null;
+ private final static int APPROX_HEADER_LENGTH = 150;
private static final Log LOG = LogFactory.getLog(MapTask.class.getName());
@@ -391,11 +392,16 @@
//sort, combine and spill to disk
private void sortAndSpillToDisk() throws IOException {
synchronized (this) {
- Path filename = mapOutputFile.getSpillFile(getTaskId(), numSpills);
+ //approximate the length of the output file to be the length of the
+ //buffer + header lengths for the partitions
+ long size = keyValBuffer.getLength() +
+ partitions * APPROX_HEADER_LENGTH;
+ Path filename = mapOutputFile.getSpillFileForWrite(getTaskId(),
+ numSpills, size);
//we just create the FSDataOutputStream object here.
out = localFs.create(filename);
- Path indexFilename = mapOutputFile.getSpillIndexFile(getTaskId(),
- numSpills);
+ Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
+ getTaskId(), numSpills, partitions * 16);
indexOut = localFs.create(indexFilename);
LOG.debug("opened "+
mapOutputFile.getSpillFile(getTaskId(), numSpills).getName());
@@ -479,14 +485,31 @@
}
private void mergeParts() throws IOException {
- Path finalOutputFile = mapOutputFile.getOutputFile(getTaskId());
- Path finalIndexFile = mapOutputFile.getOutputIndexFile(getTaskId());
+ // get the approximate size of the final output/index files
+ long finalOutFileSize = 0;
+ long finalIndexFileSize = 0;
+ Path [] filename = new Path[numSpills];
+ Path [] indexFileName = new Path[numSpills];
+
+ for(int i = 0; i < numSpills; i++) {
+ filename[i] = mapOutputFile.getSpillFile(getTaskId(), i);
+ indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskId(), i);
+ finalOutFileSize += localFs.getLength(filename[i]);
+ }
+ //make correction in the length to include the sequence file header
+ //lengths for each partition
+ finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
+
+ finalIndexFileSize = partitions * 16;
+
+ Path finalOutputFile = mapOutputFile.getOutputFileForWrite(getTaskId(),
+ finalOutFileSize);
+ Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(
+ getTaskId(), finalIndexFileSize);
if (numSpills == 1) { //the spill is the final output
- Path spillPath = mapOutputFile.getSpillFile(getTaskId(), 0);
- Path spillIndexPath = mapOutputFile.getSpillIndexFile(getTaskId(), 0);
- localFs.rename(spillPath, finalOutputFile);
- localFs.rename(spillIndexPath, finalIndexFile);
+ localFs.rename(filename[0], finalOutputFile);
+ localFs.rename(indexFileName[0], finalIndexFile);
return;
}
@@ -513,14 +536,6 @@
return;
}
{
- Path [] filename = new Path[numSpills];
- Path [] indexFileName = new Path[numSpills];
-
- for(int i = 0; i < numSpills; i++) {
- filename[i] = mapOutputFile.getSpillFile(getTaskId(), i);
- indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskId(), i);
- }
-
//create a sorter object as we need access to the SegmentDescriptor
//class and merge methods
Sorter sorter = new Sorter(localFs, keyClass, valClass, job);
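The size hints in sortAndSpillToDisk() and mergeParts() above are deliberately rough: buffer bytes plus a flat APPROX_HEADER_LENGTH (150 bytes) per partition for the per-partition SequenceFile headers, and 16 bytes per partition for the index file (presumably one offset/length pair of longs). Worked numbers for an illustrative spill with a 100 MB key/value buffer and 8 partitions:

    long spillSize = (100L << 20) + 8 * 150;  // 104857600 + 1200 = 104858800 bytes
    long indexSize = 8 * 16;                  // 128 bytes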
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java?view=diff&rev=535997&r1=535996&r2=535997
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java Mon May 7 14:25:07 2007
@@ -17,18 +17,13 @@
*/
package org.apache.hadoop.mapred;
-import org.apache.hadoop.conf.*;
-
import java.io.*;
/** Runs a map task. */
class MapTaskRunner extends TaskRunner {
- private MapOutputFile mapOutputFile;
public MapTaskRunner(Task task, TaskTracker tracker, JobConf conf) {
super(task, tracker, conf);
- this.mapOutputFile = new MapOutputFile();
- this.mapOutputFile.setConf(conf);
}
/** Delete any temporary files from previous failed attempts. */
@@ -37,13 +32,13 @@
return false;
}
- this.mapOutputFile.removeAll(getTask().getTaskId());
+ mapOutputFile.removeAll(getTask().getTaskId());
return true;
}
/** Delete all of the temporary map output files. */
public void close() throws IOException {
LOG.info(getTask()+" done; removing files.");
- this.mapOutputFile.removeAll(getTask().getTaskId());
+ mapOutputFile.removeAll(getTask().getTaskId());
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=535997&r1=535996&r2=535997
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Mon May 7 14:25:07 2007
@@ -61,6 +61,7 @@
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import static org.apache.hadoop.mapred.Task.Counter.*;
@@ -265,7 +266,14 @@
// the list of files to merge, otherwise not.
List<Path> mapFilesList = new ArrayList<Path>();
for(int i=0; i < numMaps; i++) {
- Path f = mapOutputFile.getInputFile(i, getTaskId());
+ Path f;
+ try {
+ //catch and ignore DiskErrorException, since some map outputs will
+ //really be absent (inmem merge).
+ f = mapOutputFile.getInputFile(i, getTaskId());
+ } catch (DiskErrorException d) {
+ continue;
+ }
if (lfs.exists(f))
mapFilesList.add(f);
}
@@ -292,7 +300,7 @@
};
sortProgress.setName("Sort progress reporter for task "+getTaskId());
- Path tempDir = job.getLocalPath(getTaskId());
+ Path tempDir = new Path(getTaskId());
WritableComparator comparator = job.getOutputValueGroupingComparator();
@@ -496,6 +504,11 @@
private Random random = null;
+ /**
+ * the max size of the merge output from ramfs
+ */
+ private long ramfsMergeOutputSize;
+
/** Represents the result of an attempt to copy a map output */
private class CopyResult {
@@ -523,6 +536,15 @@
public MapOutputLocation getLocation() { return loc; }
}
+ private int extractMapIdFromPathName(Path pathname) {
+ //all paths end with map_<id>.out
+ String firstPathName = pathname.getName();
+ int beginIndex = firstPathName.lastIndexOf("map_");
+ int endIndex = firstPathName.lastIndexOf(".out");
+ return Integer.parseInt(firstPathName.substring(beginIndex +
+ "map_".length(), endIndex));
+ }
+
private Thread createProgressThread(final TaskUmbilicalProtocol umbilical) {
//spawn a thread to give copy progress heartbeats
Thread copyProgress = new Thread() {
@@ -645,14 +667,17 @@
String reduceId = reduceTask.getTaskId();
LOG.info(reduceId + " Copying " + loc.getMapTaskId() +
" output from " + loc.getHost() + ".");
- // the place where the file should end up
- Path finalFilename = conf.getLocalPath(reduceId + "/map_" +
- loc.getMapId() + ".out");
+ // a temp filename. If this file gets created in ramfs, we're fine,
+ // else, we will check the localFS to find a suitable final location
+ // for this path
+ Path filename = new Path("/" + reduceId + "/map_" +
+ loc.getMapId() + ".out");
// a working filename that will be unique to this attempt
- Path tmpFilename = new Path(finalFilename + "-" + id);
+ Path tmpFilename = new Path(filename + "-" + id);
// this copies the map output file
tmpFilename = loc.getFile(inMemFileSys, localFileSys, shuffleMetrics,
- tmpFilename, reduceTask.getPartition(),
+ tmpFilename, lDirAlloc,
+ conf, reduceTask.getPartition(),
STALLED_COPY_TIMEOUT);
if (!neededOutputs.contains(loc.getMapId())) {
if (tmpFilename != null) {
@@ -662,7 +687,7 @@
return CopyResult.OBSOLETE;
}
if (tmpFilename == null)
- throw new IOException("File " + finalFilename + "-" + id +
+ throw new IOException("File " + filename + "-" + id +
" not created");
long bytes = -1;
// lock the ReduceTask while we do the rename
@@ -676,9 +701,12 @@
}
bytes = fs.getLength(tmpFilename);
+ //resolve the final filename against the directory where the tmpFile
+ //got created
+ filename = new Path(tmpFilename.getParent(), filename.getName());
// if we can't rename the file, something is broken (and IOException
// will be thrown).
- if (!fs.rename(tmpFilename, finalFilename)) {
+ if (!fs.rename(tmpFilename, filename)) {
fs.delete(tmpFilename);
bytes = -1;
throw new IOException("failure to rename map output " +
@@ -766,6 +794,8 @@
inMemFileSys = (InMemoryFileSystem)FileSystem.get(uri, conf);
LOG.info(reduceTask.getTaskId() + " Created an InMemoryFileSystem, uri: "
+ uri);
+ ramfsMergeOutputSize = (long)(MAX_INMEM_FILESYS_USE *
+ inMemFileSys.getFSSize());
localFileSys = FileSystem.getLocal(conf);
//create an instance of the sorter
sorter =
@@ -1025,9 +1055,12 @@
//it is not guaranteed that this file will be present after merge
//is called (we delete empty sequence files as soon as we see them
//in the merge method)
+ int mapId = extractMapIdFromPathName(inMemClosedFiles[0]);
+ Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
+ reduceTask.getTaskId(), ramfsMergeOutputSize);
SequenceFile.Writer writer = sorter.cloneFileAttributes(
inMemFileSys.makeQualified(inMemClosedFiles[0]),
- localFileSys.makeQualified(inMemClosedFiles[0]), null);
+ localFileSys.makeQualified(outputPath), null);
SequenceFile.Sorter.RawKeyValueIterator rIter = null;
try {
@@ -1046,7 +1079,7 @@
LOG.info(reduceTask.getTaskId() +
" Merge of the " +inMemClosedFiles.length +
" files in InMemoryFileSystem complete." +
- " Local file is " + inMemClosedFiles[0]);
+ " Local file is " + outputPath);
} catch (Throwable t) {
LOG.warn(reduceTask.getTaskId() +
" Final merge of the inmemory files threw an exception: " +
@@ -1151,9 +1184,16 @@
//it is not guaranteed that this file will be present after merge
//is called (we delete empty sequence files as soon as we see them
//in the merge method)
+
+ //figure out the mapId
+ int mapId = extractMapIdFromPathName(inMemClosedFiles[0]);
+
+ Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
+ reduceTask.getTaskId(), ramfsMergeOutputSize);
+
SequenceFile.Writer writer = sorter.cloneFileAttributes(
inMemFileSys.makeQualified(inMemClosedFiles[0]),
- localFileSys.makeQualified(inMemClosedFiles[0]), null);
+ localFileSys.makeQualified(outputPath), null);
SequenceFile.Sorter.RawKeyValueIterator rIter;
try {
rIter = sorter.merge(inMemClosedFiles, true,
@@ -1162,7 +1202,7 @@
//make sure that we delete the ondisk file that we created
//earlier when we invoked cloneFileAttributes
writer.close();
- localFileSys.delete(inMemClosedFiles[0]);
+ localFileSys.delete(outputPath);
throw new IOException (StringUtils.stringifyException(e));
}
sorter.writeFile(rIter, writer);
@@ -1170,7 +1210,7 @@
LOG.info(reduceTask.getTaskId() +
" Merge of the " +inMemClosedFiles.length +
" files in InMemoryFileSystem complete." +
- " Local file is " + inMemClosedFiles[0]);
+ " Local file is " + outputPath);
}
else {
LOG.info(reduceTask.getTaskId() + " Nothing to merge from " +
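extractMapIdFromPathName(), added above, recovers the map id from the fixed map_<id>.out naming so a ramfs merge output can be re-allocated on disk under the right reduce-input name. Illustrative inputs and results (the method is private; these calls are for exposition only):

    extractMapIdFromPathName(new Path("/task_r_0002/map_7.out"));  // returns 7
    extractMapIdFromPathName(new Path("map_12.out"));              // returns 12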
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?view=diff&rev=535997&r1=535996&r2=535997
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Mon May 7 14:25:07 2007
@@ -17,23 +17,15 @@
*/
package org.apache.hadoop.mapred;
-import org.apache.hadoop.conf.*;
-
import java.io.*;
/** Runs a reduce task. */
class ReduceTaskRunner extends TaskRunner {
- /**
- * for cleaning up old map outputs
- */
- private MapOutputFile mapOutputFile;
public ReduceTaskRunner(Task task, TaskTracker tracker,
JobConf conf) throws IOException {
super(task, tracker, conf);
- this.mapOutputFile = new MapOutputFile();
- this.mapOutputFile.setConf(conf);
}
/** Assemble all of the map output files */
@@ -43,7 +35,7 @@
}
// cleanup from failures
- this.mapOutputFile.removeAll(getTask().getTaskId());
+ mapOutputFile.removeAll(getTask().getTaskId());
return true;
}
@@ -52,6 +44,6 @@
public void close() throws IOException {
LOG.info(getTask()+" done; removing files.");
getTask().getProgress().setStatus("closed");
- this.mapOutputFile.removeAll(getTask().getTaskId());
+ mapOutputFile.removeAll(getTask().getTaskId());
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=535997&r1=535996&r2=535997
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Mon May 7 14:25:07 2007
@@ -30,6 +30,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;
@@ -69,6 +70,7 @@
protected JobConf conf;
protected MapOutputFile mapOutputFile = new MapOutputFile();
+ protected LocalDirAllocator lDirAlloc;
////////////////////////////////////////////
// Constructors
@@ -293,6 +295,7 @@
this.conf = new JobConf(conf);
}
this.mapOutputFile.setConf(this.conf);
+ this.lDirAlloc = new LocalDirAllocator("mapred.local.dir");
}
public Configuration getConf() {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=535997&r1=535996&r2=535997
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Mon May 7 14:25:07 2007
@@ -43,7 +43,12 @@
private TaskLog.Writer taskStdOutLogWriter;
private TaskLog.Writer taskStdErrLogWriter;
-
+
+ /**
+ * for cleaning up old map outputs
+ */
+ protected MapOutputFile mapOutputFile;
+
public TaskRunner(Task t, TaskTracker tracker, JobConf conf) {
this.t = t;
this.tracker = tracker;
@@ -60,6 +65,8 @@
this.conf.getInt("mapred.userlog.limit.kb", 100) * 1024,
this.conf.getBoolean("mapred.userlog.purgesplits", true),
this.conf.getInt("mapred.userlog.retain.hours", 12));
+ this.mapOutputFile = new MapOutputFile();
+ this.mapOutputFile.setConf(conf);
}
public Task getTask() { return t; }
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=535997&r1=535996&r2=535997
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon May 7 14:25:07 2007
@@ -53,6 +53,7 @@
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
@@ -88,6 +89,7 @@
private boolean running = true;
+ private LocalDirAllocator localDirAllocator;
String taskTrackerName;
String localHostname;
InetSocketAddress jobTrackAddr;
@@ -657,10 +659,12 @@
// let the jsp pages get to the task tracker, config, and other relevant
// objects
FileSystem local = FileSystem.getLocal(conf);
+ this.localDirAllocator = new LocalDirAllocator("mapred.local.dir");
server.setAttribute("task.tracker", this);
server.setAttribute("local.file.system", local);
server.setAttribute("conf", conf);
server.setAttribute("log", LOG);
+ server.setAttribute("localDirAllocator", localDirAllocator);
server.addServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
server.start();
this.httpPort = server.getPort();
@@ -1872,15 +1876,19 @@
byte[] buffer = new byte[MAX_BYTES_TO_READ];
OutputStream outStream = response.getOutputStream();
JobConf conf = (JobConf) context.getAttribute("conf");
+ LocalDirAllocator lDirAlloc =
+ (LocalDirAllocator)context.getAttribute("localDirAllocator");
FileSystem fileSys =
(FileSystem) context.getAttribute("local.file.system");
// Index file
- Path indexFileName = conf.getLocalPath(mapId+"/file.out.index");
+ Path indexFileName = lDirAlloc.getLocalPathToRead(
+ mapId+"/file.out.index", conf);
FSDataInputStream indexIn = null;
// Map-output file
- Path mapOutputFileName = conf.getLocalPath(mapId+"/file.out");
+ Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
+ mapId+"/file.out", conf);
FSDataInputStream mapOutputIn = null;
// true iff IOException was caused by attempt to access input